diff --git a/receiver b/receiver new file mode 100755 index 0000000000000000000000000000000000000000..8c6dfadb164089605220e1d66fee01d592c3bad7 Binary files /dev/null and b/receiver differ diff --git a/sender b/sender new file mode 100755 index 0000000000000000000000000000000000000000..fbf5ff9642b83c5144fe8e84989eef913eb139d0 Binary files /dev/null and b/sender differ diff --git a/src/packet_interface.c b/src/packet_interface.c index 783380cabd1a2de36d434a2648809ac942487998..94e65803ffc72706abe3d7dcc5f13861f1c5caba 100644 --- a/src/packet_interface.c +++ b/src/packet_interface.c @@ -55,27 +55,33 @@ pkt_status_code pkt_decode_ack_nack(const char *data, const size_t len, pkt_t *p pkt_status_code pkt_decode_data_fec(const char *data, const size_t len, pkt_t *pkt) { - uint16_t length; - memcpy((void *) &length, (void *) &data[1], 2); - length = ntohs(length); + uint16_t payload_length, actual_indicated_length; + ptypes_t type = (data[0] & TYPE_MASK) >> TYPE_SHIFT; + + memcpy((void *) &actual_indicated_length, (void *) &data[1], 2); + actual_indicated_length = ntohs(actual_indicated_length); + payload_length = (type == PTYPE_DATA) ? actual_indicated_length : MAX_PAYLOAD_SIZE; // Fec are always of length 512 if ( len < PKT_MIN_HEADERLEN ) return E_NOHEADER; - if ( len > PKT_MAX_LEN || length > MAX_PAYLOAD_SIZE ) + if ( len > PKT_MAX_LEN || payload_length > MAX_PAYLOAD_SIZE ) return E_LENGTH; - ptypes_t type = (data[0] & TYPE_MASK) >> TYPE_SHIFT; + if ( type == PTYPE_FEC && (data[0] & TR_MASK ) != 0 ) return E_TR; - size_t expected_len = sizeof(header_t) + TIMESTAMP_SIZE + CRC_SIZE + length; + size_t expected_len = sizeof(header_t) + TIMESTAMP_SIZE + CRC_SIZE + payload_length; - if ( length > 0 ) + if ( payload_length > 0 ) expected_len += CRC_SIZE; - if ( len != expected_len && type != PTYPE_FEC) + if ( len != expected_len ) + { return E_UNCONSISTENT; + } + // We set the TR to 0 in order to calculcate the CRC on the header char modified_header[8]; @@ -94,11 +100,11 @@ pkt_status_code pkt_decode_data_fec(const char *data, const size_t len, pkt_t *p @brief : We don't check the checksum and the window here **/ uint32_t crc2; - if ( (type == PTYPE_DATA && (data[0] & TR_MASK ) == 0 && length > 0) || type == PTYPE_FEC ) + if ( type == PTYPE_DATA && (data[0] & TR_MASK ) == 0 && payload_length > 0 ) { - memcpy((void *) &crc2, &data[12+length], 4); + memcpy((void *) &crc2, &data[12+payload_length], 4); crc2 = ntohl((uint32_t) crc2); - if (calculate_crc(&data[12], length) != crc2) + if (calculate_crc(&data[12], payload_length) != crc2) return E_CRC; pkt_set_crc2(pkt, crc2); } @@ -107,11 +113,11 @@ pkt_status_code pkt_decode_data_fec(const char *data, const size_t len, pkt_t *p memcpy((void *) ×tamp, &data[4], 4); memcpy(&(pkt->header.front), data, 1); - pkt_set_length(pkt, length); + pkt_set_length(pkt, actual_indicated_length); pkt_set_seqnum(pkt, seqnum); pkt_set_timestamp(pkt, timestamp); pkt_set_crc1(pkt, crc1); - pkt_set_payload(pkt, &data[12], length); + pkt_set_payload(pkt, &data[12], payload_length); return PKT_OK; } @@ -145,7 +151,7 @@ pkt_status_code pkt_encode_ACK_NACK(const pkt_t *pkt, char *buf, size_t *len) pkt_status_code pkt_encode_DATA_FEC(const pkt_t *pkt, char *buf, size_t *len) { // Let's first copy the header - if ( *len < 12 ) return E_NOMEM; + if ( *len < PKT_MIN_HEADERLEN ) return E_NOMEM; memcpy((void *) &buf[0], (void *) &(pkt->header.front), 1); uint16_t n_length = htons(pkt_get_length(pkt)); memcpy((void *) &buf[1], (void *) &n_length, 2); @@ -157,20 +163,20 @@ pkt_status_code pkt_encode_DATA_FEC(const pkt_t *pkt, char *buf, size_t *len) size_t required_size; ptypes_t type = pkt_get_type(pkt); - uint16_t length = (type == PTYPE_DATA && pkt_get_tr(pkt) == 0) ? pkt_get_length(pkt) : MAX_PAYLOAD_SIZE; - required_size = sizeof(header_t) + TIMESTAMP_SIZE + CRC_SIZE + length; + uint16_t payload_length = (type == PTYPE_DATA && pkt_get_tr(pkt) == 0) ? pkt_get_length(pkt) : MAX_PAYLOAD_SIZE; // Cause FEC payload is always max size + required_size = sizeof(header_t) + TIMESTAMP_SIZE + CRC_SIZE + payload_length; if ( pkt_get_length(pkt) > 0 ) required_size += CRC_SIZE; if ( *len < required_size ) return E_NOMEM; - *len = sizeof(header_t) + TIMESTAMP_SIZE + CRC_SIZE + length; - if ( pkt_get_length(pkt) > 0 ) + *len = sizeof(header_t) + TIMESTAMP_SIZE + CRC_SIZE + payload_length; + if ( payload_length > 0 ) { - memcpy((void *) &buf[12], (void *) pkt->payload, length); - uint32_t crc2 = htonl(calculate_crc(&buf[12], length)); - memcpy((void *) &buf[12 + length], (void *) &crc2, CRC_SIZE); + memcpy((void *) &buf[12], (void *) pkt->payload, payload_length); + uint32_t crc2 = htonl(calculate_crc(&buf[12], payload_length)); + memcpy((void *) &buf[12 + payload_length], (void *) &crc2, CRC_SIZE); *len = *len + CRC_SIZE; } return PKT_OK; diff --git a/src/receiver_utils.c b/src/receiver_utils.c index f8cfc562b552e187f50d37515c404e7a7a943715..0cd936588618ea3237e7be17100697c272a36468 100644 --- a/src/receiver_utils.c +++ b/src/receiver_utils.c @@ -50,17 +50,17 @@ int send_if_inneed(struct pollfd * pfd, receiver_state_t * state) int consume_data_pkt(receiver_state_t * state, uint8_t seqnum_to_consume) { - ASSERT(state->recvd_buf[seqnum_to_consume] != NULL); - size_t written = fwrite((void *) pkt_get_payload(state->recvd_buf[seqnum_to_consume]), sizeof(char), (size_t) pkt_get_length(state->recvd_buf[seqnum_to_consume]), stdout); - if (written != pkt_get_length(state->recvd_buf[seqnum_to_consume])) + ASSERT(state->recvd_data_buf[seqnum_to_consume] != NULL); + size_t written = fwrite((void *) pkt_get_payload(state->recvd_data_buf[seqnum_to_consume]), sizeof(char), (size_t) pkt_get_length(state->recvd_data_buf[seqnum_to_consume]), stdout); + if (written != pkt_get_length(state->recvd_data_buf[seqnum_to_consume])) { ERROR("Couldn't write the full packet content"); return -1; } fflush(stdout); - state->latest_timestamp = pkt_get_timestamp(state->recvd_buf[seqnum_to_consume]); - pkt_del(state->recvd_buf[seqnum_to_consume]); - state->recvd_buf[seqnum_to_consume] = NULL; + state->latest_timestamp = pkt_get_timestamp(state->recvd_data_buf[seqnum_to_consume]); + pkt_del(state->recvd_data_buf[seqnum_to_consume]); + state->recvd_data_buf[seqnum_to_consume] = NULL; return 0; } @@ -77,7 +77,7 @@ uint16_t next_four_packets_received(receiver_state_t * state, uint16_t position_ for (uint16_t i = position_to_start; i < position_to_start + FEC_CALCULATED_ON; i++) { - if (state->recvd_buf[i] == NULL) + if (state->recvd_data_buf[i] == NULL) return 0; } return 1; @@ -98,7 +98,7 @@ uint16_t can_consume(receiver_state_t * state, const pkt_t * latest_packet_recei ASSERT(state->next_to_consume >= 0 && state->next_to_consume < TWO_EXP_EIGHT); if (pkt_get_length(latest_packet_received) == 0) { // Last packet we read everything from the buffer - for (uint16_t i = state->next_to_consume; state->recvd_buf[i] != NULL; i = (i + 1) % TWO_EXP_EIGHT ) + for (uint16_t i = state->next_to_consume; state->recvd_data_buf[i] != NULL; i = (i + 1) % TWO_EXP_EIGHT ) to_consume++; } else { // We only read blocks of 4 packets. @@ -116,13 +116,13 @@ int update_buffer_upon_new_data(receiver_state_t * state, const pkt_t * pkt) uint8_t seqnum = pkt_get_seqnum(pkt); // New packet - if (state->recvd_buf[seqnum] == NULL) + if (state->recvd_data_buf[seqnum] == NULL) { pkt_t * pkt_to_store = pkt_new(); if (pkt_to_store == NULL) return -1; memcpy((void *) pkt_to_store, (void *) pkt, sizeof(pkt_t)); - state->recvd_buf[seqnum] = pkt_to_store; + state->recvd_data_buf[seqnum] = pkt_to_store; state->curr_recv_window = (state->curr_recv_window > 0) ? state->curr_recv_window-1 : 0; } @@ -131,7 +131,7 @@ int update_buffer_upon_new_data(receiver_state_t * state, const pkt_t * pkt) { state->last_received_in_order = seqnum; uint16_t idx = seqnum; - for (;state->recvd_buf[idx] != NULL; idx = (idx + 1) % TWO_EXP_EIGHT) + for (;state->recvd_data_buf[idx] != NULL; idx = (idx + 1) % TWO_EXP_EIGHT) state->last_received_in_order = idx; } uint16_t to_consume = can_consume(state, pkt); @@ -198,28 +198,6 @@ int prepare_ack_to_send(receiver_state_t * state) return 0; } -/** - * @brief This function handles PTYPE_FEC arriving packets and updates the state. - * - * @param state: The receiver state. - * @param pkt: The DATA packet. - * @returns 0 upon success else -1. - * - * @modifies: state. - */ -int handle_fec_pkt(receiver_state_t * state, const pkt_t * pkt) -{ - ASSERT(state != NULL && pkt != NULL); - if (state->last_received_in_order > pkt_get_seqnum(pkt)) - { - DEBUG("Received FEC with seqnum %d but wasn't used since last received in order is %d", pkt_get_seqnum(pkt), state->last_received_in_order); - return 0; - } - // Add FEC to state buffer of fec - // See if there's FEC that can be used and update buffer - return 0; -} - /** * @brief This function handles PTYPE_DATA arriving packets and updates the state. * @@ -254,6 +232,115 @@ int handle_data_pkt(receiver_state_t * state, const pkt_t * pkt) state->last_data_packet = pkt_get_seqnum(pkt); return 0; +} + +int use_fec(receiver_state_t * state, const pkt_t * fec, uint16_t missing_seqnum) +{ + pkt_t * new_packet = pkt_new(); + if ( new_packet == NULL ) + { + ERROR("An error occured when initiating a new packet in order to use FEC"); + return -1; + } + uint16_t fec_seqnum = pkt_get_seqnum(fec); + pkt_set_type(new_packet, PTYPE_DATA); + pkt_set_seqnum(new_packet, missing_seqnum); + pkt_set_length(new_packet, pkt_get_length(fec)); + for ( uint16_t idx = fec_seqnum; idx < fec_seqnum + FEC_CALCULATED_ON; idx++ ) + { + if ( state->recvd_data_buf[idx] != NULL) + pkt_set_length(new_packet, pkt_get_length(new_packet) ^ pkt_get_length(state->recvd_data_buf[idx])); + } + uint16_t payload_length = pkt_get_length(new_packet); + uint8_t new_payload[payload_length]; + uint8_t * fec_payload = (uint8_t *) pkt_get_payload(fec); + uint8_t * other_payloads[FEC_CALCULATED_ON-1]; + + for ( uint16_t idx = fec_seqnum, idx_other_payloads = 0; idx < fec_seqnum + FEC_CALCULATED_ON; idx++ ) + { + if ( state->recvd_data_buf[idx] != NULL) + { + other_payloads[idx_other_payloads] = (uint8_t *) pkt_get_payload(state->recvd_data_buf[idx]); + idx_other_payloads++; + } + } + + for ( uint16_t i = 0; i < payload_length; i++ ) + { + new_payload[i] = fec_payload[i]; + for ( uint16_t idx_other_payloads = 0; idx_other_payloads < FEC_CALCULATED_ON-1; idx_other_payloads++ ) + new_payload[i] = new_payload[i] ^ other_payloads[idx_other_payloads][i]; + } + pkt_set_payload(new_packet, (char *) new_payload, payload_length); + + DEBUG("Used FEC packet [%d] to recover data packet with seqnum [%d] | The new packet payload length is %d", + fec_seqnum, missing_seqnum, payload_length); + + // Add packet to received data buffer and treat the packet + // We pass the packet to the rest as if it had just came from the network + if (handle_data_pkt(state, new_packet) != 0) return -1; + + pkt_del(new_packet); + return 0; +} + +int can_fec_be_used(receiver_state_t * state, uint16_t fec_seqnum, uint16_t * missing_seqnum) +{ + uint16_t idx = fec_seqnum; + uint16_t in_range_missing = 0; + uint16_t in_range_available = 0; + for (; idx < fec_seqnum + FEC_CALCULATED_ON; idx++) + { + if (state->recvd_data_buf[idx] != NULL) + { + in_range_available++; + } else + { + in_range_missing++; + *missing_seqnum = idx; + } + } + return (in_range_available == FEC_CALCULATED_ON-1 && in_range_missing == 1); +} + +int potential_usage_of_fec(receiver_state_t * state, const pkt_t * pkt) +{ + uint16_t fec_seqnum = pkt_get_seqnum(pkt); + uint16_t missing_data_seqnum = 0; + if ( can_fec_be_used(state, fec_seqnum, &missing_data_seqnum) ) + { + DEBUG("Going to use FEC packet with seqnum %d to recover data packet with seqnum %d", + fec_seqnum, missing_data_seqnum); + if ( use_fec(state, pkt, missing_data_seqnum) != 0 ) return -1; + } else + { + DEBUG("Received FEC with seqnum [%d] but wasn't used", fec_seqnum); + } + return 0; +} + +/** + * @brief This function handles PTYPE_FEC arriving packets and updates the state. + * + * @param state: The receiver state. + * @param pkt: The DATA packet. + * @returns 0 upon success else -1. + * + * @requires: - The packet is in the current receiving window + * + * @modifies: state. + */ +int handle_fec_pkt(receiver_state_t * state, const pkt_t * pkt) +{ + ASSERT(state != NULL && pkt != NULL); + uint8_t seqnum = pkt_get_seqnum(pkt); + if (seqnum + 3 < state->last_received_in_order ) + { + DEBUG("Received FEC with seqnum [%d] but wasn't used since last received in order : %d", + pkt_get_seqnum(pkt), state->last_received_in_order); + return 0; + } + return potential_usage_of_fec(state, pkt); } /** @@ -272,12 +359,13 @@ int handle_data_pkt(receiver_state_t * state, const pkt_t * pkt) */ int handle_truncated_pkt(receiver_state_t * state, const pkt_t * pkt) { + DEBUG("Received a truncated packet with seqnum %d", pkt_get_seqnum(pkt)); if (pkt_get_type(pkt) != PTYPE_DATA) return 0; int free_idx = 0; for (; free_idx < RECV_MAX_SLCTV_RPT_WDW && state->nack_to_send[free_idx] != NULL; free_idx++); - if ( free_idx == 0 ) return 0; + if ( free_idx == 32 ) return 0; pkt_t * pkt_to_send = pkt_new(); if (pkt_to_send == NULL) return -1; @@ -334,7 +422,6 @@ int handle_valid_pkt(receiver_state_t * state, const pkt_t * pkt) { /* Type FEC */ return handle_fec_pkt(state, pkt); } - return 0; } @@ -366,10 +453,10 @@ int handle_incoming(struct pollfd * pfd, receiver_state_t * state) if (handle_valid_pkt(state, (const pkt_t *) pkt) != 0) return -1; } else { - DEBUG("Received a damaged packet with %d status.", pkt_status); + DEBUG("Received a damaged packet with %d status. and seqnum as %d", pkt_status, pkt_get_seqnum(pkt)); // See if there's a FEC that can be used and update buffer pkt_del(pkt); - return prepare_ack_to_send(state); + return 0; } pkt_del(pkt); return 0; @@ -432,7 +519,7 @@ receiver_state_t * state_new() if (to_return == NULL) return NULL; for (size_t i = 0; i < TWO_EXP_EIGHT; i++) - to_return->recvd_buf[i] = NULL; + to_return->recvd_data_buf[i] = NULL; for (size_t i = 0; i < RECV_MAX_SLCTV_RPT_WDW; i++) to_return->nack_to_send[i] = NULL; @@ -456,7 +543,7 @@ void state_del(receiver_state_t * state) { ASSERT(state != NULL); for (int i = 0; i < 31; i++) - pkt_del(state->recvd_buf[i]); + pkt_del(state->recvd_data_buf[i]); free(state->ack_to_send); free(state); diff --git a/src/receiver_utils.h b/src/receiver_utils.h index 180bda38a2a72ef32d4a3d7872a34cd0805d832c..e6d9f041800e19d9a833f2f2145992be3fd71874 100644 --- a/src/receiver_utils.h +++ b/src/receiver_utils.h @@ -41,7 +41,7 @@ typedef struct __attribute__((__packed__)) uint8_t recv_window_start; uint8_t last_received_in_order; uint32_t latest_timestamp; - pkt_t * recvd_buf[TWO_EXP_EIGHT]; + pkt_t * recvd_data_buf[TWO_EXP_EIGHT]; pkt_t * ack_to_send; pkt_t * nack_to_send[RECV_MAX_SLCTV_RPT_WDW]; uint8_t transfer_done; diff --git a/tests/advanced_test.sh b/tests/advanced_test.sh index 72fb0aa828fe219de5971da24fae29ef7f24f7ce..5e297744c705209af27db2d2d0679fe1c2c85717 100755 --- a/tests/advanced_test.sh +++ b/tests/advanced_test.sh @@ -30,11 +30,11 @@ port1=$(comm -23 <(seq 65000 65200 | sort) <(ss -Htan | awk '{print $4}' | cut - port2=$(comm -23 <(seq 65000 65200 | sort) <(ss -Htan | awk '{print $4}' | cut -d':' -f2 | sort -u) | shuf | head -n 1) # We first launch the link simulator -./linksimulator/link_sim -p $port2 -P $port1 -l 20 -d 300 -e 8 -c 12 -R \ +./linksimulator/link_sim -p $port2 -P $port1 -l 25 -d 0 -e 20 -c 10 \ &>${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_link.log & link_pid=$! # We launch the receiver and capture its output -valgrind --leak-check=full --log-file=${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_valgrind_receiver.log \ +valgrind --leak-check=full --log-file=${TEST_OUTPUT_FILES}/adv_valgrind_${BSN_PRE}_receiver.log \ ./receiver ::1 $port1 1> ${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_received_file.${BSN_EXT} \ 2> ${TEST_OUTPUT_FILES}/adv_${BSN_PRE}_receiver.log & receiver_pid=$!