diff --git a/headers/receiver.h b/headers/receiver.h index fbc039d28f0520d60f5f2141cc849b10036d4c47..e9faed6d00b9533faf4587b33071d6716273bb46 100644 --- a/headers/receiver.h +++ b/headers/receiver.h @@ -7,14 +7,18 @@ #define __RECEIVER_UTILS_ /* We are using 8 bits to encode the seqnum therefore to fight redondance our window is of 2**(8)/2 */ -#define MAX_SELECTIVE_REPEAT_WINDOW 8 +#define RECV_MAX_SLCTV_RPT_WDW 31 +#define TWO_EXP_EIGHT 256 /* Represent the receiver's connection state */ typedef struct __attribute__((__packed__)) { - uint8_t receive_window; // Initialised at 16 + uint8_t curr_recv_window; + uint8_t recv_window_start; uint8_t last_received_in_order; - pkt_t buffer[31]; // Buffer + pkt_t * recvd_buf[TWO_EXP_EIGHT]; + pkt_t * ack_to_send; + pkt_t * nack_to_send[RECV_MAX_SLCTV_RPT_WDW]; } con_state_t; @@ -25,4 +29,18 @@ typedef struct __attribute__((__packed__)) */ void receiver_read_write_loop(int sfd); +/** + * Creates and initialize a con_state_t structure + * @return: a valid pointer upon success, else NULL + */ +con_state_t * state_new(); + + +/** + * Deletes the con_state_t structure pointed by the + * state argument. + * @state: the state to delete. + * @return: / + */ +void state_del(con_state_t * state); #endif \ No newline at end of file diff --git a/src/our_utils.c b/src/our_utils.c index c07a716fce1e74ec34a929dadbdb637e76160481..6793cb48f51901ab1eb31bc9126cb5b3a3ac3356 100644 --- a/src/our_utils.c +++ b/src/our_utils.c @@ -102,5 +102,9 @@ int wait_for_client(int sfd) int err = connect(sfd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr_storage)); if (err != EISCONN && err != 0) return -1; + char text[100]; + struct sockaddr_in6 * cast = (struct sockaddr_in6 *) &peer_addr; + ipv6_to_str_unexpanded(text, &(cast->sin6_addr)); + DEBUG("Successfully connected to IPv6 addresss: %s, port : %d", text, ntohs(cast->sin6_port)); return 0; } \ No newline at end of file diff --git a/src/packet_interface.c b/src/packet_interface.c index 6ca0328476b687f907b553450a43f499296279d6..d79cdb07652c810397c9319948f87d050b262bae 100644 --- a/src/packet_interface.c +++ b/src/packet_interface.c @@ -4,7 +4,7 @@ /************* Functions definition *************/ pkt_t* pkt_new() { - pkt_t * toReturn = (pkt_t *) malloc(sizeof(pkt_t)); + pkt_t * toReturn = (pkt_t *) calloc(1, sizeof(pkt_t)); return toReturn; } diff --git a/src/receiver_utils.c b/src/receiver_utils.c index f78a44d91cf721794a5f6e9223e317ff226e5812..9d982fd0ce6d8cd73d8f3f8fd63fcf65cb6dd89b 100644 --- a/src/receiver_utils.c +++ b/src/receiver_utils.c @@ -4,14 +4,178 @@ #include "our_utils.h" #include "receiver.h" -int send_if_inneed() +int send_if_inneed(struct pollfd * pfd, con_state_t * state) { - return 1; + // Priority to NACKs + int nack_idx = 0; + for(; nack_idx < RECV_MAX_SLCTV_RPT_WDW && state->nack_to_send[nack_idx] == NULL; nack_idx++); + + char buffer[PKT_MIN_LEN]; + size_t len = PKT_MIN_LEN; + if (nack_idx != RECV_MAX_SLCTV_RPT_WDW) /* There's a nack to send */ + { + if (pkt_set_window(state->nack_to_send[nack_idx], state->curr_recv_window) != 0 || + pkt_encode(state->nack_to_send[nack_idx], (char *) buffer, &len) != PKT_OK) + { + ERROR("Encoding NACK to send failed"); + return -1; + } + size_t written = write(pfd->fd, (void *) buffer, len); + if ( written != len) + { + ERROR("When witing a NACK to socket, didn't written the correct len"); + return -1; + } + DEBUG("\nSent NACK"); + DEBUG_DUMP(buffer, len); + pkt_del(state->nack_to_send[nack_idx]); + state->nack_to_send[nack_idx] = NULL; + } else if (state->ack_to_send != NULL) /* There's an ack to send */ + { + if (pkt_set_window(state->ack_to_send, state->curr_recv_window) != 0 || + pkt_encode(state->ack_to_send, (char *) buffer, &len) != PKT_OK) + { + ERROR("\nEncoding an ACK to send failed"); + return -1; + } + size_t written = write(pfd->fd, (void *) buffer, len); + if ( written != len) + { + ERROR("When witing a ACK to socket, didn't written the correct len"); + return -1; + } + DEBUG("Sent ACK"); + DEBUG_DUMP(buffer, len); + pkt_del(state->ack_to_send); + state->ack_to_send = NULL; + } + return 0; +} + +int consume_data(con_state_t * state, uint8_t seqnum_to_consume) +{ + size_t written = fwrite((void *) pkt_get_payload(state->recvd_buf[seqnum_to_consume]), sizeof(char), (size_t) pkt_get_length(state->recvd_buf[seqnum_to_consume]), stdout); + if (written != pkt_get_length(state->recvd_buf[seqnum_to_consume])) + { + ERROR("Couldn't write the full packet content"); + return -1; + } + fflush(stdout); + pkt_del(state->recvd_buf[seqnum_to_consume]); + state->recvd_buf[seqnum_to_consume] = NULL; + return 0; +} + +int update_buffer_upon_new_data(con_state_t * state, pkt_t * pkt) +{ + // Find free space + if (state->curr_recv_window == 0) + { + DEBUG("Received packet when receive window full"); + return 0; + } else + { + state->curr_recv_window = (state->curr_recv_window > 0) ? state->curr_recv_window-1 : 0; + } + + uint8_t seqnum = pkt_get_seqnum(pkt); + // New packet + if (state->recvd_buf[seqnum] == NULL) + { + pkt_t * pkt_to_store = pkt_new(); + if (pkt_to_store == NULL) + return -1; + memcpy((void *) pkt_to_store, (void *) pkt, sizeof(pkt_t)); + state->recvd_buf[seqnum] = pkt_to_store; + } + + if (seqnum == (state->last_received_in_order + 1) % TWO_EXP_EIGHT) + state->last_received_in_order = seqnum; + else + return 0; + + uint16_t next_wait = state->last_received_in_order; + while (state->recvd_buf[next_wait] != NULL) + { + DEBUG("Consuming packet : %d | curr_recv_window = %d, recv_window_start = %d", next_wait, state->curr_recv_window, + state->recv_window_start); + if (consume_data(state, next_wait) != 0) return -1; + state->last_received_in_order = next_wait; + state->curr_recv_window = (state->curr_recv_window < RECV_MAX_SLCTV_RPT_WDW) ? state->curr_recv_window+1 : RECV_MAX_SLCTV_RPT_WDW; + state->recv_window_start = (state->recv_window_start + 1) % TWO_EXP_EIGHT; + next_wait = (next_wait + 1) % TWO_EXP_EIGHT; + } + return 0; +} + +int update_state_data_pkt(con_state_t * state, pkt_t * pkt) +{ + if (update_buffer_upon_new_data(state, pkt) != 0) return -1; + + if ( state->ack_to_send != NULL ) /* There's an unsent ack to send */ + { + if (pkt_set_seqnum(state->ack_to_send, state->last_received_in_order) != 0) + { + ERROR("When setting the seqnum of an existing ack that hasn't been sent yet."); + return -1; + } + } else /* We have to create an ack to send */ + { + state->ack_to_send = pkt_new(); + if (state->ack_to_send == NULL) + { + ERROR("Couldn't create a ACK for the new packet."); + return -1; + } + + if (pkt_set_type(state->ack_to_send, PTYPE_ACK) != 0 || pkt_set_tr(state->ack_to_send, 0) || + pkt_set_seqnum(state->ack_to_send, state->last_received_in_order) != 0) + { + ERROR("Setting up a ACK packet to send."); + return -1; + } + } + ASSERT(state->ack_to_send != NULL); + return 0; +} + +int update_state_truncated_pkt(con_state_t * state, pkt_t * pkt) +{ + + int free_idx = RECV_MAX_SLCTV_RPT_WDW; + for (; free_idx > 0 && state->nack_to_send[free_idx] != NULL; free_idx--); + if ( free_idx == 0 ) + return 0; + + pkt_t * pkt_to_send = pkt_new(); + if (pkt_to_send == NULL) return -1; + if (pkt_set_type(pkt_to_send, PTYPE_NACK) != 0 || pkt_set_tr(pkt_to_send, 0) || + pkt_set_seqnum(pkt_to_send, pkt_get_seqnum(pkt)) != 0) + { + ERROR("Setting up a NACK packet to send."); + return -1; + } + + state->nack_to_send[free_idx] = pkt_to_send; + return 0; } int update_state(con_state_t * state, pkt_t * pkt) { - ASSERT(state != NULL && pkt != NULL); + // Feck - Not handled for now + if (pkt_get_type(pkt) != PTYPE_DATA) return 0; + + // Is it in the receive_window ? + uint8_t seqnum = pkt_get_seqnum(pkt); + int in_window = (seqnum >= state->recv_window_start && seqnum < (state->recv_window_start + RECV_MAX_SLCTV_RPT_WDW) % TWO_EXP_EIGHT); + if (!in_window) + return 0; + + uint8_t tr = pkt_get_tr(pkt); + if (tr == 1) /* If truncated */ + return update_state_truncated_pkt(state, pkt); + else /* If not truncated */ + return update_state_data_pkt(state, pkt); return 0; } @@ -21,7 +185,7 @@ int handle_incoming(struct pollfd * pfd, con_state_t * state) ssize_t read_bytes = read(pfd->fd, (void *) buffer, PKT_MAX_LEN); if (read_bytes == -1) return 0; - DEBUG_DUMP(buffer, read_bytes); + // DEBUG_DUMP(buffer, read_bytes); pkt_t * pkt = pkt_new(); if (pkt == NULL) @@ -33,8 +197,10 @@ int handle_incoming(struct pollfd * pfd, con_state_t * state) if (pkt_status == 0) { if (update_state(state, pkt) != 0) return -1; - } else { // If the packet has been damaged - DEBUG("Received a packet with %d status.", pkt_status); + } else { + /* If the packet has been damaged we can discuss if it's better to send a ACK + or not but for now we just ignore it */ + DEBUG("Received a damaged packet with %d status.", pkt_status); } pkt_del(pkt); return 0; @@ -49,7 +215,6 @@ int handle_incoming(struct pollfd * pfd, con_state_t * state) */ void reception_loop(struct pollfd * pfd, con_state_t * state) { - ASSERT(state != NULL); int not_eof = 1; while (not_eof) { @@ -65,7 +230,7 @@ void reception_loop(struct pollfd * pfd, con_state_t * state) if (handle_incoming(pfd, state) != 0) return; } else if (pfd->revents & POLLOUT) { - send_if_inneed(); + if (send_if_inneed(pfd, state) != 0) return; } else if ((pfd->revents & POLLHUP) || (pfd->revents & POLLERR)){ ERROR("Occured on the socket"); return; @@ -73,6 +238,33 @@ void reception_loop(struct pollfd * pfd, con_state_t * state) } } +con_state_t * state_new() +{ + con_state_t * toReturn = (con_state_t *) calloc(1, sizeof(con_state_t)); + if (toReturn == NULL) return NULL; + + for (size_t i = 0; i < TWO_EXP_EIGHT; i++) + toReturn->recvd_buf[i] = NULL; + + for (size_t i = 0; i < RECV_MAX_SLCTV_RPT_WDW; i++) + toReturn->nack_to_send[i] = NULL; + + toReturn->curr_recv_window = RECV_MAX_SLCTV_RPT_WDW; + toReturn->recv_window_start = 0; + toReturn->last_received_in_order = -1; // Sign indicating that it hasn't received something + toReturn->ack_to_send = NULL; // Sign that there's no ack to send + return toReturn; +} + +void state_del(con_state_t * state) +{ + for (int i = 0; i < 31; i++) + pkt_del(state->recvd_buf[i]); + + free(state->ack_to_send); + free(state); +} + /** * This main loop for the receiver. * @sfd: A valid socket. @@ -80,21 +272,18 @@ void reception_loop(struct pollfd * pfd, con_state_t * state) */ void receiver_read_write_loop(int sfd) { - con_state_t * state = malloc(sizeof(con_state_t)); + con_state_t * state = state_new(); struct pollfd * pfd = (struct pollfd *) calloc(1, sizeof(struct pollfd)); if (state == NULL || pfd == NULL) { - free(state); - free(pfd); + if (state != NULL) state_del(state); + free(pfd); return; } - pfd->fd = sfd; pfd->events = POLLIN | POLLOUT; - - if (wait_for_client(sfd) == 0) + if (wait_for_client(sfd) == 0) reception_loop(pfd, state); - - free(state); + state_del(state); free(pfd); } \ No newline at end of file diff --git a/temp/get_ipv6_address.c b/temp/get_ipv6_address.c index 5159359a1ab25ef5e7e50126ad216bb234339db7..811144a75292c3ad8dbd6bf602c1168d5e5cf54b 100644 --- a/temp/get_ipv6_address.c +++ b/temp/get_ipv6_address.c @@ -218,5 +218,5 @@ int main(int argc, char const *argv[]) close(socket); // Writing and reading - return (EXIT_SUCCESS ); + return (EXIT_SUCCESS); } \ No newline at end of file