Skip to content
Extraits de code Groupes Projets
Valider 972a7a7c rédigé par Samuel de Meester de Ravestein's avatar Samuel de Meester de Ravestein
Parcourir les fichiers

fix and cleaner sender

parent 7b98416e
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
...@@ -109,6 +109,11 @@ int wait_for_client(int sfd) ...@@ -109,6 +109,11 @@ int wait_for_client(int sfd)
return 0; return 0;
} }
unsigned long long int time_milliseconds(struct timeval *time)
{
return ((unsigned long long int) time->tv_sec * 1000) + ((unsigned long long int) time->tv_usec / 1000);
}
void write_stats_to_file(const char * pathname, transfer_stats_t * stats_file, agents_t caller) void write_stats_to_file(const char * pathname, transfer_stats_t * stats_file, agents_t caller)
{ {
if (pathname == NULL || stats_file == NULL) if (pathname == NULL || stats_file == NULL)
...@@ -161,10 +166,10 @@ void write_stats_to_file(const char * pathname, transfer_stats_t * stats_file, a ...@@ -161,10 +166,10 @@ void write_stats_to_file(const char * pathname, transfer_stats_t * stats_file, a
if (caller == SENDER) if (caller == SENDER)
{ {
ret = sprintf((char *) &buffer, "min_rtt:%llu ms\n", stats_file->min_rtt); ret = sprintf((char *) &buffer, "min_rtt:%llu\n", stats_file->min_rtt);
ret = write(fd, buffer, strlen(buffer)); ret = write(fd, buffer, strlen(buffer));
ret = sprintf((char *) &buffer, "max_rtt:%llu ms\n", stats_file->max_rtt); ret = sprintf((char *) &buffer, "max_rtt:%llu\n", stats_file->max_rtt);
ret = write(fd, buffer, strlen(buffer)); ret = write(fd, buffer, strlen(buffer));
ret = sprintf((char *) &buffer, "packet_retransmitted:%llu\n", stats_file->packet_retransmitted); ret = sprintf((char *) &buffer, "packet_retransmitted:%llu\n", stats_file->packet_retransmitted);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/time.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <unistd.h> #include <unistd.h>
...@@ -41,7 +42,7 @@ typedef struct __attribute__((__packed__)) ...@@ -41,7 +42,7 @@ typedef struct __attribute__((__packed__))
typedef enum{ typedef enum{
SENDER, SENDER,
RECEIVER, RECEIVER,
}agents_t; } agents_t;
/** /**
...@@ -80,6 +81,14 @@ int create_socket(struct sockaddr_in6 *source_addr, int src_port, struct sockadd ...@@ -80,6 +81,14 @@ int create_socket(struct sockaddr_in6 *source_addr, int src_port, struct sockadd
*/ */
int wait_for_client(int sfd); int wait_for_client(int sfd);
/**
* @brief Return the in milliseconds
*
* @param time : Structure representing the in seconds an mico seconds
* @return long long int : return the time in milliseconds
*/
unsigned long long int time_milliseconds(struct timeval *time);
/** /**
* @brief Writes the content of the stats_file in the pathname file. * @brief Writes the content of the stats_file in the pathname file.
* *
......
...@@ -111,15 +111,14 @@ int main(int argc, char **argv) { ...@@ -111,15 +111,14 @@ int main(int argc, char **argv) {
struct timeval closing_pkt_sent_time; struct timeval closing_pkt_sent_time;
struct timeval curr_time; struct timeval curr_time;
while ((state->fec_enabled && state->last_pkt_sent != LAST_FEC) || (state->s_window_size != MAX_WINDOW_SIZE) || while ((state->last_pkt_sent != CLOSING_PKT) || (state->s_window_size != MAX_WINDOW_SIZE))
(!state->fec_enabled && state->last_pkt_sent != CLOSING_PKT))
{ {
// Blocking system call // Blocking system call
int rvalue = poll(pfd, 1, -1); // -1 means that there are no setted time out int rvalue = poll(pfd, 1, -1); // -1 means that there are no setted time out
if (rvalue == -1) if (rvalue == -1)
{ {
free(pfd); free(pfd);
state_del(state, stats_filename); state_del(state);
close(socket_fd); close(socket_fd);
close(sending_fd); close(sending_fd);
ERROR("poll function failed"); ERROR("poll function failed");
...@@ -128,20 +127,20 @@ int main(int argc, char **argv) { ...@@ -128,20 +127,20 @@ int main(int argc, char **argv) {
// Setting a timer only when waiting for the very last ACK // Setting a timer only when waiting for the very last ACK
gettimeofday(&curr_time, NULL); gettimeofday(&curr_time, NULL);
if (state->last_pkt_sent == LAST_FEC && ((curr_time.tv_sec - closing_pkt_sent_time.tv_sec) > SENDER_INACTIVE_TIMEOUT)) if (state->last_pkt_sent == CLOSING_PKT && ((time_milliseconds(&curr_time) - time_milliseconds(&closing_pkt_sent_time)) > SENDER_INACTIVE_TIMEOUT))
{ {
DEBUG("The sender hasn't received any news from the receiver for too long so it TIMEOUT"); DEBUG("The sender hasn't received any news from the receiver for too long so it TIMEOUT.");
break; break;
} }
if ((pfd->revents & POLLIN) && (pfd->revents & POLLOUT)) if ((pfd->revents & POLLIN) && (pfd->revents & POLLOUT))
{ {
DEBUG("The sender is reading from the socket"); DEBUG("The sender is reading from the socket.");
rvalue = handle_returning_ack_nack(state, socket_fd); rvalue = handle_returning_ack_nack(state, socket_fd);
if (rvalue == -1) if (rvalue == -1)
{ {
free(pfd); free(pfd);
state_del(state, stats_filename); state_del(state);
close(socket_fd); close(socket_fd);
close(sending_fd); close(sending_fd);
ERROR("handle_returning_pkt function failed"); ERROR("handle_returning_pkt function failed");
...@@ -155,26 +154,26 @@ int main(int argc, char **argv) { ...@@ -155,26 +154,26 @@ int main(int argc, char **argv) {
rvalue = read_and_send(state, sending_fd, socket_fd); rvalue = read_and_send(state, sending_fd, socket_fd);
if (rvalue == -1) if (rvalue == -1)
{ {
if (state->last_pkt_sent == LAST_FEC) if (state->fec_enabled && state->last_pkt_sent == CLOSING_PKT)
{ {
DEBUG("The very last PTYPE_FEC could not be send because the receiver probably disconnected"); DEBUG("The very last PTYPE_FEC could not be send because the receiver probably disconnected which is not a problem !");
break; break;
} }
else else
{ {
free(pfd); free(pfd);
state_del(state, stats_filename); state_del(state);
close(socket_fd); close(socket_fd);
close(sending_fd); close(sending_fd);
ERROR("read_and_send function failed"); ERROR("read_and_send function failed");
return EXIT_FAILURE; return EXIT_FAILURE;
} }
} }
// Let's start the timer for the last pkt sent // Let's start the timer after the closing pkt (PTYPE_DATA with length = 0)
if (state->last_pkt_sent == LAST_FEC) if (state->last_pkt_sent == CLOSING_PKT)
{ {
gettimeofday(&closing_pkt_sent_time, NULL); gettimeofday(&closing_pkt_sent_time, NULL);
DEBUG("A timer of -> %ds <- has started after sending the last FEC pkt !", SENDER_INACTIVE_TIMEOUT); DEBUG("A timer of -> %dms <- has started after sending the last FEC pkt !", SENDER_INACTIVE_TIMEOUT);
} }
} }
else if (pfd->revents & POLLOUT) else if (pfd->revents & POLLOUT)
...@@ -184,15 +183,15 @@ int main(int argc, char **argv) { ...@@ -184,15 +183,15 @@ int main(int argc, char **argv) {
{ {
// If an error occured when trying to send back the CLOSING_PKT (so when the last FEC has been sent), // If an error occured when trying to send back the CLOSING_PKT (so when the last FEC has been sent),
// we guess that the receiver has simply disconnected and the ACK of the CLOSING_PKT was lost. // we guess that the receiver has simply disconnected and the ACK of the CLOSING_PKT was lost.
if (state->last_pkt_sent == LAST_FEC) if (state->last_pkt_sent == CLOSING_PKT)
{ {
DEBUG("The sender can't send anything to the receiver anymore (which has probably disconnected) so it'll disconnect !"); DEBUG("The sender can't send anything to the receiver anymore (which has probably disconnected) so the sender will also disconnect !");
break; break;
} }
else else
{ {
free(pfd); free(pfd);
state_del(state, stats_filename); state_del(state);
close(socket_fd); close(socket_fd);
close(sending_fd); close(sending_fd);
ERROR("checking_timer function failed"); ERROR("checking_timer function failed");
...@@ -202,8 +201,14 @@ int main(int argc, char **argv) { ...@@ -202,8 +201,14 @@ int main(int argc, char **argv) {
} }
} }
DEBUG("Sender disconnected"); DEBUG("Sender disconnected");
if (stats_filename != NULL)
{
write_stats_to_file(stats_filename, state->stats, SENDER);
}
free(pfd); free(pfd);
state_del(state, stats_filename); state_del(state);
close(sending_fd); close(sending_fd);
close(socket_fd); close(socket_fd);
return EXIT_SUCCESS; return EXIT_SUCCESS;
......
#include "sender_utils.h" #include "sender_utils.h"
long long int time_milliseconds(struct timeval *time)
{
return ((long long int) time->tv_sec * 1000) + ((long long int) time->tv_usec / 1000);
}
sender_state_t *state_new(bool fec_enabled) sender_state_t *state_new(bool fec_enabled)
{ {
sender_state_t *state = (sender_state_t *) calloc(1, sizeof(sender_state_t)); sender_state_t *state = (sender_state_t *) calloc(1, sizeof(sender_state_t));
...@@ -37,10 +32,8 @@ sender_state_t *state_new(bool fec_enabled) ...@@ -37,10 +32,8 @@ sender_state_t *state_new(bool fec_enabled)
return state; return state;
} }
void state_del(sender_state_t *state, const char * pathname) void state_del(sender_state_t *state)
{ {
write_stats_to_file(pathname, state->stats, SENDER);
// To be sure, we free the pkt that might not have been freed (in case of an error or timeout) // To be sure, we free the pkt that might not have been freed (in case of an error or timeout)
for (uint8_t i = 0; i < WINDOW_SIZE; i++) for (uint8_t i = 0; i < WINDOW_SIZE; i++)
{ {
...@@ -69,7 +62,8 @@ bool can_send(sender_state_t *state) ...@@ -69,7 +62,8 @@ bool can_send(sender_state_t *state)
// it was the end of the file (so that I can set a timer for timeout) // it was the end of the file (so that I can set a timer for timeout)
return state->s_window_size == MAX_WINDOW_SIZE; return state->s_window_size == MAX_WINDOW_SIZE;
} }
else if (state->fec_enabled && (state->last_pkt_sent == CLOSING_PKT)) // Case: we're in FEC mode, the closing pkt has been sent but we will still send a last FEC if possible
else if ((state->fec_enabled) && (state->last_pkt_sent == CLOSING_PKT) && (state->FEC_nbr > 0))
{ {
return true; return true;
} }
...@@ -140,6 +134,7 @@ int handle_returning_ack_nack(sender_state_t *state, int socket_fd) ...@@ -140,6 +134,7 @@ int handle_returning_ack_nack(sender_state_t *state, int socket_fd)
} }
else else
{ {
state->stats->packet_retransmitted++;
DEBUG("The NACK with the seqnum: %d has been received, so the pkt will be sent back", seqnum); DEBUG("The NACK with the seqnum: %d has been received, so the pkt will be sent back", seqnum);
state->r_window_size = r_window - 1; // -1 Because the receiver doesn't count yet the sended pkt state->r_window_size = r_window - 1; // -1 Because the receiver doesn't count yet the sended pkt
pkt_t *n_pkt = state->buffer[place]; pkt_t *n_pkt = state->buffer[place];
...@@ -168,6 +163,7 @@ int handle_returning_ack_nack(sender_state_t *state, int socket_fd) ...@@ -168,6 +163,7 @@ int handle_returning_ack_nack(sender_state_t *state, int socket_fd)
} }
else else
{ {
state->stats->packet_retransmitted++;
DEBUG("The receiver is asking AGAIN, the seqnum: %d so the sender sends it back", seqnum_nack); DEBUG("The receiver is asking AGAIN, the seqnum: %d so the sender sends it back", seqnum_nack);
state->r_window_size = r_window - 1; // -1 Because the receiver doesn't count yet the sended pkt state->r_window_size = r_window - 1; // -1 Because the receiver doesn't count yet the sended pkt
pkt_t *n_pkt = state->buffer[place_last_nack]; pkt_t *n_pkt = state->buffer[place_last_nack];
...@@ -188,7 +184,7 @@ int handle_returning_ack_nack(sender_state_t *state, int socket_fd) ...@@ -188,7 +184,7 @@ int handle_returning_ack_nack(sender_state_t *state, int socket_fd)
{ {
struct timeval time; struct timeval time;
gettimeofday(&time, NULL); gettimeofday(&time, NULL);
unsigned long long int delta_time = time_milliseconds(&time) - time_milliseconds(&state->timers[state->tail]); unsigned long long int delta_time = time_milliseconds(&time) - time_milliseconds(&state->timers_first_send[state->tail]);
if (state->stats->min_rtt == 0 || state->stats->min_rtt > delta_time) if (state->stats->min_rtt == 0 || state->stats->min_rtt > delta_time)
{ {
state->stats->min_rtt = delta_time; state->stats->min_rtt = delta_time;
...@@ -212,6 +208,7 @@ int handle_returning_ack_nack(sender_state_t *state, int socket_fd) ...@@ -212,6 +208,7 @@ int handle_returning_ack_nack(sender_state_t *state, int socket_fd)
// Send back the asked ACK if there is one to send back // Send back the asked ACK if there is one to send back
if (place_last_nack != OUT_OFF_WINDOW) if (place_last_nack != OUT_OFF_WINDOW)
{ {
state->stats->packet_retransmitted++;
pkt_t *n_pkt = state->buffer[place_last_nack]; pkt_t *n_pkt = state->buffer[place_last_nack];
if (send_pkt(state, n_pkt, place_last_nack, socket_fd) == -1) return -1; if (send_pkt(state, n_pkt, place_last_nack, socket_fd) == -1) return -1;
} }
...@@ -230,6 +227,7 @@ int checking_timer(sender_state_t *state, int socket_fd) ...@@ -230,6 +227,7 @@ int checking_timer(sender_state_t *state, int socket_fd)
// When the timer is over, we send the packet back // When the timer is over, we send the packet back
if ((time_milliseconds(&time) - time_milliseconds(&state->timers[state->tail])) >= TIMER_LIMIT) if ((time_milliseconds(&time) - time_milliseconds(&state->timers[state->tail])) >= TIMER_LIMIT)
{ {
state->stats->packet_retransmitted++;
pkt_t *pkt = state->buffer[state->tail]; pkt_t *pkt = state->buffer[state->tail];
DEBUG("The pkt with seqnum: %d has timeout", pkt_get_seqnum(pkt)); DEBUG("The pkt with seqnum: %d has timeout", pkt_get_seqnum(pkt));
if (send_pkt(state, pkt, state->tail, socket_fd) == -1) return -1; if (send_pkt(state, pkt, state->tail, socket_fd) == -1) return -1;
...@@ -274,7 +272,6 @@ int send_FEC(sender_state_t *state, int socket_fd) ...@@ -274,7 +272,6 @@ int send_FEC(sender_state_t *state, int socket_fd)
{ {
if (state->last_pkt_sent == CLOSING_PKT) if (state->last_pkt_sent == CLOSING_PKT)
{ {
state->last_pkt_sent = LAST_FEC;
DEBUG("Sending LAST FEC pkt with seqnum: %d", pkt_get_seqnum(state->FEC)); DEBUG("Sending LAST FEC pkt with seqnum: %d", pkt_get_seqnum(state->FEC));
} }
else else
...@@ -306,7 +303,7 @@ int send_FEC(sender_state_t *state, int socket_fd) ...@@ -306,7 +303,7 @@ int send_FEC(sender_state_t *state, int socket_fd)
int read_and_send(sender_state_t *state, int sending_fd, int socket_fd) int read_and_send(sender_state_t *state, int sending_fd, int socket_fd)
{ {
// Checking whether I need to send a PTYPE_FEC or PTYPE_DATA // Checking whether I need to send a PTYPE_FEC or PTYPE_DATA
if ((state->fec_enabled) && (state->FEC_nbr == 4 || state->last_pkt_sent == CLOSING_PKT)) if ((state->fec_enabled) && ((state->FEC_nbr == 4) || ((state->last_pkt_sent == CLOSING_PKT) && (state->FEC_nbr > 0))))
{ {
return send_FEC(state, socket_fd); return send_FEC(state, socket_fd);
} }
...@@ -342,7 +339,7 @@ int read_and_send(sender_state_t *state, int sending_fd, int socket_fd) ...@@ -342,7 +339,7 @@ int read_and_send(sender_state_t *state, int sending_fd, int socket_fd)
{ {
pkt_set_timestamp(pkt, SECOND_TO_LAST_PKT); pkt_set_timestamp(pkt, SECOND_TO_LAST_PKT);
state->last_pkt_sent = LAST_DATA_PKT; state->last_pkt_sent = LAST_DATA_PKT;
DEBUG("The LAST DATATYPE is being sent !"); DEBUG("The LAST PTYPE_DATA is being sent !");
} }
else else
{ {
...@@ -364,6 +361,9 @@ int read_and_send(sender_state_t *state, int sending_fd, int socket_fd) ...@@ -364,6 +361,9 @@ int read_and_send(sender_state_t *state, int sending_fd, int socket_fd)
{ {
construct_FEC(state, pkt); construct_FEC(state, pkt);
} }
struct timeval timer;
gettimeofday(&timer, NULL);
state->timers_first_send[state->head] = timer;
state->map_seqnum_to_buffer_place[pkt_get_seqnum(pkt)] = state->head; state->map_seqnum_to_buffer_place[pkt_get_seqnum(pkt)] = state->head;
state->head = (state->head + 1) % WINDOW_SIZE; state->head = (state->head + 1) % WINDOW_SIZE;
......
...@@ -19,16 +19,16 @@ ...@@ -19,16 +19,16 @@
#define SEQNUM_RANGE 256 #define SEQNUM_RANGE 256
#define TIMER_LIMIT 2000 // It's in milli seconds, in this network, the latency to send is = [0, 2000ms] #define TIMER_LIMIT 2000 // It's in milli seconds, in this network, the latency to send is = [0, 2000ms]
#define SENDER_INACTIVE_TIMEOUT 30 // Only waits 30sec for the ACK of the CLOSING_PKT (the very last sended pkt) #define SENDER_INACTIVE_TIMEOUT 30000 // Only waits 30sec (it's in milliseconds) for the ACK of the CLOSING_PKT (the very last sended pkt)
#define WINDOW_SIZE 31 #define WINDOW_SIZE 31
#define OUT_OFF_WINDOW 255 #define OUT_OFF_WINDOW 255
// It is used to identify what kind of pkt was sent lately // It is used to identify what kind of PTYPE_DATA pkt was sent lately
// /!\ It doesn't trace the sended FEC !
typedef enum { typedef enum {
RANDOM_PKT = 0, RANDOM_PKT = 0,
LAST_DATA_PKT = 1, LAST_DATA_PKT = 1,
CLOSING_PKT = 2, CLOSING_PKT = 2
LAST_FEC = 3
} last_sended_pkt_t; } last_sended_pkt_t;
...@@ -38,13 +38,14 @@ typedef enum { ...@@ -38,13 +38,14 @@ typedef enum {
typedef struct state { typedef struct state {
uint8_t r_window_size; // receiver buffer space uint8_t r_window_size; // receiver buffer space
uint8_t s_window_size; // sender (our) buffer space uint8_t s_window_size; // sender (our) buffer space
struct timeval timers[WINDOW_SIZE]; // Time in seconds (corresponds to the timers of the sended pkt) struct timeval timers[WINDOW_SIZE]; // timeval struct corresponding to the last time the pkt was sended
struct timeval timers_first_send[WINDOW_SIZE]; // timeval struct corresponding to the first time the pkt was sended (only used for the stats)
pkt_t *buffer[WINDOW_SIZE]; // When the buffer fields are not used, they MUST be se to NULL pkt_t *buffer[WINDOW_SIZE]; // When the buffer fields are not used, they MUST be se to NULL
uint8_t head; // place last element insert +1 in the buffer (free place) | Head and tail are used to know uint8_t head; // place last element insert +1 in the buffer (free place) | Head and tail are used to know
uint8_t tail; // place oldest element insert in the buffer | the start and end of the sender window (of the sended pkt) uint8_t tail; // place oldest element insert in the buffer | the start and end of the sender window (of the sended pkt)
uint8_t next_seqnum; uint8_t next_seqnum;
uint8_t map_seqnum_to_buffer_place[SEQNUM_RANGE]; // Default value is: OUT_OFF_WINDOW uint8_t map_seqnum_to_buffer_place[SEQNUM_RANGE]; // Default value is: OUT_OFF_WINDOW
last_sended_pkt_t last_pkt_sent; // Can either be: RANDOM_PKT, LAST_DATA_PKT, CLOSING_PKT or LAST_FEC last_sended_pkt_t last_pkt_sent; // Can either be: RANDOM_PKT, LAST_DATA_PKT or CLOSING_PKT
bool fec_enabled; bool fec_enabled;
pkt_t *FEC; // The pkt FEC in construction pkt_t *FEC; // The pkt FEC in construction
uint8_t FEC_nbr; // The number of PTYPE_DATA stacked on FEC uint8_t FEC_nbr; // The number of PTYPE_DATA stacked on FEC
...@@ -64,9 +65,8 @@ sender_state_t *state_new(bool fec_enabled); ...@@ -64,9 +65,8 @@ sender_state_t *state_new(bool fec_enabled);
* @brief Deletion if the structure representing the sender state. * @brief Deletion if the structure representing the sender state.
* *
* @param state : The variable representing the sender (state). * @param state : The variable representing the sender (state).
* @param pathname
*/ */
void state_del(sender_state_t *state, const char * pathname); void state_del(sender_state_t *state);
/** /**
* @brief Checking if the sender is allowed to send a NEW pkt * @brief Checking if the sender is allowed to send a NEW pkt
...@@ -81,7 +81,7 @@ void state_del(sender_state_t *state, const char * pathname); ...@@ -81,7 +81,7 @@ void state_del(sender_state_t *state, const char * pathname);
bool can_send(sender_state_t *state); bool can_send(sender_state_t *state);
/** /**
* @brief Cautious this function is used for sending and sending back pkt ! * @brief Cautious this function is used for sending and sending back PTYPE_DATA pkt !
* *
* @param state : The variable representing the sender (state). * @param state : The variable representing the sender (state).
* @param pkt : The pkt to be sent. * @param pkt : The pkt to be sent.
......
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter