Skip to content
Extraits de code Groupes Projets
Valider 05d686ea rédigé par Vany Ingenzi's avatar Vany Ingenzi
Parcourir les fichiers

Implemented a functionnal receiver without FEC handling, changed malloc when...

Implemented a functionnal receiver without FEC handling, changed malloc when creating a new pkt to calloc in order to prevent conditional jumps when encoding
parent 3a200430
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
...@@ -7,14 +7,18 @@ ...@@ -7,14 +7,18 @@
#define __RECEIVER_UTILS_ #define __RECEIVER_UTILS_
/* We are using 8 bits to encode the seqnum therefore to fight redondance our window is of 2**(8)/2 */ /* We are using 8 bits to encode the seqnum therefore to fight redondance our window is of 2**(8)/2 */
#define MAX_SELECTIVE_REPEAT_WINDOW 8 #define RECV_MAX_SLCTV_RPT_WDW 31
#define TWO_EXP_EIGHT 256
/* Represent the receiver's connection state */ /* Represent the receiver's connection state */
typedef struct __attribute__((__packed__)) typedef struct __attribute__((__packed__))
{ {
uint8_t receive_window; // Initialised at 16 uint8_t curr_recv_window;
uint8_t recv_window_start;
uint8_t last_received_in_order; uint8_t last_received_in_order;
pkt_t buffer[31]; // Buffer pkt_t * recvd_buf[TWO_EXP_EIGHT];
pkt_t * ack_to_send;
pkt_t * nack_to_send[RECV_MAX_SLCTV_RPT_WDW];
} con_state_t; } con_state_t;
...@@ -25,4 +29,18 @@ typedef struct __attribute__((__packed__)) ...@@ -25,4 +29,18 @@ typedef struct __attribute__((__packed__))
*/ */
void receiver_read_write_loop(int sfd); void receiver_read_write_loop(int sfd);
/**
* Creates and initialize a con_state_t structure
* @return: a valid pointer upon success, else NULL
*/
con_state_t * state_new();
/**
* Deletes the con_state_t structure pointed by the
* state argument.
* @state: the state to delete.
* @return: /
*/
void state_del(con_state_t * state);
#endif #endif
\ No newline at end of file
...@@ -102,5 +102,9 @@ int wait_for_client(int sfd) ...@@ -102,5 +102,9 @@ int wait_for_client(int sfd)
int err = connect(sfd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr_storage)); int err = connect(sfd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr_storage));
if (err != EISCONN && err != 0) if (err != EISCONN && err != 0)
return -1; return -1;
char text[100];
struct sockaddr_in6 * cast = (struct sockaddr_in6 *) &peer_addr;
ipv6_to_str_unexpanded(text, &(cast->sin6_addr));
DEBUG("Successfully connected to IPv6 addresss: %s, port : %d", text, ntohs(cast->sin6_port));
return 0; return 0;
} }
\ No newline at end of file
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
/************* Functions definition *************/ /************* Functions definition *************/
pkt_t* pkt_new() pkt_t* pkt_new()
{ {
pkt_t * toReturn = (pkt_t *) malloc(sizeof(pkt_t)); pkt_t * toReturn = (pkt_t *) calloc(1, sizeof(pkt_t));
return toReturn; return toReturn;
} }
......
...@@ -4,14 +4,178 @@ ...@@ -4,14 +4,178 @@
#include "our_utils.h" #include "our_utils.h"
#include "receiver.h" #include "receiver.h"
int send_if_inneed() int send_if_inneed(struct pollfd * pfd, con_state_t * state)
{ {
return 1; // Priority to NACKs
int nack_idx = 0;
for(; nack_idx < RECV_MAX_SLCTV_RPT_WDW && state->nack_to_send[nack_idx] == NULL; nack_idx++);
char buffer[PKT_MIN_LEN];
size_t len = PKT_MIN_LEN;
if (nack_idx != RECV_MAX_SLCTV_RPT_WDW) /* There's a nack to send */
{
if (pkt_set_window(state->nack_to_send[nack_idx], state->curr_recv_window) != 0 ||
pkt_encode(state->nack_to_send[nack_idx], (char *) buffer, &len) != PKT_OK)
{
ERROR("Encoding NACK to send failed");
return -1;
}
size_t written = write(pfd->fd, (void *) buffer, len);
if ( written != len)
{
ERROR("When witing a NACK to socket, didn't written the correct len");
return -1;
}
DEBUG("\nSent NACK");
DEBUG_DUMP(buffer, len);
pkt_del(state->nack_to_send[nack_idx]);
state->nack_to_send[nack_idx] = NULL;
} else if (state->ack_to_send != NULL) /* There's an ack to send */
{
if (pkt_set_window(state->ack_to_send, state->curr_recv_window) != 0 ||
pkt_encode(state->ack_to_send, (char *) buffer, &len) != PKT_OK)
{
ERROR("\nEncoding an ACK to send failed");
return -1;
}
size_t written = write(pfd->fd, (void *) buffer, len);
if ( written != len)
{
ERROR("When witing a ACK to socket, didn't written the correct len");
return -1;
}
DEBUG("Sent ACK");
DEBUG_DUMP(buffer, len);
pkt_del(state->ack_to_send);
state->ack_to_send = NULL;
}
return 0;
}
int consume_data(con_state_t * state, uint8_t seqnum_to_consume)
{
size_t written = fwrite((void *) pkt_get_payload(state->recvd_buf[seqnum_to_consume]), sizeof(char), (size_t) pkt_get_length(state->recvd_buf[seqnum_to_consume]), stdout);
if (written != pkt_get_length(state->recvd_buf[seqnum_to_consume]))
{
ERROR("Couldn't write the full packet content");
return -1;
}
fflush(stdout);
pkt_del(state->recvd_buf[seqnum_to_consume]);
state->recvd_buf[seqnum_to_consume] = NULL;
return 0;
}
int update_buffer_upon_new_data(con_state_t * state, pkt_t * pkt)
{
// Find free space
if (state->curr_recv_window == 0)
{
DEBUG("Received packet when receive window full");
return 0;
} else
{
state->curr_recv_window = (state->curr_recv_window > 0) ? state->curr_recv_window-1 : 0;
}
uint8_t seqnum = pkt_get_seqnum(pkt);
// New packet
if (state->recvd_buf[seqnum] == NULL)
{
pkt_t * pkt_to_store = pkt_new();
if (pkt_to_store == NULL)
return -1;
memcpy((void *) pkt_to_store, (void *) pkt, sizeof(pkt_t));
state->recvd_buf[seqnum] = pkt_to_store;
}
if (seqnum == (state->last_received_in_order + 1) % TWO_EXP_EIGHT)
state->last_received_in_order = seqnum;
else
return 0;
uint16_t next_wait = state->last_received_in_order;
while (state->recvd_buf[next_wait] != NULL)
{
DEBUG("Consuming packet : %d | curr_recv_window = %d, recv_window_start = %d", next_wait, state->curr_recv_window,
state->recv_window_start);
if (consume_data(state, next_wait) != 0) return -1;
state->last_received_in_order = next_wait;
state->curr_recv_window = (state->curr_recv_window < RECV_MAX_SLCTV_RPT_WDW) ? state->curr_recv_window+1 : RECV_MAX_SLCTV_RPT_WDW;
state->recv_window_start = (state->recv_window_start + 1) % TWO_EXP_EIGHT;
next_wait = (next_wait + 1) % TWO_EXP_EIGHT;
}
return 0;
}
int update_state_data_pkt(con_state_t * state, pkt_t * pkt)
{
if (update_buffer_upon_new_data(state, pkt) != 0) return -1;
if ( state->ack_to_send != NULL ) /* There's an unsent ack to send */
{
if (pkt_set_seqnum(state->ack_to_send, state->last_received_in_order) != 0)
{
ERROR("When setting the seqnum of an existing ack that hasn't been sent yet.");
return -1;
}
} else /* We have to create an ack to send */
{
state->ack_to_send = pkt_new();
if (state->ack_to_send == NULL)
{
ERROR("Couldn't create a ACK for the new packet.");
return -1;
}
if (pkt_set_type(state->ack_to_send, PTYPE_ACK) != 0 || pkt_set_tr(state->ack_to_send, 0) ||
pkt_set_seqnum(state->ack_to_send, state->last_received_in_order) != 0)
{
ERROR("Setting up a ACK packet to send.");
return -1;
}
}
ASSERT(state->ack_to_send != NULL);
return 0;
}
int update_state_truncated_pkt(con_state_t * state, pkt_t * pkt)
{
int free_idx = RECV_MAX_SLCTV_RPT_WDW;
for (; free_idx > 0 && state->nack_to_send[free_idx] != NULL; free_idx--);
if ( free_idx == 0 )
return 0;
pkt_t * pkt_to_send = pkt_new();
if (pkt_to_send == NULL) return -1;
if (pkt_set_type(pkt_to_send, PTYPE_NACK) != 0 || pkt_set_tr(pkt_to_send, 0) ||
pkt_set_seqnum(pkt_to_send, pkt_get_seqnum(pkt)) != 0)
{
ERROR("Setting up a NACK packet to send.");
return -1;
}
state->nack_to_send[free_idx] = pkt_to_send;
return 0;
} }
int update_state(con_state_t * state, pkt_t * pkt) int update_state(con_state_t * state, pkt_t * pkt)
{ {
ASSERT(state != NULL && pkt != NULL); // Feck - Not handled for now
if (pkt_get_type(pkt) != PTYPE_DATA) return 0;
// Is it in the receive_window ?
uint8_t seqnum = pkt_get_seqnum(pkt);
int in_window = (seqnum >= state->recv_window_start && seqnum < (state->recv_window_start + RECV_MAX_SLCTV_RPT_WDW) % TWO_EXP_EIGHT);
if (!in_window)
return 0;
uint8_t tr = pkt_get_tr(pkt);
if (tr == 1) /* If truncated */
return update_state_truncated_pkt(state, pkt);
else /* If not truncated */
return update_state_data_pkt(state, pkt);
return 0; return 0;
} }
...@@ -21,7 +185,7 @@ int handle_incoming(struct pollfd * pfd, con_state_t * state) ...@@ -21,7 +185,7 @@ int handle_incoming(struct pollfd * pfd, con_state_t * state)
ssize_t read_bytes = read(pfd->fd, (void *) buffer, PKT_MAX_LEN); ssize_t read_bytes = read(pfd->fd, (void *) buffer, PKT_MAX_LEN);
if (read_bytes == -1) return 0; if (read_bytes == -1) return 0;
DEBUG_DUMP(buffer, read_bytes); // DEBUG_DUMP(buffer, read_bytes);
pkt_t * pkt = pkt_new(); pkt_t * pkt = pkt_new();
if (pkt == NULL) if (pkt == NULL)
...@@ -33,8 +197,10 @@ int handle_incoming(struct pollfd * pfd, con_state_t * state) ...@@ -33,8 +197,10 @@ int handle_incoming(struct pollfd * pfd, con_state_t * state)
if (pkt_status == 0) if (pkt_status == 0)
{ {
if (update_state(state, pkt) != 0) return -1; if (update_state(state, pkt) != 0) return -1;
} else { // If the packet has been damaged } else {
DEBUG("Received a packet with %d status.", pkt_status); /* If the packet has been damaged we can discuss if it's better to send a ACK
or not but for now we just ignore it */
DEBUG("Received a damaged packet with %d status.", pkt_status);
} }
pkt_del(pkt); pkt_del(pkt);
return 0; return 0;
...@@ -49,7 +215,6 @@ int handle_incoming(struct pollfd * pfd, con_state_t * state) ...@@ -49,7 +215,6 @@ int handle_incoming(struct pollfd * pfd, con_state_t * state)
*/ */
void reception_loop(struct pollfd * pfd, con_state_t * state) void reception_loop(struct pollfd * pfd, con_state_t * state)
{ {
ASSERT(state != NULL);
int not_eof = 1; int not_eof = 1;
while (not_eof) while (not_eof)
{ {
...@@ -65,7 +230,7 @@ void reception_loop(struct pollfd * pfd, con_state_t * state) ...@@ -65,7 +230,7 @@ void reception_loop(struct pollfd * pfd, con_state_t * state)
if (handle_incoming(pfd, state) != 0) return; if (handle_incoming(pfd, state) != 0) return;
} else if (pfd->revents & POLLOUT) } else if (pfd->revents & POLLOUT)
{ {
send_if_inneed(); if (send_if_inneed(pfd, state) != 0) return;
} else if ((pfd->revents & POLLHUP) || (pfd->revents & POLLERR)){ } else if ((pfd->revents & POLLHUP) || (pfd->revents & POLLERR)){
ERROR("Occured on the socket"); ERROR("Occured on the socket");
return; return;
...@@ -73,6 +238,33 @@ void reception_loop(struct pollfd * pfd, con_state_t * state) ...@@ -73,6 +238,33 @@ void reception_loop(struct pollfd * pfd, con_state_t * state)
} }
} }
con_state_t * state_new()
{
con_state_t * toReturn = (con_state_t *) calloc(1, sizeof(con_state_t));
if (toReturn == NULL) return NULL;
for (size_t i = 0; i < TWO_EXP_EIGHT; i++)
toReturn->recvd_buf[i] = NULL;
for (size_t i = 0; i < RECV_MAX_SLCTV_RPT_WDW; i++)
toReturn->nack_to_send[i] = NULL;
toReturn->curr_recv_window = RECV_MAX_SLCTV_RPT_WDW;
toReturn->recv_window_start = 0;
toReturn->last_received_in_order = -1; // Sign indicating that it hasn't received something
toReturn->ack_to_send = NULL; // Sign that there's no ack to send
return toReturn;
}
void state_del(con_state_t * state)
{
for (int i = 0; i < 31; i++)
pkt_del(state->recvd_buf[i]);
free(state->ack_to_send);
free(state);
}
/** /**
* This main loop for the receiver. * This main loop for the receiver.
* @sfd: A valid socket. * @sfd: A valid socket.
...@@ -80,21 +272,18 @@ void reception_loop(struct pollfd * pfd, con_state_t * state) ...@@ -80,21 +272,18 @@ void reception_loop(struct pollfd * pfd, con_state_t * state)
*/ */
void receiver_read_write_loop(int sfd) void receiver_read_write_loop(int sfd)
{ {
con_state_t * state = malloc(sizeof(con_state_t)); con_state_t * state = state_new();
struct pollfd * pfd = (struct pollfd *) calloc(1, sizeof(struct pollfd)); struct pollfd * pfd = (struct pollfd *) calloc(1, sizeof(struct pollfd));
if (state == NULL || pfd == NULL) if (state == NULL || pfd == NULL)
{ {
free(state); if (state != NULL) state_del(state);
free(pfd); free(pfd);
return; return;
} }
pfd->fd = sfd; pfd->fd = sfd;
pfd->events = POLLIN | POLLOUT; pfd->events = POLLIN | POLLOUT;
if (wait_for_client(sfd) == 0)
if (wait_for_client(sfd) == 0)
reception_loop(pfd, state); reception_loop(pfd, state);
state_del(state);
free(state);
free(pfd); free(pfd);
} }
\ No newline at end of file
...@@ -218,5 +218,5 @@ int main(int argc, char const *argv[]) ...@@ -218,5 +218,5 @@ int main(int argc, char const *argv[])
close(socket); close(socket);
// Writing and reading // Writing and reading
return (EXIT_SUCCESS ); return (EXIT_SUCCESS);
} }
\ No newline at end of file
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter