Skip to content
Extraits de code Groupes Projets
Valider dab1a749 rédigé par Vany Ingenzi's avatar Vany Ingenzi
Parcourir les fichiers

Added function to write to statistics, added stat struct, added stats incrementing in the writer

parent 25a92034
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
Fichier ajouté
Fichier ajouté
......@@ -107,4 +107,68 @@ int wait_for_client(int sfd)
ipv6_to_str_unexpanded(text, &(cast->sin6_addr));
DEBUG("Successfully connected to IPv6 addresss: %s, port : %d", text, ntohs(cast->sin6_port));
return 0;
}
void write_stats_to_file(const char * pathname, transfer_stats_t * stats_file)
{
if (pathname == NULL || stats_file == NULL)
return;
int fd = open(pathname, O_RDWR|O_CREAT|O_TRUNC);
if (fd == -1)
{
// We can use strerror cause here the error is known since we are using standard library open
DEBUG("%s", strerror(errno));
return;
}
char buffer[100];
sprintf((char *) &buffer, "data_sent:%llu\n", stats_file->data_sent);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "data_received:%llu\n", stats_file->data_received);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "data_truncated_received:%llu\n", stats_file->data_truncated_received);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "fec_sent:%llu\n", stats_file->fec_sent);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "fec_received:%llu\n", stats_file->fec_received);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "ack_sent:%llu\n", stats_file->ack_sent);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "ack_received:%llu\n", stats_file->ack_received);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "nack_received:%llu\n", stats_file->nack_received);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "packet_ignored:%llu\n", stats_file->packet_ignored);
write(fd, buffer, strlen(buffer));
#ifdef __RECEIVER_UTILS_
sprintf((char *) &buffer, "packet_duplicated:%llu\n", stats_file->packet_duplicated);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "packet_recovered:%llu\n", stats_file->packet_recovered);
write(fd, buffer, strlen(buffer));
#endif
#ifdef __SENDER_UTILS_H_
sprintf((char *) &buffer, "min_rtt:%llu\n", stats_file->min_rtt);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "max_rtt:%llu\n", stats_file->max_rtt);
write(fd, buffer, strlen(buffer));
sprintf((char *) &buffer, "packet_retransmitted:%llu\n", stats_file->packet_retransmitted);
write(fd, buffer, strlen(buffer));
#endif
close(fd);
DEBUG("Wrote the transfer statistics to %s.", pathname);
return;
}
\ No newline at end of file
......@@ -18,11 +18,35 @@
#include <sys/socket.h>
#include <unistd.h>
typedef struct __attribute__((__packed__))
{
unsigned long long int data_sent;
unsigned long long int data_received;
unsigned long long int data_truncated_received;
unsigned long long int fec_sent;
unsigned long long int fec_received;
unsigned long long int ack_sent;
unsigned long long int ack_received;
unsigned long long int nack_sent;
unsigned long long int nack_received;
unsigned long long int packet_ignored;
#ifdef __RECEIVER_UTILS_
unsigned long long int packet_duplicated;
unsigned long long int packet_recovered;
#endif
#ifdef __SENDER_UTILS_H_
unsigned long long int min_rtt;
unsigned long long int max_rtt;
unsigned long long int packet_retransmitted;
#endif
}transfer_stats_t;
/* Resolve the resource name to an usable IPv6 address
* @address: The name to resolve
* @rval: Where the resulting IPv6 address descriptor should be stored
* @return: NULL if it succeeded, or a pointer towards
/**
* @brief Resolve the resource name to an usable IPv6 address
* @param address: The name to resolve
* @param rval: Where the resulting IPv6 address descriptor should be stored
* @param return: NULL if it succeeded, or a pointer towards
* a string describing the error if any.
* (const char* means the caller cannot modify or free the return value,
* so do not use malloc!)
......@@ -30,25 +54,38 @@
const char * real_address(const char *address, struct sockaddr_in6 *rval);
/* Creates a socket and initialize it
* @source_addr: if !NULL, the source address that should be bound to this socket
* @src_port: if >0, the port on which the socket is listening
* @dest_addr: if !NULL, the destination address to which the socket should send data
* @dst_port: if >0, the destination port to which the socket should be connected
* @return: a file descriptor number representing the socket,
/**
* @brief: Creates a socket and initialize it
* @param source_addr: if !NULL, the source address that should be bound to this socket.
* @param src_port: if >0, the port on which the socket is listening.
* @param dest_addr: if !NULL, the destination address to which the socket should send data.
* @param dst_port: if >0, the destination port to which the socket should be connected.
*
* @return: a file descriptor number representing the socket
* or -1 in case of error (explanation will be printed on stderr)
*/
int create_socket(struct sockaddr_in6 *source_addr, int src_port, struct sockaddr_in6 *dest_addr, int dst_port);
/* Block the caller until a message is received on sfd,
/**
* @brief Block the caller until a message is received on sfd,
* and connect the socket to the source addresse of the received message
* @sfd: a file descriptor to a bound socket but not yet connected
* @return: 0 in case of success, -1 otherwise
* @POST: This call is idempotent, it does not 'consume' the data of the message,
* @param sfd: a file descriptor to a bound socket but not yet connected
* @param return: 0 in case of success, -1 otherwise
*
* @warning: This call is idempotent, it does not 'consume' the data of the message,
* and could be repeated several times blocking only at the first call.
*/
int wait_for_client(int sfd);
/**
* @brief Writes the content of the stats_file in the pathname file.
*
* @param pathname: The pathname to the stats file.
* @param stats_file: Pointer to the stats file output.
*
* @returns In case of an error it is printed on the stderr.
*/
void write_stats_to_file(const char * pathname, transfer_stats_t * stats_file);
#endif
\ No newline at end of file
......@@ -60,9 +60,8 @@ int main(int argc, char **argv) {
return (EXIT_FAILURE);
}
ASSERT(stats_filename == NULL);
receiver_read_write_loop(socket, (const char *) stats_filename);
receiver_read_write_loop(socket);
close(socket);
free(addr);
return EXIT_SUCCESS;
......
#include "receiver_utils.h"
/**
* @brief The function is going to check if we need to send an ACK or NACK on the @pfd,
* and will send at most one packet [either an ACK or NACK or nothing].
*
* @param pfd: The pollfd struct that we used to get the file descriptor to send the messsage on.
* @param state: The current stae of the receiver.
*
* @return 0 upon success, else -1 with a message printed on the stderr.
*
* @modifies: state
*/
int send_if_inneed(struct pollfd * pfd, receiver_state_t * state)
{
// Priority to NACKs
......@@ -24,6 +35,7 @@ int send_if_inneed(struct pollfd * pfd, receiver_state_t * state)
}
DEBUG("Sent NACK that [%d] has been truncated", pkt_get_seqnum(state->nack_to_send[nack_idx]));
pkt_del(state->nack_to_send[nack_idx]);
state->stats->nack_sent++;
state->nack_to_send[nack_idx] = NULL;
} else if (state->ack_to_send != NULL) /* There's an ack to send */
{
......@@ -42,11 +54,22 @@ int send_if_inneed(struct pollfd * pfd, receiver_state_t * state)
}
DEBUG("Sent ACK saying we are waiting for %d, timestamp %d", pkt_get_seqnum(state->ack_to_send), pkt_get_timestamp(state->ack_to_send));
pkt_del(state->ack_to_send);
state->stats->ack_sent++;
state->ack_to_send = NULL;
}
return 0;
}
/**
* @brief Write the payload of the packet in the state->recvd_data_buf[seqnum_to_consume] to the stdout.
*
* @param state: The current stae of the receiver.
* @param seqnum_to_consume: The seqnum of the packet to consume.
*
* @return 0 upon success, else -1 with a message printed on the stderr.
*
* @modifies: state.
*/
int consume_data_pkt(receiver_state_t * state, uint8_t seqnum_to_consume)
{
ASSERT(state->recvd_data_buf[seqnum_to_consume] != NULL);
......@@ -63,6 +86,14 @@ int consume_data_pkt(receiver_state_t * state, uint8_t seqnum_to_consume)
return 0;
}
/**
* @brief This function checks if 4 consecutive packets, starting from the positions_to_start, have been received.
*
* @param state: The current stae of the receiver.
* @param position_to_start: The position from which to start the inspection.
*
* @returns 1 Upon true else 0.
*/
uint16_t next_four_packets_received(receiver_state_t * state, uint16_t position_to_start)
{
/*
......@@ -105,10 +136,20 @@ uint16_t can_consume(receiver_state_t * state, const pkt_t * latest_packet_recei
for (; next_four_packets_received(state, start_to_consume % TWO_EXP_EIGHT); start_to_consume+=4)
to_consume+=4;
}
return to_consume;
}
/**
* @brief Updates the state, upon new data entrance. This function is the core function that handles the writing of the
* date to the stdout. Updates the receiver window, last_received_in_order; etc.
*
* @param state: The current receiver state.
* @param pkt: The data packet that just been received.
*
* @returns: 0 upon success else -1 with an error message on the stderr.
*
* @modifies: state
*/
int update_buffer_upon_new_data(receiver_state_t * state, const pkt_t * pkt)
{
// Find free space
......@@ -123,6 +164,8 @@ int update_buffer_upon_new_data(receiver_state_t * state, const pkt_t * pkt)
memcpy((void *) pkt_to_store, (void *) pkt, sizeof(pkt_t));
state->recvd_data_buf[seqnum] = pkt_to_store;
state->curr_recv_window = (state->curr_recv_window > 0) ? state->curr_recv_window-1 : 0;
} else {
state->stats->packet_duplicated++;
}
// Update last received in order
......@@ -157,7 +200,8 @@ int update_buffer_upon_new_data(receiver_state_t * state, const pkt_t * pkt)
* there will be an ACK to send.
*
* @param state: The connection state structure.
* @param pkt: The packet for which we should prepare ack. Can be nullable in that case we just send the ack latest received
* @param pkt: The packet for which we should prepare ack. Can be nullable in that case we just send the ack latest received.
*
* @returns 0 upon success else -1
*
* @modifies: @state.
......@@ -200,14 +244,16 @@ int prepare_ack_to_send(receiver_state_t * state)
/**
* @brief This function handles PTYPE_DATA arriving packets and updates the state.
*
* @param state: The receiver state
* @param pkt: The DATA packet
* @param state: The receiver state.
* @param pkt: The DATA packet.
*
* @returns 0 upon success else -1.
*
* @modifies: state.
*/
int handle_data_pkt(receiver_state_t * state, const pkt_t * pkt)
{
state->stats->data_received++;
DEBUG("Received data packet seqnum %d with timestamp %d | current_window_size : %d, current_window_start : %d",
pkt_get_seqnum(pkt), pkt_get_timestamp(pkt), state->curr_recv_window, state->recv_window_start);
......@@ -233,6 +279,17 @@ int handle_data_pkt(receiver_state_t * state, const pkt_t * pkt)
return 0;
}
/**
* @brief: This function uses the fec packet to recover the missing packet with missing_seqnum as seqnum.
*
* @param state: The receiver state.
* @param pkt: The FEC packet we just received.
* @param missing_seqnum: The packet we are going to recover from the fec packet.
*
* @returns 0 upon success else -1 with an error message on stderr.
*
* @modifies: state.
*/
int use_fec(receiver_state_t * state, const pkt_t * fec, uint16_t missing_seqnum)
{
pkt_t * new_packet = pkt_new();
......@@ -280,9 +337,20 @@ int use_fec(receiver_state_t * state, const pkt_t * fec, uint16_t missing_seqnum
if (handle_data_pkt(state, new_packet) != 0) return -1;
pkt_del(new_packet);
state->stats->packet_recovered++;
return 0;
}
/**
* @brief: This function checks whether fec with seqnum as fec_seqnum, can be used to recover
* a missing packet.
*
* @param state: The receiver state.
* @param fec_seqnum: The seqnum of the fec packet that we just received.
* @param missing_seqnum: The pointer to the variable to which put the missind_seqnum if the the fec can be used.
*
* @returns: 1 upon True else 0. if True missing_seqnum set to the packet that can be recoverd by the fec.
*/
int can_fec_be_used(receiver_state_t * state, uint16_t fec_seqnum, uint16_t * missing_seqnum)
{
uint16_t idx = fec_seqnum;
......@@ -302,13 +370,21 @@ int can_fec_be_used(receiver_state_t * state, uint16_t fec_seqnum, uint16_t * mi
return (in_range_available == FEC_CALCULATED_ON-1 && in_range_missing == 1);
}
int potential_usage_of_fec(receiver_state_t * state, const pkt_t * pkt)
/**
* @brief: This function checks the potential usage of the fec, if the fec can be used then it uses the fec else it doesn't.
*
* @param state: The receiver state.
* @param pkt: The fec packet.
*
* @returns: 0 upon success else -1
*/
int potential_usage_of_fec(receiver_state_t * state, const pkt_t * fec)
{
uint16_t fec_seqnum = pkt_get_seqnum(pkt);
uint16_t fec_seqnum = pkt_get_seqnum(fec);
uint16_t missing_data_seqnum = 0;
if ( can_fec_be_used(state, fec_seqnum, &missing_data_seqnum) )
{
if ( use_fec(state, pkt, missing_data_seqnum) != 0 ) return -1;
if ( use_fec(state, fec, missing_data_seqnum) != 0 ) return -1;
} else
{
DEBUG("Received FEC with seqnum [%d] but wasn't used", fec_seqnum);
......@@ -323,18 +399,20 @@ int potential_usage_of_fec(receiver_state_t * state, const pkt_t * pkt)
* @param pkt: The DATA packet.
* @returns 0 upon success else -1.
*
* @requires: - The packet is in the current receiving window
* @requires: The packet is in the current receiving window
*
* @modifies: state.
*/
int handle_fec_pkt(receiver_state_t * state, const pkt_t * pkt)
{
state->stats->fec_received++;
ASSERT(state != NULL && pkt != NULL);
uint8_t seqnum = pkt_get_seqnum(pkt);
if (seqnum + 3 < state->last_received_in_order )
{
DEBUG("Received FEC with seqnum [%d] but wasn't used since last received in order : %d",
pkt_get_seqnum(pkt), state->last_received_in_order);
state->stats->packet_ignored++;
return 0;
}
return potential_usage_of_fec(state, pkt);
......@@ -356,6 +434,7 @@ int handle_fec_pkt(receiver_state_t * state, const pkt_t * pkt)
*/
int handle_truncated_pkt(receiver_state_t * state, const pkt_t * pkt)
{
state->stats->data_truncated_received++;
DEBUG("Received a truncated packet with seqnum %d", pkt_get_seqnum(pkt));
if (pkt_get_type(pkt) != PTYPE_DATA) return 0;
......@@ -382,7 +461,7 @@ int handle_truncated_pkt(receiver_state_t * state, const pkt_t * pkt)
* and modifies @state.
*
* @requires:
* - @pkt was validly decoded by pkt_decoded().
* - @pkt was validly decoded by pkt_decoded().l
*
* @param pkt: The packet received.
* @param state: The current connection state.
......@@ -453,6 +532,7 @@ int handle_incoming(struct pollfd * pfd, receiver_state_t * state)
DEBUG("Received a damaged packet with %d status. and seqnum as %d", pkt_status, pkt_get_seqnum(pkt));
// See if there's a FEC that can be used and update buffer
pkt_del(pkt);
state->stats->packet_ignored++;
return 0;
}
pkt_del(pkt);
......@@ -474,7 +554,7 @@ void reception_loop(struct pollfd * pfd, receiver_state_t * state)
{
int ready = poll(pfd, 1, -1);
if (ready == -1 || pfd->revents & POLLERR) /* In case of error on socket */
if (ready == -1 || pfd->revents & POLLERR) /* In case of error on socket */
{
DEBUG("Error on socket");
return;
......@@ -484,7 +564,7 @@ void reception_loop(struct pollfd * pfd, receiver_state_t * state)
{
if (handle_incoming(pfd, state) != 0)
return;
if (state->last_data_packet < TWO_EXP_EIGHT) /* This means we received last data packet transfer */
if (state->last_data_packet < TWO_EXP_EIGHT) /* This means we received last data packet transfer */
gettimeofday(&last_packet_received, NULL);
} else if (pfd->revents & POLLOUT)
......@@ -493,7 +573,7 @@ void reception_loop(struct pollfd * pfd, receiver_state_t * state)
return;
}
if (state->last_data_packet < TWO_EXP_EIGHT)
if (state->last_data_packet < TWO_EXP_EIGHT) /* This is a signal we use between us in order to mark the last datapacket containing data */
{
gettimeofday(&current_time, NULL);
if (current_time.tv_sec - last_packet_received.tv_sec > RECEIVER_INACTIVE_TIMEOUT)
......@@ -515,6 +595,13 @@ receiver_state_t * state_new()
receiver_state_t * to_return = (receiver_state_t *) calloc(1, sizeof(receiver_state_t));
if (to_return == NULL) return NULL;
to_return->stats = calloc(sizeof(transfer_stats_t), 1);
if (to_return->stats == NULL)
{
free(to_return);
return NULL;
}
for (size_t i = 0; i < TWO_EXP_EIGHT; i++)
to_return->recvd_data_buf[i] = NULL;
......@@ -528,13 +615,14 @@ receiver_state_t * state_new()
to_return->transfer_done = 0;
to_return->next_to_consume = 0;
to_return->last_packet = 256; // No valid seqnum will reach this value, max value 255
to_return->last_data_packet = 256;
to_return->last_data_packet = 256;
return to_return;
}
/**
* @brief This function frees the memory allocated by the state_new() function.
* @param state: The structure to free.
*
*/
void state_del(receiver_state_t * state)
{
......@@ -546,12 +634,7 @@ void state_del(receiver_state_t * state)
free(state);
}
/**
* @brief This main loop for the receiver.
* @param sfd: A valid socket.
* @returns As soon as an error occurs or, a the total transfer came through.
*/
void receiver_read_write_loop(int sfd)
void receiver_read_write_loop(int sfd, const char * pathname)
{
receiver_state_t * state = state_new();
struct pollfd * pfd = (struct pollfd *) calloc(1, sizeof(struct pollfd));
......@@ -565,7 +648,9 @@ void receiver_read_write_loop(int sfd)
pfd->events = POLLIN | POLLOUT;
if (wait_for_client(sfd) == 0)
reception_loop(pfd, state);
DEBUG("Done with done status equal to %d", state->transfer_done);
DEBUG("Done the transfer with done status being %s", (state->transfer_done) ? "true" : "false");
write_stats_to_file(pathname, state->stats);
state_del(state);
free(pfd);
}
\ No newline at end of file
......@@ -48,15 +48,18 @@ typedef struct __attribute__((__packed__))
uint16_t last_packet;
uint16_t last_data_packet;
uint16_t next_to_consume;
transfer_stats_t * stats;
} receiver_state_t;
/**
* Loop reading on socket and printing to the stdout
* @brief Loop reading on socket and printing to the stdout
* @param sfd : The socket file descriptor. It is both bound and connected.
* @return: as soon as the whole transfer is done.
* @param pathname: Pathname to where write the stats.
*
* @returns As soon as an error occurs or, the total transfer came through.
*/
void receiver_read_write_loop(int sfd);
void receiver_read_write_loop(int sfd, const char * pathname);
/**
* Creates and initialize a receiver_state_t structure
......
......@@ -21,7 +21,10 @@ mkdir "${TEST_OUTPUT_FILES}/" 2>/dev/null
touch "${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_received_file.${BSN_EXT}" \
"${TEST_OUTPUT_FILES}/adv_valgrind_${BSN_PRE}_receiver.log" \
"${TEST_OUTPUT_FILES}/adv_valgrind_${BSN_PRE}_sender.log"
"${TEST_OUTPUT_FILES}/adv_valgrind_${BSN_PRE}_sender.log" \
"${TEST_OUTPUT_FILES}/${BSN_PRE}_receiver_stats.csv" \
"${TEST_OUTPUT_FILES}/${BSN_PRE}_sender_stats.csv"
# The next 2 lines come from: https://unix.stackexchange.com/questions/55913/whats-the-easiest-way-to-find-an-unused-local-port
......@@ -35,7 +38,7 @@ port2=$(comm -23 <(seq 65000 65200 | sort) <(ss -Htan | awk '{print $4}' | cut -
# We launch the receiver and capture its output
valgrind --leak-check=full --log-file=${TEST_OUTPUT_FILES}/adv_valgrind_${BSN_PRE}_receiver.log \
./receiver ::1 $port1 1> ${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_received_file.${BSN_EXT} \
./receiver ::1 $port1 -s ${TEST_OUTPUT_FILES}/${BSN_PRE}_receiver_stats.csv 1> ${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_received_file.${BSN_EXT} \
2> ${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_receiver.log & receiver_pid=$!
cleanup()
......@@ -48,7 +51,7 @@ trap cleanup SIGINT # Kill the background procces in case of ^-C
# We start the transfer
if ! valgrind --leak-check=full --log-file=${TEST_OUTPUT_FILES}/adv_valgrind_${BSN_PRE}_sender.log \
./sender -f ${FILENAME} ::1 $port2 2> ${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_sender.log ; then
./sender -f ${FILENAME} ::1 $port2 -s ${TEST_OUTPUT_FILES}/${BSN_PRE}_sender_stats.csv 2> ${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_sender.log ; then
echo "Crash du sender!"
cat ${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_sender.log
err=1 # We record the error
......
......@@ -32,7 +32,7 @@ port=$(comm -23 <(seq 65000 65200 | sort) <(ss -Htan | awk '{print $4}' | cut -d
# We launch the receiver and capture its output
valgrind --leak-check=full --log-file=${TEST_OUTPUT_FILES}/valgrind_${BSNM_PRE}_receiver.log \
./receiver ::1 $port 1> ${TEST_OUTPUT_FILES}/${BSNM_PRE}_received_file.${BSNM_EXT} \
./receiver ::1 $port -s ${TEST_OUTPUT_FILES}/${BSNM_PRE}_receiver_stats.csv 1> ${TEST_OUTPUT_FILES}/${BSNM_PRE}_received_file.${BSNM_EXT} \
2> ${TEST_OUTPUT_FILES}/${BSNM_PRE}_receiver.log & receiver_pid=$!
cleanup()
......@@ -45,7 +45,7 @@ trap cleanup SIGINT # Kill the background procces in case of ^-C
# We start the transfer
if ! valgrind --leak-check=full --log-file=${TEST_OUTPUT_FILES}/valgrind_${BSNM_PRE}_sender.log \
./sender -f ${FILENAME} ::1 $port 2> ${TEST_OUTPUT_FILES}/${BSNM_PRE}_sender.log ; then
./sender -f ${FILENAME} ::1 $port -s ${TEST_OUTPUT_FILES}/${BSNM_PRE}_sender_stats.csv 2> ${TEST_OUTPUT_FILES}/${BSNM_PRE}_sender.log ; then
echo "The sender crashed!"
cat ${TEST_OUTPUT_FILES}/${BSNM_PRE}_sender.log
err=1 # We record the error
......
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter