Newer
Older
Vany Ingenzi
a validé
/**
 * @brief Sends at most one pending control packet on the socket: a queued NACK if there is one,
 *        otherwise the pending ACK, otherwise nothing.
 *
 * The packet that was sent is deleted and its slot in @state is cleared.
 *
 * @param pfd: The pollfd struct whose file descriptor the packet is written to.
 * @param state: The current state of the receiver.
 *
 * @return 0 upon success (also when there was nothing to send), else -1 with a message
 *         printed on the stderr.
 *
 * @modifies: state
 */
int send_if_inneed(struct pollfd * pfd, receiver_state_t * state)
{
    char buffer[PKT_MIN_LEN];
    size_t len = PKT_MIN_LEN;

    // Priority to NACKs: find the first occupied slot of the NACK queue.
    int nack_idx = 0;
    while (nack_idx < RECV_MAX_SLCTV_RPT_WDW && state->nack_to_send[nack_idx] == NULL)
        nack_idx++;

    if (nack_idx != RECV_MAX_SLCTV_RPT_WDW) /* There's a nack to send */
    {
        pkt_t * nack = state->nack_to_send[nack_idx];
        if (pkt_set_window(nack, state->curr_recv_window) != 0 ||
            pkt_encode(nack, (char *) buffer, &len) != PKT_OK)
        {
            ERROR("Encoding NACK to send failed");
            return -1;
        }
        // write() returns ssize_t: keep the sign so that -1 is recognised as an error.
        ssize_t written = write(pfd->fd, (void *) buffer, len);
        if (written < 0 || (size_t) written != len)
        {
            ERROR("When witing a NACK to socket, didn't written the correct len");
            return -1;
        }
        DEBUG("Sent NACK that [%d] has been truncated", pkt_get_seqnum(nack));
        pkt_del(nack);
        state->stats->nack_sent++;
        state->nack_to_send[nack_idx] = NULL;
    } else if (state->ack_to_send != NULL) /* There's an ack to send */
    {
        pkt_t * ack = state->ack_to_send;
        // Every header field has to be set BEFORE encoding, otherwise the bytes on the wire
        // do not carry it. The modulo applies to the whole sum, hence the parentheses.
        if (pkt_set_seqnum(ack, (state->last_received_in_order + 1) % TWO_EXP_EIGHT) != 0 ||
            pkt_set_window(ack, state->curr_recv_window) != 0 ||
            pkt_encode(ack, (char *) buffer, &len) != PKT_OK)
        {
            ERROR("Encoding ACK to send failed");
            return -1;
        }
        ssize_t written = write(pfd->fd, (void *) buffer, len);
        if (written < 0 || (size_t) written != len)
        {
            ERROR("When writing ACK to socket, didn't write the correct len");
            return -1;
        }
        DEBUG("Sent ACK saying we are waiting for %d, timestamp %d", pkt_get_seqnum(ack), pkt_get_timestamp(ack));
        pkt_del(ack);
        state->stats->ack_sent++;
        state->ack_to_send = NULL;
    }
    return 0;
}
Vany Ingenzi
a validé
/**
 * @brief Writes the payload of the packet stored in state->recvd_data_buf[seqnum_to_consume] to the
 *        stdout, then releases the packet and empties its slot.
 *
 * The timestamp of the consumed packet is recorded in state->latest_timestamp.
 *
 * @param state: The current state of the receiver.
 * @param seqnum_to_consume: The seqnum of the packet to consume; its slot must not be empty.
 *
 * @return 0 upon success, else -1 with a message printed on the stderr.
 *
 * @modifies: state.
 */
int consume_data_pkt(receiver_state_t * state, uint8_t seqnum_to_consume)
{
    ASSERT(state->recvd_data_buf[seqnum_to_consume] != NULL);
    pkt_t * pkt = state->recvd_data_buf[seqnum_to_consume];
    size_t payload_len = (size_t) pkt_get_length(pkt);

    size_t written = fwrite((const void *) pkt_get_payload(pkt), sizeof(char), payload_len, stdout);
    if (written != payload_len)
    {
        ERROR("Couldn't write the full packet content");
        return -1;
    }
    // A failed flush means the data never reached the output: report it instead of ignoring it.
    if (fflush(stdout) != 0)
    {
        ERROR("Couldn't flush the packet content to the stdout");
        return -1;
    }

    state->latest_timestamp = pkt_get_timestamp(pkt);
    pkt_del(pkt);
    state->recvd_data_buf[seqnum_to_consume] = NULL;
    return 0;
}
/**
 * @brief Tells whether the FEC_CALCULATED_ON consecutive slots starting at position_to_start
 *        all hold a received packet.
 *
 * The block is scanned upwards only, without wrapping around the end of the buffer: a start
 * position too close to the end of the seqnum space to fit a whole block yields 0.
 *
 * @param state: The current state of the receiver.
 * @param position_to_start: The position from which to start the inspection (< TWO_EXP_EIGHT).
 *
 * @returns 1 if every slot of the block is occupied, else 0.
 */
uint16_t next_four_packets_received(receiver_state_t * state, uint16_t position_to_start)
{
    ASSERT(position_to_start < TWO_EXP_EIGHT);

    // The block would run past the last seqnum: by convention it is not considered received.
    if ( position_to_start > TWO_EXP_EIGHT - FEC_CALCULATED_ON)
        return 0;

    uint16_t offset = 0;
    while (offset < FEC_CALCULATED_ON && state->recvd_data_buf[position_to_start + offset] != NULL)
        offset++;

    // The scan went through the whole block only if no empty slot stopped it.
    return (offset == FEC_CALCULATED_ON) ? 1 : 0;
}
/**
 * @brief Computes how many packets, starting at state->next_to_consume, can be consumed now.
 *
 * - If the latest packet has length 0 (end of transfer), every consecutive packet present in the
 *   buffer from next_to_consume on can be consumed.
 * - Otherwise packets are only consumed by whole blocks of FEC_CALCULATED_ON received packets.
 *
 * Both scans are bounded by TWO_EXP_EIGHT so they terminate even if every slot is occupied.
 *
 * @param state: The current receiver state.
 * @param latest_packet_received: The latest valid packet we received.
 *
 * @return The number of packets to consume.
 */
uint16_t can_consume(receiver_state_t * state, const pkt_t * latest_packet_received)
{
    ASSERT(state->next_to_consume >= 0 && state->next_to_consume < TWO_EXP_EIGHT);
    uint16_t to_consume = 0;

    if (pkt_get_length(latest_packet_received) == 0)
    { // Last packet: we read everything that is contiguous in the buffer
        uint16_t slot = state->next_to_consume;
        while (to_consume < TWO_EXP_EIGHT && state->recvd_data_buf[slot] != NULL)
        {
            to_consume++;
            slot = (slot + 1) % TWO_EXP_EIGHT;
        }
    } else
    { // We only read whole blocks of FEC_CALCULATED_ON packets.
        uint16_t block_start = state->next_to_consume;
        while (to_consume < TWO_EXP_EIGHT &&
               next_four_packets_received(state, block_start % TWO_EXP_EIGHT))
        {
            to_consume += FEC_CALCULATED_ON;
            block_start += FEC_CALCULATED_ON;
        }
    }
    return to_consume;
}
Vany Ingenzi
a validé
/**
* @brief Updates the state upon the arrival of a data packet: stores a copy of the packet in
* state->recvd_data_buf (or counts it as a duplicate), updates last_received_in_order and writes
* to the stdout the packets that can_consume() reports as consumable.
*
* @param state: The current receiver state.
* @param pkt: The data packet that has just been received.
*
* @returns: 0 upon success else -1 with an error message on the stderr.
*
* @modifies: state
*
* NOTE(review): this copy of the function contains stray non-code lines and seems to have lost
* several code lines (see the notes in the body). Restore it from version control before
* relying on it.
*/
int update_buffer_upon_new_data(receiver_state_t * state, const pkt_t * pkt)
Vany Ingenzi
a validé
{
// The buffer slot of a packet is its seqnum
uint8_t seqnum = pkt_get_seqnum(pkt);
Vany Ingenzi
a validé
// New packet: the slot is still empty
if (state->recvd_data_buf[seqnum] == NULL)
Vany Ingenzi
a validé
{
pkt_t * pkt_to_store = pkt_new();
if (pkt_to_store == NULL)
return -1;
// Byte-wise copy of the whole pkt_t structure into the freshly allocated packet
memcpy((void *) pkt_to_store, (void *) pkt, sizeof(pkt_t));
state->recvd_data_buf[seqnum] = pkt_to_store;
// One less free place in the receive window (never below 0)
state->curr_recv_window = (state->curr_recv_window > 0) ? state->curr_recv_window-1 : 0;
Vany Ingenzi
a validé
} else {
state->stats->packet_duplicated++;
Vany Ingenzi
a validé
}
// Update last received in order
Vany Ingenzi
a validé
if (seqnum == (state->last_received_in_order + 1) % TWO_EXP_EIGHT)
Vany Ingenzi
a validé
state->last_received_in_order = seqnum;
// NOTE(review): `idx` has no visible declaration and this loop has no visible body - lines
// appear to be missing around here.
for (;state->recvd_data_buf[idx] != NULL; idx = (idx + 1) % TWO_EXP_EIGHT)
uint16_t to_consume = can_consume(state, pkt);
// NOTE(review): as written, the message is only printed when there is nothing to consume -
// confirm the intended condition.
if (to_consume == 0)
DEBUG("Going to consume the next %d packets.", to_consume);
// NOTE(review): no decrement of to_consume is visible in this loop - presumably lost as well.
while (to_consume > 0)
Vany Ingenzi
a validé
{
DEBUG("Consuming packet : %d | curr_recv_window = %d, recv_window_start = %d",
state->next_to_consume, state->curr_recv_window, state->recv_window_start);
if (consume_data_pkt(state, state->next_to_consume) != 0) return -1;
// A place has been freed in the window (capped at RECV_MAX_SLCTV_RPT_WDW)
state->curr_recv_window = (state->curr_recv_window < RECV_MAX_SLCTV_RPT_WDW) ? state->curr_recv_window + 1 : RECV_MAX_SLCTV_RPT_WDW;
Vany Ingenzi
a validé
// The window start and the next packet to consume both slide by one seqnum
state->recv_window_start = (state->recv_window_start + 1) % TWO_EXP_EIGHT;
state->next_to_consume = (state->next_to_consume + 1) % TWO_EXP_EIGHT;
Vany Ingenzi
a validé
}
return 0;
}
/**
 * @brief This function ensures that when the receiver gets the chance to send something on the
 *        connection there will be an ACK to send.
 *
 * If an ACK is already pending, its window and seqnum are refreshed; otherwise a new ACK packet is
 * created. In both cases the ACK carries the seqnum (last_received_in_order + 1) % TWO_EXP_EIGHT
 * and the timestamp state->latest_timestamp.
 *
 * @param state: The connection state structure.
 *
 * @returns 0 upon success else -1 with an error message on the stderr.
 *
 * @modifies: @state.
 */
int prepare_ack_to_send(receiver_state_t * state)
{
    if ( state->ack_to_send != NULL ) /* There's an unsent ack to send */
    {
        if (pkt_set_window(state->ack_to_send, state->curr_recv_window) != 0 ||
            pkt_set_seqnum(state->ack_to_send, (state->last_received_in_order + 1) % TWO_EXP_EIGHT) != 0)
        {
            ERROR("When updating an ACK that hasn't been done.");
            return -1;
        }
    } else /* We have to create an ack to send */
    {
        pkt_t * new_ack = pkt_new();
        if (new_ack == NULL)
        {
            ERROR("Couldn't create a ACK for the new packet.");
            return -1;
        }
        if (pkt_set_type(new_ack, PTYPE_ACK) != 0 || pkt_set_tr(new_ack, 0) != 0 ||
            pkt_set_seqnum(new_ack, (state->last_received_in_order + 1) % TWO_EXP_EIGHT) != 0)
        {
            ERROR("Setting up a ACK packet to send.");
            // Do not leave a half-initialised packet behind: it would later be sent as if it were a valid ACK.
            pkt_del(new_ack);
            return -1;
        }
        state->ack_to_send = new_ack;
    }
    if (pkt_set_timestamp(state->ack_to_send, state->latest_timestamp) != 0)
    {
        ERROR("Setting up timestamp for the packet");
        return -1;
    }
    return 0;
}
/**
* @brief This function handles PTYPE_DATA arriving packets and updates the state: it counts the
* packet, stores/consumes it through update_buffer_upon_new_data() and prepares the ACK through
* prepare_ack_to_send().
*
* @param state: The receiver state.
* @param pkt: The DATA packet.
*
* @returns 0 upon success else -1.
*
* @modifies: state.
*
* NOTE(review): this copy of the function has no opening brace, contains stray non-code lines,
* and a `return`/closing brace sits in the middle of the body - two versions of the code appear
* to be interleaved. Restore it from version control before relying on it.
*/
int handle_data_pkt(receiver_state_t * state, const pkt_t * pkt)
Vany Ingenzi
a validé
state->stats->data_received++;
DEBUG("Received data packet seqnum %d with timestamp %d | current_window_size : %d, current_window_start : %d",
pkt_get_seqnum(pkt), pkt_get_timestamp(pkt), state->curr_recv_window, state->recv_window_start);
if (update_buffer_upon_new_data(state, pkt) != 0) return -1;
if (prepare_ack_to_send(state) != 0) return -1;
// The transfer is flagged as done when this packet has length 0 and is the last one received in order
state->transfer_done = ( pkt_get_length(pkt) == 0 && state->last_received_in_order == pkt_get_seqnum(pkt) ) ? 1 : 0;
DEBUG("Received the last packet");
return prepare_ack_to_send(state);
}
/* This is the last packet with length 0 and everything has been received */
if (state->last_packet == state->last_received_in_order)
state->transfer_done = 1;
/* This is the last packet that ACTUALLY contains data: it is recognised by its timestamp being SECOND_TO_LAST_PKT */
if (pkt_get_timestamp(pkt) == SECOND_TO_LAST_PKT && state->last_received_in_order == pkt_get_seqnum(pkt))
state->last_data_packet = pkt_get_seqnum(pkt);
Vany Ingenzi
a validé
return 0;
Vany Ingenzi
a validé
/**
* @brief: This function uses the fec packet to recover the missing packet with missing_seqnum as seqnum.
*
* @param state: The receiver state.
* @param pkt: The FEC packet we just received.
* @param missing_seqnum: The packet we are going to recover from the fec packet.
*
* @returns 0 upon success else -1 with an error message on stderr.
*
* @modifies: state.
*/
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
int use_fec(receiver_state_t * state, const pkt_t * fec, uint16_t missing_seqnum)
{
pkt_t * new_packet = pkt_new();
if ( new_packet == NULL )
{
ERROR("An error occured when initiating a new packet in order to use FEC");
return -1;
}
uint16_t fec_seqnum = pkt_get_seqnum(fec);
pkt_set_type(new_packet, PTYPE_DATA);
pkt_set_seqnum(new_packet, missing_seqnum);
pkt_set_length(new_packet, pkt_get_length(fec));
for ( uint16_t idx = fec_seqnum; idx < fec_seqnum + FEC_CALCULATED_ON; idx++ )
{
if ( state->recvd_data_buf[idx] != NULL)
pkt_set_length(new_packet, pkt_get_length(new_packet) ^ pkt_get_length(state->recvd_data_buf[idx]));
}
uint16_t payload_length = pkt_get_length(new_packet);
uint8_t new_payload[payload_length];
uint8_t * fec_payload = (uint8_t *) pkt_get_payload(fec);
uint8_t * other_payloads[FEC_CALCULATED_ON-1];
for ( uint16_t idx = fec_seqnum, idx_other_payloads = 0; idx < fec_seqnum + FEC_CALCULATED_ON; idx++ )
{
if ( state->recvd_data_buf[idx] != NULL)
{
other_payloads[idx_other_payloads] = (uint8_t *) pkt_get_payload(state->recvd_data_buf[idx]);
idx_other_payloads++;
}
}
for ( uint16_t i = 0; i < payload_length; i++ )
{
new_payload[i] = fec_payload[i];
for ( uint16_t idx_other_payloads = 0; idx_other_payloads < FEC_CALCULATED_ON-1; idx_other_payloads++ )
new_payload[i] = new_payload[i] ^ other_payloads[idx_other_payloads][i];
}
pkt_set_payload(new_packet, (char *) new_payload, payload_length);
DEBUG("Used FEC packet [%d] to recover data packet with seqnum [%d] | The new packet payload length is %d",
fec_seqnum, missing_seqnum, payload_length);
// Add packet to received data buffer and treat the packet
// We pass the packet to the rest as if it had just came from the network
if (handle_data_pkt(state, new_packet) != 0) return -1;
pkt_del(new_packet);
Vany Ingenzi
a validé
state->stats->packet_recovered++;
Vany Ingenzi
a validé
/**
 * @brief: This function checks whether the fec with seqnum fec_seqnum can be used to recover
 *         a missing packet, i.e. whether exactly one packet of its block is missing.
 *
 * The block is made of the FEC_CALCULATED_ON seqnums starting at fec_seqnum; indexes are taken
 * modulo TWO_EXP_EIGHT so that no slot outside of the seqnum space is ever inspected.
 *
 * @param state: The receiver state.
 * @param fec_seqnum: The seqnum of the fec packet that we just received.
 * @param missing_seqnum: Where to store the seqnum of the missing packet. It is overwritten each
 *                        time a missing slot is met, hence only meaningful when 1 is returned.
 *
 * @returns: 1 upon True else 0. If True, missing_seqnum is set to the packet that can be recovered by the fec.
 */
int can_fec_be_used(receiver_state_t * state, uint16_t fec_seqnum, uint16_t * missing_seqnum)
{
    uint16_t in_range_missing = 0;
    uint16_t in_range_available = 0;
    for (uint16_t offset = 0; offset < FEC_CALCULATED_ON; offset++)
    {
        uint16_t slot = (fec_seqnum + offset) % TWO_EXP_EIGHT;
        if (state->recvd_data_buf[slot] != NULL)
        {
            in_range_available++;
        } else
        {
            in_range_missing++;
            *missing_seqnum = slot;
        }
    }
    return (in_range_available == FEC_CALCULATED_ON-1 && in_range_missing == 1);
}
Vany Ingenzi
a validé
/**
 * @brief: This function checks the potential usage of the fec: if the fec can recover a missing
 *         packet it is used, else it is only reported in the debug output.
 *
 * @param state: The receiver state.
 * @param fec: The fec packet.
 *
 * @returns: 0 upon success (fec used or not usable), else -1 when using the fec failed.
 */
int potential_usage_of_fec(receiver_state_t * state, const pkt_t * fec)
{
    uint16_t fec_seqnum = pkt_get_seqnum(fec);
    uint16_t missing_data_seqnum = 0;

    if ( !can_fec_be_used(state, fec_seqnum, &missing_data_seqnum) )
    {
        DEBUG("Received FEC with seqnum [%d] but wasn't used", fec_seqnum);
        return 0;
    }
    if ( use_fec(state, fec, missing_data_seqnum) != 0 ) return -1;
    return 0;
}
/**
 * @brief This function handles PTYPE_FEC arriving packets and updates the state.
 *
 * A FEC whose whole block lies before state->last_received_in_order is counted as ignored;
 * otherwise the FEC is handed to potential_usage_of_fec().
 *
 * @param state: The receiver state.
 * @param pkt: The FEC packet.
 * @returns 0 upon success else -1.
 *
 * @requires: The packet is in the current receiving window
 *
 * @modifies: state.
 */
int handle_fec_pkt(receiver_state_t * state, const pkt_t * pkt)
{
    // Validate the pointers BEFORE the first dereference, otherwise the check is useless.
    ASSERT(state != NULL && pkt != NULL);
    state->stats->fec_received++;

    uint8_t seqnum = pkt_get_seqnum(pkt);
    // NOTE(review): this is a plain integer comparison, it does not account for the seqnum
    // wrapping around TWO_EXP_EIGHT - confirm that this is intended.
    if (seqnum + (FEC_CALCULATED_ON - 1) < state->last_received_in_order )
    {
        DEBUG("Received FEC with seqnum [%d] but wasn't used since last received in order : %d",
              pkt_get_seqnum(pkt), state->last_received_in_order);
        state->stats->packet_ignored++;
        return 0;
    }
    return potential_usage_of_fec(state, pkt);
}
* @brief This function handles the truncated packets and updates the state
* such that when it possible to send an NACK there will be atleast one NACK to send.
* @requires:
* - pkt was validly decoded by pkt_decoded().
* - state->nack_to_send[...] : To be considered as a FIFO queue
*
* @param state: The receiver state
* @param pkt: The received truncated packet
* @returns 0 upon success else -1
*
* @modifies: state
int handle_truncated_pkt(receiver_state_t * state, const pkt_t * pkt)
Vany Ingenzi
a validé
{
Vany Ingenzi
a validé
state->stats->data_truncated_received++;
DEBUG("Received a truncated packet with seqnum %d", pkt_get_seqnum(pkt));
int free_idx = 0;
for (; free_idx < RECV_MAX_SLCTV_RPT_WDW && state->nack_to_send[free_idx] != NULL; free_idx++);
Vany Ingenzi
a validé
pkt_t * pkt_to_send = pkt_new();
if (pkt_to_send == NULL) return -1;
if (pkt_set_type(pkt_to_send, PTYPE_NACK) != 0 || pkt_set_tr(pkt_to_send, 0) ||
pkt_set_seqnum(pkt_to_send, pkt_get_seqnum(pkt)) != 0)
{
ERROR("Setting up a NACK packet to send.");
return -1;
}
state->nack_to_send[free_idx] = pkt_to_send;
return 0;
/**
* @brief This function handles the mechanisms for a valid @pkt
* and modifies @state.
*
* @requires:
* - @pkt was validly decoded by pkt_decode().
*
* @param pkt: The packet received.
* @param state: The current connection state.
* @returns 0 Upon success and -1 upon failure.
*
* @modifies: @state
*
* NOTE(review): this copy of the function has no braces, contains stray non-code lines, and `tr`
* is read but never used before the final return - the code that processes the packet seems to
* have been lost. Restore it from version control before relying on it.
*/
int handle_valid_pkt(receiver_state_t * state, const pkt_t * pkt)
Vany Ingenzi
a validé
// Is it in the receive_window ? The window starts at recv_window_start (included) and spans
// RECV_MAX_SLCTV_RPT_WDW seqnums, modulo TWO_EXP_EIGHT.
uint8_t seqnum = pkt_get_seqnum(pkt);
int in_window = (seqnum >= state->recv_window_start);
// When the window wraps past the end of the seqnum space, seqnums below its wrapped end qualify too.
// NOTE(review): the two assignments below appear to belong to the two branches of an if/else
// whose `else` is not visible - confirm.
if ( state->recv_window_start + RECV_MAX_SLCTV_RPT_WDW >= TWO_EXP_EIGHT )
in_window = in_window || ( seqnum < (state->recv_window_start + RECV_MAX_SLCTV_RPT_WDW) % TWO_EXP_EIGHT );
in_window = in_window && ( seqnum < (state->recv_window_start + RECV_MAX_SLCTV_RPT_WDW) % TWO_EXP_EIGHT );
Vany Ingenzi
a validé
if (!in_window)
DEBUG("Received packet [%d] Out of window with timestamp %d | receive window start at : %d (included) till %d (excluded)",
seqnum, pkt_get_timestamp(pkt), state->recv_window_start, (state->recv_window_start + RECV_MAX_SLCTV_RPT_WDW) % TWO_EXP_EIGHT);
Vany Ingenzi
a validé
uint8_t tr = pkt_get_tr(pkt);
return 0;
/**
* @brief This function handles all the necessary mechanisms when
* there's a message to read on the socket: it reads one datagram, decodes it and hands a valid
* packet to handle_valid_pkt(). It requires a valid state.
*
* @warning Modifies state
*
* @param pfd: The struct pollfd holding the fd to read from.
* @param state: The receiver state.
* @returns 0 upon success and -1 in case of failure.
*
* NOTE(review): this copy of the function has unbalanced braces and the decoded/damaged branches
* are no longer separated - lines seem to have been lost. Restore it from version control before
* relying on it.
*/
int handle_incoming(struct pollfd * pfd, receiver_state_t * state)
char buffer[PKT_MAX_LEN];
ssize_t read_bytes = read(pfd->fd, (void *) buffer, PKT_MAX_LEN);
// A failed read is not reported as an error to the caller
if (read_bytes == -1) return 0;
pkt_t * pkt = pkt_new();
if (pkt == NULL)
ERROR("Failed to get a new packet when handling incoming.");
return -1;
}
pkt_status_code pkt_status = pkt_decode(buffer, read_bytes, pkt);
if (pkt_status == 0)
// NOTE(review): this early return does not go through the pkt_del(pkt) below - check whether pkt leaks here.
if (handle_valid_pkt(state, (const pkt_t *) pkt) != 0) return -1;
DEBUG("Received a damaged packet with %d status. and seqnum as %d", pkt_status, pkt_get_seqnum(pkt));
// See if there's a FEC that can be used and update buffer
Vany Ingenzi
a validé
state->stats->packet_ignored++;
pkt_del(pkt);
return 0;
/**
 * @brief The main loop that loops printing the received data on the stdout
 * till the whole package is received or there's a timeout.
 *
 * @param pfd: The pfd of the socket on which to receive file from.
 * @param state: The buffer state.
 * @returns As soon as the whole file is received.
 */
// Receiving loop: runs while an ACK is pending or the transfer is not done, reading incoming
// packets (handle_incoming) and sending pending ACKs/NACKs (send_if_inneed).
// NOTE(review): this copy of the function has lost lines - `ready` is never declared (the call
// that sets it is missing), braces are unbalanced and the branch that calls handle_incoming()
// has no visible condition. Restore it from version control before relying on it.
void reception_loop(struct pollfd * pfd, receiver_state_t * state)
struct timeval current_time, last_packet_received;
while (state->ack_to_send != NULL || state->transfer_done == 0)
Vany Ingenzi
a validé
if (ready == -1 || pfd->revents & POLLERR) /* In case of error on socket */
if (handle_incoming(pfd, state) != 0)
return;
Vany Ingenzi
a validé
// Remember when the last packet arrived, once the last data packet is known
if (state->last_data_packet < TWO_EXP_EIGHT) /* This means we received last data packet transfer */
gettimeofday(&last_packet_received, NULL);
} else if (pfd->revents & POLLOUT)
{
if (send_if_inneed(pfd, state) != 0)
return;
Vany Ingenzi
a validé
if (state->last_data_packet < TWO_EXP_EIGHT) /* This is a signal we use between us in order to mark the last datapacket containing data */
{
// NOTE(review): `¤t_time` looks like an HTML-entity corruption of `&current_time` - confirm and restore.
gettimeofday(¤t_time, NULL);
// Stop once more than RECEIVER_INACTIVE_TIMEOUT seconds elapsed since the last received packet
if (current_time.tv_sec - last_packet_received.tv_sec > RECEIVER_INACTIVE_TIMEOUT)
{
DEBUG("Ended on timeout %d after all data packets have been sent", RECEIVER_INACTIVE_TIMEOUT);
return;
}
}
/**
 * @brief This function allocates enough memory and initiates the connection
 */
// Allocates a zero-initialised receiver state together with its statistics structure and sets
// the initial values of its fields.
// NOTE(review): in this copy the two `for` loops have no visible body and no `return` of
// to_return is visible - lines seem to have been lost. Restore it from version control before
// relying on it.
receiver_state_t * state_new()
Vany Ingenzi
a validé
{
receiver_state_t * to_return = (receiver_state_t *) calloc(1, sizeof(receiver_state_t));
if (to_return == NULL) return NULL;
Vany Ingenzi
a validé
Vany Ingenzi
a validé
to_return->stats = calloc(sizeof(transfer_stats_t), 1);
if (to_return->stats == NULL)
{
// Do not leak the state when its statistics cannot be allocated
free(to_return);
return NULL;
}
Vany Ingenzi
a validé
for (size_t i = 0; i < TWO_EXP_EIGHT; i++)
Vany Ingenzi
a validé
for (size_t i = 0; i < RECV_MAX_SLCTV_RPT_WDW; i++)
Vany Ingenzi
a validé
// The whole window is free at the start
to_return->curr_recv_window = RECV_MAX_SLCTV_RPT_WDW;
to_return->recv_window_start = 0;
to_return->last_received_in_order = -1; // Sign indicating that it hasn't received something
to_return->ack_to_send = NULL; // Sign that there's no ack to send
to_return->transfer_done = 0;
to_return->next_to_consume = 0;
to_return->last_packet = 256; // No valid seqnum will reach this value, max value 255
Vany Ingenzi
a validé
// Same sentinel value as last_packet
to_return->last_data_packet = 256;
Vany Ingenzi
a validé
}
/**
 * @brief This function frees the memory allocated by the state_new() function.
 */
// Releases a receiver state: the pending ACK, then the state itself.
// NOTE(review): in this copy the `for` loop has no visible body of its own (as written it would
// repeat the free() that follows), its bound is the literal 31 rather than a named constant, and
// no release of state->stats is visible - lines seem to have been lost. Restore it from version
// control before relying on it.
void state_del(receiver_state_t * state)
Vany Ingenzi
a validé
{
Vany Ingenzi
a validé
for (int i = 0; i < 31; i++)
Vany Ingenzi
a validé
free(state->ack_to_send);
Vany Ingenzi
a validé
free(state);
Vany Ingenzi
a validé
}
Vany Ingenzi
a validé
void receiver_read_write_loop(int sfd, const char * pathname)
receiver_state_t * state = state_new();
struct pollfd * pfd = (struct pollfd *) calloc(1, sizeof(struct pollfd));
if (state == NULL || pfd == NULL)
{
Vany Ingenzi
a validé
if (state != NULL) state_del(state);
free(pfd);
return;
}
pfd->fd = sfd;
pfd->events = POLLIN | POLLOUT;
Vany Ingenzi
a validé
if (wait_for_client(sfd) == 0)
Vany Ingenzi
a validé
DEBUG("Done the transfer with done status being %s", (state->transfer_done) ? "true" : "false");
write_stats_to_file(pathname, state->stats, RECEIVER);
Vany Ingenzi
a validé
state_del(state);