Skip to content
Extraits de code Groupes Projets
lora_coding.py 12,2 ko
Newer Older
  • Learn to ignore specific revisions
  • import numpy as np
    
    def hdr_crc(m_header): 
        c4 = (m_header[0] & 0b1000) >> 3 ^ (m_header[0] & 0b0100) >> 2 ^ (m_header[0] & 0b0010) >> 1 ^ (m_header[0] & 0b0001)
        c3 = (m_header[0] & 0b1000) >> 3 ^ (m_header[1] & 0b1000) >> 3 ^ (m_header[1] & 0b0100) >> 2 ^ (m_header[1] & 0b0010) >> 1 ^ (m_header[2] & 0b0001)
        c2 = (m_header[0] & 0b0100) >> 2 ^ (m_header[1] & 0b1000) >> 3 ^ (m_header[1] & 0b0001) ^ (m_header[2] & 0b1000) >> 3 ^ (m_header[2] & 0b0010) >> 1
        c1 = (m_header[0] & 0b0010) >> 1 ^ (m_header[1] & 0b0100) >> 2 ^ (m_header[1] & 0b0001) ^ (m_header[2] & 0b0100) >> 2 ^ (m_header[2] & 0b0010) >> 1 ^ (m_header[2] & 0b0001)
        c0 = (m_header[0] & 0b0001) ^ (m_header[1] & 0b0010) >> 1 ^ (m_header[2] & 0b1000) >> 3 ^ (m_header[2] & 0b0100) >> 2 ^ (m_header[2] & 0b0010) >> 1 ^ (m_header[2] & 0b0001)
    
        m_header3 = c4
        m_header4 = c3 << 3 | c2 << 2 | c1 << 1 | c0
        return m_header3, m_header4
    
    def calculate_hdr(payload_len, CR, CRC_EN):
        header_len_nibbles = 5
        header = np.zeros((5), dtype =np.int16)
        header[0] = (payload_len & 0xF0) >>4
        header[1] =  payload_len & 0x0F
        header[2] = ((CR & 0x07) << 1) | (CRC_EN & 0x01)
    
        header_to_crc_calc = (header[2]<<8) | (header[1]<<4) | (header[0])
        header[3], header[4] = hdr_crc(header[0:3])
    
        return header
    
    #####################################################################################
    
    def crc16(val, new_byte):
        for i in range(8):
            if (((val & 0x8000) >> 8) ^ (new_byte & 0x80)):
                val = (val << 1)  ^ 0x1021
            else:
                val = (val << 1)
            new_byte = new_byte << 1
        return val & 0xFFFF
    
    
    def compute_crc(payload):
        n_bytes = len(payload)
        crc = 0x0000
    	#calculate CRC on the N-2 firsts data bytes using Poly=1021 Init=0000
        for  i in range( n_bytes-2):
            crc = crc16(crc, payload[i])
            
    
    	#XOR the obtained CRC with the last 2 data bytes
        crc = crc ^ payload[n_bytes-1] ^ (payload[n_bytes-2]<<8)
    
            
        data_crc = np.zeros((4), dtype =np.int8)
        data_crc[0] =  crc & 0x000F
        data_crc[1] = (crc & 0x00F0) >>4
        data_crc[2] = (crc & 0x0F00) >>8
        data_crc[3] = (crc & 0xF000) >>12
        return data_crc
    
    #####################################################################################
    
    def gray_map(data, SF):
        syms = np.copy(data)
        for i in range(len(syms)):
            syms[i] = (syms[i] - 1) % (1 << SF)  # Perform modulo operation
            syms[i] = syms[i] ^ (syms[i] >> 1)  # Convert to Gray code
        return syms
    
    def gray_demap(data, SF):
        syms = np.copy(data)
        for i in range(len(data)):
            s = syms[i]
            for j in range(1, SF):
                s = s ^ (syms[i] >> j)  # Iteratively XOR with shifted versions
            syms[i] = (s + 1) % (1 << SF)  # Restore binary and modulo operation
        return syms
    
    #####################################################################################
    
    
    def xor_integer_bits(n, num_bits):
        result = 0
        for i in range(num_bits):
            # Extract the i-th bit using bitwise AND and right-shift operations
            bit = (n >> i) & 1
            # XOR the bit with the result
            result ^= bit
        return result
    
    
    def interleave(codewords,  SF, CR, LDRO):
        if LDRO:
            sf = SF - 2
        else:
            sf = SF
    
        ppm = 4 + CR
        syms = np.zeros((ppm), dtype =np.int16)
        for i in range(ppm):
            for j in range(sf):
                bit = (codewords[(i - j - 1) % sf] >> (ppm - 1 - i)) & 1  # Extract the specific bit
                syms[i] |= (bit << (sf - j - 1 ))  # Set the corresponding bit in syms
    
            if LDRO : 
                par = xor_integer_bits(syms[i], sf)
                syms[i] =  (syms[i] << 2 ) | (par<<1)
    
        return syms
    
    
    
    def deinterleave(syms, SF, CR, LDRO):
        if LDRO:
            sf = SF - 2
            ldro_offset = 2
        else:
            sf = SF
            ldro_offset = 0
    
        ppm = 4 + CR
    
        codewords = np.zeros((sf), dtype =np.int16)
    
        for i in range(ppm):
            for j in range(sf):
                bit = (syms[i] >> (sf - j - 1 + ldro_offset)) & 1
                codewords[(i-j-1)% sf] |= (bit << (ppm - 1 - i))
        
        return codewords
    
    #####################################################################################
    
    whitening_seq = [
        0xFF, 0xFE, 0xFC, 0xF8, 0xF0, 0xE1, 0xC2, 0x85, 0x0B, 0x17, 0x2F, 0x5E, 0xBC, 0x78, 0xF1, 0xE3,
        0xC6, 0x8D, 0x1A, 0x34, 0x68, 0xD0, 0xA0, 0x40, 0x80, 0x01, 0x02, 0x04, 0x08, 0x11, 0x23, 0x47,
        0x8E, 0x1C, 0x38, 0x71, 0xE2, 0xC4, 0x89, 0x12, 0x25, 0x4B, 0x97, 0x2E, 0x5C, 0xB8, 0x70, 0xE0,
        0xC0, 0x81, 0x03, 0x06, 0x0C, 0x19, 0x32, 0x64, 0xC9, 0x92, 0x24, 0x49, 0x93, 0x26, 0x4D, 0x9B,
        0x37, 0x6E, 0xDC, 0xB9, 0x72, 0xE4, 0xC8, 0x90, 0x20, 0x41, 0x82, 0x05, 0x0A, 0x15, 0x2B, 0x56,
        0xAD, 0x5B, 0xB6, 0x6D, 0xDA, 0xB5, 0x6B, 0xD6, 0xAC, 0x59, 0xB2, 0x65, 0xCB, 0x96, 0x2C, 0x58,
        0xB0, 0x61, 0xC3, 0x87, 0x0F, 0x1F, 0x3E, 0x7D, 0xFB, 0xF6, 0xED, 0xDB, 0xB7, 0x6F, 0xDE, 0xBD,
        0x7A, 0xF5, 0xEB, 0xD7, 0xAE, 0x5D, 0xBA, 0x74, 0xE8, 0xD1, 0xA2, 0x44, 0x88, 0x10, 0x21, 0x43,
        0x86, 0x0D, 0x1B, 0x36, 0x6C, 0xD8, 0xB1, 0x63, 0xC7, 0x8F, 0x1E, 0x3C, 0x79, 0xF3, 0xE7, 0xCE,
        0x9C, 0x39, 0x73, 0xE6, 0xCC, 0x98, 0x31, 0x62, 0xC5, 0x8B, 0x16, 0x2D, 0x5A, 0xB4, 0x69, 0xD2,
        0xA4, 0x48, 0x91, 0x22, 0x45, 0x8A, 0x14, 0x29, 0x52, 0xA5, 0x4A, 0x95, 0x2A, 0x54, 0xA9, 0x53,
        0xA7, 0x4E, 0x9D, 0x3B, 0x77, 0xEE, 0xDD, 0xBB, 0x76, 0xEC, 0xD9, 0xB3, 0x67, 0xCF, 0x9E, 0x3D,
        0x7B, 0xF7, 0xEF, 0xDF, 0xBF, 0x7E, 0xFD, 0xFA, 0xF4, 0xE9, 0xD3, 0xA6, 0x4C, 0x99, 0x33, 0x66,
        0xCD, 0x9A, 0x35, 0x6A, 0xD4, 0xA8, 0x51, 0xA3, 0x46, 0x8C, 0x18, 0x30, 0x60, 0xC1, 0x83, 0x07,
        0x0E, 0x1D, 0x3A, 0x75, 0xEA, 0xD5, 0xAA, 0x55, 0xAB, 0x57, 0xAF, 0x5F, 0xBE, 0x7C, 0xF9, 0xF2,
        0xE5, 0xCA, 0x94, 0x28, 0x50, 0xA1, 0x42, 0x84, 0x09, 0x13, 0x27, 0x4F, 0x9F, 0x3F, 0x7F
    ]
    
    def whitening(payload):
        nibbles = np.zeros((2*len(payload)), dtype =np.int8)
        for i in range(len(payload)):
            low_nib  = (payload[i] & 0x0F) ^ (whitening_seq[i] & 0x0F)
            high_nib = (payload[i] & 0xF0)>>4 ^ (whitening_seq[i] & 0xF0)>>4
            nibbles[2*i] = low_nib
            nibbles[2*i+1] = high_nib
        return nibbles
    
    def dewhitening(nibbles):
        payload = np.zeros((len(nibbles)//2), dtype =np.int8)
        for i in range(len(nibbles)//2):
            low_nib  = nibbles[2*i]   ^ (whitening_seq[i] & 0x0F)
            high_nib = nibbles[2*i+1] ^ (whitening_seq[i] & 0xF0)>>4
            payload[i] = (high_nib << 4) | low_nib
        return payload
    
    def lfsr(state, taps):
        feedback = 0
        for tap in taps:
            feedback ^= (state >> (tap - 1)) & 1  # Extract the bit and XOR it
        # Shift the register and insert the feedback bit at the leftmost position
        state = ((state << 1) | feedback) & 0xFF
        return state
    
    def whitening_lfsr(payload):
        state = 0xFF
        nibbles = np.zeros((2*len(payload)), dtype =np.int8)
        for i in range(len(payload)):
            low_nib  = (payload[i] & 0x0F) ^ (state & 0x0F)
            high_nib = (payload[i] & 0xF0)>>4 ^ (state & 0xF0)>>4
            nibbles[2*i] = low_nib
            nibbles[2*i+1] = high_nib
            state = lfsr(state,[8, 6, 5, 4])
        return nibbles
    
    def dewhitening_lfsr(nibbles):
        state = 0xFF
        payload = np.zeros((len(nibbles)//2), dtype =np.uint8)
        for i in range(len(nibbles)//2):
            low_nib  = nibbles[2*i]   ^ (state & 0x0F)
            high_nib = nibbles[2*i+1] ^ (state & 0xF0)>>4
            payload[i] = (high_nib << 4) | low_nib
            state = lfsr(state,[8, 6, 5, 4])
        return payload
    
    #####################################################################################
    
    def hamming_encode(data, CR):
    	data_len = len(data)
    	codewords = np.zeros((len(data)), dtype =np.int8)
    	for i in range(data_len):
    		d0 = (data[i] >> 3) & 1
    		d1 = (data[i] >> 2) & 1
    		d2 = (data[i] >> 1) & 1
    		d3 = (data[i] >> 0) & 1
    
    		if (CR==1):
    			p4 = d0 ^ d1 ^ d2 ^ d3
    			codewords[i] = d3<<4 | d2<<3 | d1<<2 | d0<<1 | p4
    		else:
    			p0 = d3 ^ d2 ^ d1
    			p1 = d2 ^ d1 ^ d0
    			p2 = d3 ^ d2 ^ d0
    			p3 = d3 ^ d1 ^ d0
    			codewords[i] = (d3<<7 | d2<<6 | d1<<5 | d0<<4 | p0<<3 | p1<<2 | p2<<1 | p3) >> (4-CR)
    	return codewords
    
    def hamming_decode(codewords, CR):
    	codewords_len = len(codewords)
    	data = np.zeros((codewords_len), dtype =np.int8)
    
    	for i in range(codewords_len):
    		cw = codewords[i] << (4-CR)
    
    		c0 = (cw >> 7) & 1
    		c1 = (cw >> 6) & 1
    		c2 = (cw >> 5) & 1
    		c3 = (cw >> 4) & 1
    		c4 = (cw >> 3) & 1
    		c5 = (cw >> 2) & 1
    		c6 = (cw >> 1) & 1
    		cp = (cw >> 0) & 1
    
    		if (CR > 2) :
    			s0 = c0 ^ c1 ^ c2 ^ c4
    			s1 = c1 ^ c2 ^ c3 ^ c5
    			s2 = c0 ^ c1 ^ c3 ^ c6
    			syndrom = s0 | (s1 << 1) | (s2 << 2)
    
    			if syndrom == 5 : 
    				c0 = (1-c0)
    			elif syndrom == 7 : 
    				c1 = (1-c1)
    			elif syndrom == 3 : 
    				c2 = (1-c2)
    			elif syndrom == 6 : 
    				c3 = (1-c3)
    
    		data[i] = (c3 << 3) | (c2 << 2) | (c1 << 1) | c0
    
    	return data
    
    #####################################################################################
    
    def payload_to_symbol(input_data, SF, CR, CRC_EN, LDRO):
        whitened_data = whitening_lfsr(input_data)
    
        header = calculate_hdr(len(input_data), CR, CRC_EN)
    
        data_crc = compute_crc(input_data)
    
    
        if CRC_EN:
            pkt = np.concatenate((header,whitened_data,data_crc))
        else : 
            pkt = np.concatenate((header,whitened_data))
    
        ldro_hdr = 1 if SF>6 else 0
    
    
        first_encoded = hamming_encode(pkt[0:SF-2*ldro_hdr],4)
        rest_encoded  = hamming_encode(pkt[SF-2*ldro_hdr :],CR)
    
        n_blocks = int(np.ceil(len(rest_encoded)/(SF - 2*LDRO))); #Number of blocks in payload
    
        padded_encoded = np.pad(rest_encoded,(0,(n_blocks*SF - len(rest_encoded))))
    
        n_symb = 8 + n_blocks*(4+CR)
    
        binary_symbols      = np.zeros((n_symb),dtype=np.int16)
        binary_symbols[0:8] = interleave(first_encoded,SF,4,ldro_hdr )
        sf = SF-2*LDRO
        for i in range(n_blocks):
            binary_symbols[8+i*(4+CR) : 8+(i+1)*(4+CR)] = interleave(padded_encoded[i*sf : (i+1)*sf],SF,CR,LDRO )
    
        symbols = gray_demap(binary_symbols,SF)
    
        return symbols
    
    #####################################################################################
    
    def symbols_to_payload(symbols,SF,LDRO, verbose = False):
        ldro_hdr = 1 if SF>6 else 0
        degrayed_symbols = gray_map(symbols,SF)
        if verbose:
            print("Degrayed symbols :")
            print("------------------------------")
            print(degrayed_symbols)
            print("---------------------------------")
    
        first_block  = deinterleave(degrayed_symbols[:8],SF,4,ldro_hdr)
        firstBlen = len(first_block)
        decoded_hdr = hamming_decode(first_block,4)
    
        dec_hdr_len = (decoded_hdr[0]<<4)|(decoded_hdr[1])
        dec_crc_en  = (decoded_hdr[2] & 0x01)
        dec_hdr_cr  = (decoded_hdr[2] >>1) & 0x03
        dec_hdr_crc =((decoded_hdr[3] & 0x01) << 4) | decoded_hdr[4]
        hdrCRC3,hdrCRC4  = hdr_crc(decoded_hdr[0:3])
        rec_hdr_cr = (hdrCRC3<< 4) | hdrCRC4
        
        n_received_blocks = int((len(degrayed_symbols)-8)/(4+dec_hdr_cr))
        other_blocks = np.zeros((n_received_blocks*(SF- 2*LDRO)),dtype=np.int16)
    
        sf = SF - 2*LDRO
        for i in range(n_received_blocks):
            other_blocks[i*sf : (i+1)*sf] = deinterleave(degrayed_symbols[8+i*(4+dec_hdr_cr) : 8+(i+1)*(4+dec_hdr_cr)],SF,dec_hdr_cr,LDRO )
        
        if verbose:
            print("Deinterleaved symbols")
            print("----------1st block ----------")
            print(first_block)
            print("----------2nd block ----------")
            print(other_blocks)
            print("------------------------------")
    
        decoded_pay        = np.concatenate((decoded_hdr[5:],hamming_decode(other_blocks,dec_hdr_cr)))
    
        if verbose:
            print("Decoded payload")
            print("------------------------------")
            print(decoded_pay)
            print("------------------------------")
        
        dewhitened_payload = dewhitening_lfsr(decoded_pay)
        
        if verbose:
            print("Dewhitened payload")
            print("------------------------------")
            print(dewhitened_payload)
            print("------------------------------")
    
        print("Payload len : %d  "%(dec_hdr_len))
        print("Code rate :   4/%d"%(4+dec_hdr_cr))
        print("Header CRC :  %d vs %d"%(dec_hdr_crc,rec_hdr_cr))
        if dec_crc_en : 
            idx = dec_hdr_len*2
            rec_data_crc = compute_crc(dewhitened_payload[:dec_hdr_len])
            print("CRC is enabled :     %s  vs %s"%(decoded_pay[idx:idx+4],rec_data_crc))
    
        print(''.join(chr(d) for d in dewhitened_payload[:dec_hdr_len]))
    
    #####################################################################################