Newer
Older
/**
 * @brief Sends at most one pending control packet on the socket.
 *
 * Pending NACKs have priority over the pending ACK: the first non-NULL entry of
 * state->nack_to_send is sent if there is one, otherwise state->ack_to_send is
 * sent if it is set. The packet that was sent is deleted and its slot is reset
 * to NULL. When nothing is pending, nothing is written.
 *
 * @param pfd: The pollfd whose fd is written to.
 * @param state: The receiver state holding the pending NACKs and ACK.
 * @returns 0 upon success (including when there was nothing to send), else -1.
 *
 * @modifies: state.
 */
int send_if_inneed(struct pollfd * pfd, receiver_state_t * state)
{
    char buffer[PKT_MIN_LEN];
    size_t len = PKT_MIN_LEN;

    // Priority to NACKs
    int nack_idx = 0;
    while (nack_idx < RECV_MAX_SLCTV_RPT_WDW && state->nack_to_send[nack_idx] == NULL)
        nack_idx++;

    if (nack_idx != RECV_MAX_SLCTV_RPT_WDW) /* There's a nack to send */
    {
        pkt_t * nack = state->nack_to_send[nack_idx];
        if (pkt_set_window(nack, state->curr_recv_window) != 0 ||
            pkt_encode(nack, buffer, &len) != PKT_OK)
        {
            ERROR("Encoding NACK to send failed");
            return -1;
        }
        /* write() returns ssize_t: keep the sign so that -1 is seen as an error. */
        ssize_t written = write(pfd->fd, buffer, len);
        if (written < 0 || (size_t) written != len)
        {
            ERROR("When witing a NACK to socket, didn't written the correct len");
            return -1;
        }
        DEBUG("Sent NACK that [%d] has been truncated", pkt_get_seqnum(nack));
        DEBUG_DUMP(buffer, len);
        pkt_del(nack);
        state->nack_to_send[nack_idx] = NULL;
    }
    else if (state->ack_to_send != NULL) /* There's an ack to send */
    {
        pkt_t * ack = state->ack_to_send;
        /* The header fields must be set BEFORE encoding, otherwise they never reach the wire. */
        if (pkt_set_window(ack, state->curr_recv_window) != 0 ||
            pkt_set_seqnum(ack, (state->last_received_in_order + 1) % TWO_EXP_EIGHT) != 0 ||
            pkt_encode(ack, buffer, &len) != PKT_OK)
        {
            ERROR("Encoding ACK to send failed");
            return -1;
        }
        ssize_t written = write(pfd->fd, buffer, len);
        if (written < 0 || (size_t) written != len)
        {
            ERROR("When writing ACK to socket, didn't write the correct len");
            return -1;
        }
        DEBUG("Sent ACK saying we are waiting for %d, timestamp %d", pkt_get_seqnum(ack), pkt_get_timestamp(ack));
        pkt_del(ack);
        state->ack_to_send = NULL;
    }
    return 0;
}
/**
 * @brief Writes the payload of one buffered packet on stdout and releases its slot.
 *
 * On success the packet's timestamp is recorded in state->latest_timestamp, the
 * packet is deleted and state->recvd_buf[seqnum_to_consume] is reset to NULL.
 * On failure the packet is left in the buffer.
 *
 * @param state: The receiver state.
 * @param seqnum_to_consume: Index in state->recvd_buf of the packet to write; the slot must not be NULL.
 * @returns 0 upon success, -1 if the payload could not be fully written or flushed.
 *
 * @modifies: state.
 */
int consume_data_pkt(receiver_state_t * state, uint8_t seqnum_to_consume)
{
    ASSERT(state->recvd_buf[seqnum_to_consume] != NULL);

    pkt_t * pkt = state->recvd_buf[seqnum_to_consume];
    size_t length = (size_t) pkt_get_length(pkt);

    size_t written = fwrite(pkt_get_payload(pkt), sizeof(char), length, stdout);
    if (written != length)
    {
        ERROR("Couldn't write the full packet content");
        return -1;
    }
    /* fwrite only fills the stdio buffer; the data is really out once fflush succeeds. */
    if (fflush(stdout) != 0)
    {
        ERROR("Couldn't flush the packet content to stdout");
        return -1;
    }

    state->latest_timestamp = pkt_get_timestamp(pkt);
    pkt_del(pkt);
    state->recvd_buf[seqnum_to_consume] = NULL;
    return 0;
}
/*
 * Tells whether the FEC_CALCULATED_ON consecutive slots of state->recvd_buf that
 * start at position_to_start all hold a packet.
 *
 * 256 is divisible by 4, so as long as the sender respects the constraint of
 * sending a FEC every 4 packets a block never straddles the end of the sequence
 * space. That is why only increasing indexes are checked and a block that would
 * run past the last seqnum is reported as not received instead of wrapping.
 *
 * Returns 1 when every slot of the block is filled, 0 otherwise.
 */
uint16_t next_four_packets_received(receiver_state_t * state, uint16_t position_to_start)
{
    ASSERT(position_to_start < TWO_EXP_EIGHT);

    /* The block would run past the end of the buffer. */
    if (position_to_start > TWO_EXP_EIGHT - FEC_CALCULATED_ON)
    {
        return 0;
    }

    for (uint16_t offset = 0; offset < FEC_CALCULATED_ON; offset++)
    {
        if (state->recvd_buf[position_to_start + offset] == NULL)
        {
            return 0;
        }
    }
    return 1;
}
/**
 * @brief Counts how many buffered packets can be consumed, starting at state->next_to_consume.
 *
 * - If the latest packet has length 0 (end of transfer), every consecutive buffered
 *   packet from state->next_to_consume up to the first empty slot is counted.
 * - Otherwise packets are only counted by full blocks of FEC_CALCULATED_ON packets,
 *   as reported by next_four_packets_received().
 *
 * Both scans are limited to one lap of the sequence space, so the function also
 * terminates when every slot of the buffer is occupied.
 *
 * @param state: The current receiver state (only read here).
 * @param latest_packet_received: The latest valid packet we received.
 *
 * @return The number of packets to consume.
 */
uint16_t can_consume(receiver_state_t * state, const pkt_t * latest_packet_received)
{
    ASSERT(state->next_to_consume >= 0 && state->next_to_consume < TWO_EXP_EIGHT);

    uint16_t to_consume = 0;
    if (pkt_get_length(latest_packet_received) == 0)
    { // Last packet: we read everything that is contiguous in the buffer
        uint16_t idx = state->next_to_consume;
        while (to_consume < TWO_EXP_EIGHT && state->recvd_buf[idx] != NULL)
        {
            to_consume++;
            idx = (idx + 1) % TWO_EXP_EIGHT;
        }
    }
    else
    { // We only read full blocks of FEC_CALCULATED_ON packets
        uint16_t block_start = state->next_to_consume;
        while (to_consume + FEC_CALCULATED_ON <= TWO_EXP_EIGHT &&
               next_four_packets_received(state, block_start % TWO_EXP_EIGHT))
        {
            to_consume += FEC_CALCULATED_ON;
            block_start = (block_start + FEC_CALCULATED_ON) % TWO_EXP_EIGHT;
        }
    }
    return to_consume;
}
/**
 * @brief Stores a received DATA packet in the receive buffer and consumes what can be consumed.
 *
 * Steps:
 *  1. If the slot of the packet's seqnum is empty, a copy of the packet is stored
 *     there and the advertised window shrinks by one (never below 0).
 *  2. If the packet is the next one expected in order, last_received_in_order
 *     advances over every contiguous buffered packet.
 *  3. The number of packets reported by can_consume() is written out with
 *     consume_data_pkt(); for each of them the window grows by one (capped at
 *     RECV_MAX_SLCTV_RPT_WDW) and recv_window_start / next_to_consume advance.
 *
 * @param state: The receiver state.
 * @param pkt: The DATA packet that was just received.
 * @returns 0 upon success, -1 on allocation or output failure.
 *
 * @modifies: state.
 */
int update_buffer_upon_new_data(receiver_state_t * state, const pkt_t * pkt)
{
    uint8_t seqnum = pkt_get_seqnum(pkt);

    // New packet
    if (state->recvd_buf[seqnum] == NULL)
    {
        pkt_t * pkt_to_store = pkt_new();
        if (pkt_to_store == NULL)
            return -1;
        /* NOTE(review): byte-wise copy; only valid if pkt_t owns no heap pointer - confirm against pkt_t. */
        memcpy(pkt_to_store, pkt, sizeof(pkt_t));
        state->recvd_buf[seqnum] = pkt_to_store;
        state->curr_recv_window = (state->curr_recv_window > 0) ? state->curr_recv_window - 1 : 0;
    }

    // Update last received in order, only when this packet is the next expected one
    if (seqnum == (state->last_received_in_order + 1) % TWO_EXP_EIGHT)
    {
        uint16_t idx = seqnum;
        /* Bounded to one lap so a completely full buffer cannot loop forever. */
        for (uint16_t scanned = 0; scanned < TWO_EXP_EIGHT && state->recvd_buf[idx] != NULL; scanned++)
        {
            state->last_received_in_order = idx;
            idx = (idx + 1) % TWO_EXP_EIGHT;
        }
    }

    uint16_t to_consume = can_consume(state, pkt);
    if (to_consume > 0)
    {
        DEBUG("Going to consume the next %d packets.", to_consume);
    }
    while (to_consume > 0)
    {
        DEBUG("Consuming packet : %d | curr_recv_window = %d, recv_window_start = %d",
              state->next_to_consume, state->curr_recv_window, state->recv_window_start);
        if (consume_data_pkt(state, state->next_to_consume) != 0)
            return -1;
        state->curr_recv_window = (state->curr_recv_window < RECV_MAX_SLCTV_RPT_WDW) ? state->curr_recv_window + 1 : RECV_MAX_SLCTV_RPT_WDW;
        state->recv_window_start = (state->recv_window_start + 1) % TWO_EXP_EIGHT;
        state->next_to_consume = (state->next_to_consume + 1) % TWO_EXP_EIGHT;
        to_consume--;
    }
    return 0;
}
/**
 * @brief This function ensures that when the receiver gets the chance to send something on the connection
 * there will be an ACK to send.
 *
 * If an unsent ACK is already pending, its window and seqnum are refreshed;
 * otherwise a new ACK is created. In both cases the seqnum is the one following
 * state->last_received_in_order and the timestamp is state->latest_timestamp.
 * A freshly created ACK is only stored in the state once it is fully set up.
 *
 * @param state: The connection state structure.
 * @returns 0 upon success else -1
 *
 * @modifies: @state.
 */
int prepare_ack_to_send(receiver_state_t * state)
{
    if (state->ack_to_send != NULL) /* There's an unsent ack to send */
    {
        if (pkt_set_window(state->ack_to_send, state->curr_recv_window) != 0 ||
            pkt_set_seqnum(state->ack_to_send, (state->last_received_in_order + 1) % TWO_EXP_EIGHT) != 0)
        {
            ERROR("When updating an ACK that hasn't been done.");
            return -1;
        }
    }
    else /* We have to create an ack to send */
    {
        pkt_t * ack = pkt_new();
        if (ack == NULL)
        {
            ERROR("Couldn't create a ACK for the new packet.");
            return -1;
        }
        if (pkt_set_type(ack, PTYPE_ACK) != 0 || pkt_set_tr(ack, 0) != 0 ||
            pkt_set_seqnum(ack, (state->last_received_in_order + 1) % TWO_EXP_EIGHT) != 0)
        {
            ERROR("Setting up a ACK packet to send.");
            /* Do not leave a half-initialised ACK behind: a later call would treat it as valid. */
            pkt_del(ack);
            return -1;
        }
        state->ack_to_send = ack;
    }

    if (pkt_set_timestamp(state->ack_to_send, state->latest_timestamp) != 0)
    {
        ERROR("Setting up timestamp for the packet");
        return -1;
    }
    return 0;
}
* @brief This function handles PTYPE_FEC arriving packets and updates the state.
*
* @param state: The receiver state.
* @param pkt: The DATA packet.
* @returns 0 upon success else -1.
*
* @modifies: state.
*/
int handle_fec_pkt(receiver_state_t * state, const pkt_t * pkt)
{
ASSERT(state != NULL && pkt != NULL);
if (state->last_received_in_order > pkt_get_seqnum(pkt))
{
DEBUG("Received FEC with seqnum %d but wasn't used since last received in order is %d", pkt_get_seqnum(pkt), state->last_received_in_order);
return 0;
}
// Add FEC to state buffer of fec
// See if there's FEC that can be used and update buffer
return 0;
}
/**
 * @brief This function handles PTYPE_DATA arriving packets and updates the state.
 *
 * The visible statements buffer the packet (update_buffer_upon_new_data), refresh
 * the pending ACK (prepare_ack_to_send) and maintain the end-of-transfer markers
 * state->transfer_done and state->last_data_packet.
 *
 * @param state: The receiver state
 * @param pkt: The DATA packet
 * @returns 0 upon success else -1.
 *
 * @modifies: state.
 */
int handle_data_pkt(receiver_state_t * state, const pkt_t * pkt)
/* NOTE(review): the opening brace of the body is not visible here, and the statements
 * below look like two revisions of the function interleaved (two closing braces, two
 * ways of setting transfer_done). Recover the real body from version control. */
DEBUG("Received data packet seqnum %d with timestamp %d | current_window_size : %d, current_window_start : %d",
pkt_get_seqnum(pkt), pkt_get_timestamp(pkt), state->curr_recv_window, state->recv_window_start);
/* Store the packet and consume whatever became consumable. */
if (update_buffer_upon_new_data(state, pkt) != 0) return -1;
/* Make sure an ACK reflecting the new state is pending. */
if (prepare_ack_to_send(state) != 0) return -1;
/* Done when this is the zero-length packet and it is also the last one received in order. */
state->transfer_done = ( pkt_get_length(pkt) == 0 && state->last_received_in_order == pkt_get_seqnum(pkt) ) ? 1 : 0;
DEBUG("Received the last packet");
return prepare_ack_to_send(state);
}
/* This is the last packet with length 0 and everything has been received */
if (state->last_packet == state->last_received_in_order)
state->transfer_done = 1;
/* This is the last packet that ACTUALLY contains data */
if (pkt_get_timestamp(pkt) == SECOND_TO_LAST_PKT && state->last_received_in_order == pkt_get_seqnum(pkt))
state->last_data_packet = pkt_get_seqnum(pkt);
Vany Ingenzi
a validé
return 0;
}
* @brief This function handles the truncated packets and updates the state
* such that when it possible to send an NACK there will be atleast one NACK to send.
* @requires:
* - pkt was validly decoded by pkt_decoded().
* - state->nack_to_send[...] : To be considered as a FIFO queue
*
* @param state: The receiver state
* @param pkt: The received truncated packet
* @returns 0 upon success else -1
*
* @modifies: state
int handle_truncated_pkt(receiver_state_t * state, const pkt_t * pkt)
Vany Ingenzi
a validé
{
int free_idx = 0;
for (; free_idx < RECV_MAX_SLCTV_RPT_WDW && state->nack_to_send[free_idx] != NULL; free_idx++);
if ( free_idx == 0 ) return 0;
Vany Ingenzi
a validé
pkt_t * pkt_to_send = pkt_new();
if (pkt_to_send == NULL) return -1;
if (pkt_set_type(pkt_to_send, PTYPE_NACK) != 0 || pkt_set_tr(pkt_to_send, 0) ||
pkt_set_seqnum(pkt_to_send, pkt_get_seqnum(pkt)) != 0)
{
ERROR("Setting up a NACK packet to send.");
return -1;
}
state->nack_to_send[free_idx] = pkt_to_send;
return 0;
/**
 * @brief This function handles the mechanisms for a valid @pkt
 * and modifies @state.
 *
 * The visible part computes whether the packet's seqnum lies in the receive
 * window [recv_window_start, recv_window_start + RECV_MAX_SLCTV_RPT_WDW) modulo
 * TWO_EXP_EIGHT, logs out-of-window packets and reads the TR bit.
 *
 * @requires:
 * - @pkt was validly decoded by pkt_decode().
 *
 * @param pkt: The packet received.
 * @param state: The current connection state.
 * @returns 0 Upon success and -1 upon failure.
 *
 * @modifies: @state
 */
int handle_valid_pkt(receiver_state_t * state, const pkt_t * pkt)
Vany Ingenzi
a validé
/* NOTE(review): the braces of this body and whatever follows the read of the TR bit
 * are not visible here; `tr` is read but never used in what remains. */
// Is it in the receive_window ?
uint8_t seqnum = pkt_get_seqnum(pkt);
int in_window = (seqnum >= state->recv_window_start);
/* The window wraps past the last seqnum: a small seqnum below the wrapped end is also inside. */
if ( state->recv_window_start + RECV_MAX_SLCTV_RPT_WDW >= TWO_EXP_EIGHT )
in_window = in_window || ( seqnum < (state->recv_window_start + RECV_MAX_SLCTV_RPT_WDW) % TWO_EXP_EIGHT );
/* NOTE(review): presumably the `else` branch of the test above (no wrap: the seqnum must
 * also be below the window end); as shown it runs unconditionally - confirm. */
in_window = in_window && ( seqnum < (state->recv_window_start + RECV_MAX_SLCTV_RPT_WDW) % TWO_EXP_EIGHT );
Vany Ingenzi
a validé
if (!in_window)
DEBUG("Received packet [%d] Out of window with timestamp %d | receive window start at : %d (included) till %d (excluded)",
seqnum, pkt_get_timestamp(pkt), state->recv_window_start, (state->recv_window_start + RECV_MAX_SLCTV_RPT_WDW) % TWO_EXP_EIGHT);
Vany Ingenzi
a validé
uint8_t tr = pkt_get_tr(pkt);
return 0;
/**
 * @brief This function handles all the necessary mechanisms when
 * there's a message on the socket. It requires a valid state.
 *
 * One datagram is read and decoded. A valid packet is passed to
 * handle_valid_pkt(); for a damaged one an ACK is prepared with
 * prepare_ack_to_send(). The temporary packet used for decoding is released on
 * every path.
 *
 * @warning Modifies state
 *
 * @param pfd: The struct pollfd whose fd is read from.
 * @param state: The receiver state.
 * @returns 0 upon success (a failed read is ignored and also returns 0) and -1 in case of failure.
 */
int handle_incoming(struct pollfd * pfd, receiver_state_t * state)
{
    char buffer[PKT_MAX_LEN];
    ssize_t read_bytes = read(pfd->fd, buffer, PKT_MAX_LEN);
    if (read_bytes == -1)
        return 0;

    pkt_t * pkt = pkt_new();
    if (pkt == NULL)
    {
        ERROR("Failed to get a new packet when handling incoming.");
        return -1;
    }

    int result;
    pkt_status_code pkt_status = pkt_decode(buffer, read_bytes, pkt);
    if (pkt_status == 0)
    {
        result = (handle_valid_pkt(state, (const pkt_t *) pkt) != 0) ? -1 : 0;
    }
    else
    {
        DEBUG("Received a damaged packet with %d status.", pkt_status);
        // See if there's a FEC that can be used and update buffer
        result = prepare_ack_to_send(state);
    }

    /* Single exit: the decoded packet is only a temporary and must always be released. */
    pkt_del(pkt);
    return result;
}
/**
 * @brief The main loop that keeps printing the received data on stdout
 * until the whole file is received or there is a timeout.
 *
 * @param pfd: The pfd of the socket from which to receive the file.
 * @param state: The receiver state.
 * @returns As soon as the whole file is received.
 */
void reception_loop(struct pollfd * pfd, receiver_state_t * state)
/* NOTE(review): lines of this body are not visible here: the opening braces, the call
 * that defines `ready` (presumably poll() on pfd - confirm) and the branch handling
 * incoming data. Recover the real body from version control. */
struct timeval current_time, last_packet_received;
/* Keep going while an ACK is still pending or the transfer is not finished. */
while (state->ack_to_send != NULL || state->transfer_done == 0)
if (ready == -1 || pfd->revents & POLLERR) /* In case of error on socket */
if (handle_incoming(pfd, state) != 0)
return;
/* last_data_packet is initialised to 256 by state_new(); a lower value is a real seqnum. */
if (state->last_data_packet < TWO_EXP_EIGHT) /* This means we received last data packet transfer */
gettimeofday(&last_packet_received, NULL);
} else if (pfd->revents & POLLOUT)
{
if (send_if_inneed(pfd, state) != 0)
return;
if (state->last_data_packet < TWO_EXP_EIGHT)
{
/* NOTE(review): `¤t_time` below is a mis-encoded `&current_time` ("&curren" was
 * turned into the currency sign) and does not compile as shown. */
gettimeofday(¤t_time, NULL);
/* Stop once no packet arrived for more than RECEIVER_INACTIVE_TIMEOUT seconds. */
if (current_time.tv_sec - last_packet_received.tv_sec > RECEIVER_INACTIVE_TIMEOUT)
{
DEBUG("Ended on timeout %d after all data packets have been sent", RECEIVER_INACTIVE_TIMEOUT);
return;
}
}
* @brief This function allocates enough memory and initiates the connection
receiver_state_t * state_new()
Vany Ingenzi
a validé
{
receiver_state_t * to_return = (receiver_state_t *) calloc(1, sizeof(receiver_state_t));
if (to_return == NULL) return NULL;
Vany Ingenzi
a validé
for (size_t i = 0; i < TWO_EXP_EIGHT; i++)
Vany Ingenzi
a validé
for (size_t i = 0; i < RECV_MAX_SLCTV_RPT_WDW; i++)
Vany Ingenzi
a validé
to_return->curr_recv_window = RECV_MAX_SLCTV_RPT_WDW;
to_return->recv_window_start = 0;
to_return->last_received_in_order = -1; // Sign indicating that it hasn't received something
to_return->ack_to_send = NULL; // Sign that there's no ack to send
to_return->transfer_done = 0;
to_return->next_to_consume = 0;
to_return->last_packet = 256; // No valid seqnum will reach this value, max value 255
to_return->last_data_packet = 256;
return to_return;
Vany Ingenzi
a validé
}
* @brief This function frees the memory allocated by the state_new() function.
void state_del(receiver_state_t * state)
Vany Ingenzi
a validé
{
Vany Ingenzi
a validé
for (int i = 0; i < 31; i++)
pkt_del(state->recvd_buf[i]);
free(state->ack_to_send);
free(state);
}
/**
 * @brief The main loop for the receiver.
 * @param sfd: A valid socket.
 * @returns As soon as an error occurs or the whole transfer came through.
*/
void receiver_read_write_loop(int sfd)
{
receiver_state_t * state = state_new();
struct pollfd * pfd = (struct pollfd *) calloc(1, sizeof(struct pollfd));
if (state == NULL || pfd == NULL)
{
Vany Ingenzi
a validé
if (state != NULL) state_del(state);
free(pfd);
return;
}
pfd->fd = sfd;
pfd->events = POLLIN | POLLOUT;
Vany Ingenzi
a validé
if (wait_for_client(sfd) == 0)
DEBUG("Done with done status equal to %d", state->transfer_done);
Vany Ingenzi
a validé
state_del(state);