-
Pol Maistriaux a rédigéPol Maistriaux a rédigé
wuRx_lib.py 51,27 Kio
import os
import shutil
import sys
import numpy as np
import matplotlib
import scipy.io
import matplotlib.pyplot as plt
from matplotlib import ticker, cm
from matplotlib.colors import LogNorm
from matplotlib.ticker import (AutoMinorLocator, MultipleLocator)
import importlib
import random
from scipy import signal
from scipy.fft import fft, fftfreq, fftshift
from scipy.stats import chi2, ncx2, bernoulli, binom, ncf, norm
from scipy import special
dictColor = {"DeepBlue" :"#095256",
"LightBlue" :"#01a7c2",
"UnitedBlue" :"#6290c8",
"Green" :"#44af69",
"LightRed" :"#f8333c",
"DeepRed" :"#a22c29",
"Orange" :"#fcab10",
"Sandy" :"#fc9e4F",
"Olivine" :"#9ab87a",
"CarolinaBlue" :"#009DDC",
"2DeepBlue" :"#095256",
"2LightBlue" :"#01a7c2",
"2UnitedBlue" :"#6290c8",
"2CerulBlue" :"#6096ba",
"2Green" :"#44af69"}
listColor = list(dictColor.values())
###########################################################################
def bitreverse_reorder(array,Nfft):
N = len(array)
Nsig = N//Nfft
lst = np.arange(Nfft)
array_reoreder = np.zeros((Nsig,Nfft), dtype=np.complex64)
def order_in_reversed_bits_python(lst):
return np.array([v for _, v in sorted(enumerate(lst), key=lambda k: bin(k[0])[:1:-1])])
new_order = order_in_reversed_bits_python(lst)
for s in range(Nsig):
for i,index in enumerate(new_order):
array_reoreder[s,i] = array[index+s*Nfft]
return array_reoreder
def bitreverse_index(index_array,Nfft):
N = len(index_array)
array_reoreder = np.zeros((N))
lst = np.arange(Nfft)
def order_in_reversed_bits_python(lst):
return np.array([v for _, v in sorted(enumerate(lst), key=lambda k: bin(k[0])[:1:-1])])
new_order = order_in_reversed_bits_python(lst)
for i,index in enumerate(index_array):
array_reoreder[i] = new_order[index]
return array_reoreder
###########################################################################
def quantif_signedMag(in_array,N,Q,clip=0.9999):
length = len(in_array)
in_array_r = np.real(in_array)
in_array_i = np.imag(in_array)
clipping = clip*(2**(N-Q-1))
input_clip_r = np.clip(in_array_r , a_min= -clipping, a_max = clipping)
input_clip_i = np.clip(in_array_i , a_min= -clipping, a_max = clipping)
input_Q_r = input_clip_r * (2 ** Q)
input_Q_i = input_clip_i * (2 ** Q)
input_quant_r = np.floor(np.abs(input_Q_r)).astype(np.int32)
input_quant_i = np.floor(np.abs(input_Q_i)).astype(np.int32)
#input_quant_r = np.abs(np.floor(input_Q_r)).astype(np.int32)
#input_quant_i = np.abs(np.floor(input_Q_i)).astype(np.int32)
input_quant_ns_r = np.bitwise_and(np.abs(input_quant_r), ((2**(N-1))-1)) # Overflow
input_quant_ns_i = np.bitwise_and(np.abs(input_quant_i), ((2**(N-1))-1)) # Overflow
sign_input_r = (input_Q_r<0).astype(np.int32)
sign_input_i = (input_Q_i<0).astype(np.int32)
input_core = np.zeros((2*length)).astype(np.uint32)
for i in range(length):
input_core[i*2 ]= input_quant_ns_r[i] | (sign_input_r[i]<< (N-1))
input_core[i*2+1]= input_quant_ns_i[i] | (sign_input_i[i]<< (N-1))
to_plot = np.zeros((2,length))
to_plot[0,:] = (2*(input_Q_r>0) -1)*input_quant_r
to_plot[1,:] = (2*(input_Q_i>0) -1)*input_quant_i
return (input_Q_r+1j*input_Q_i),to_plot, input_core
def quantif_twoComplement(in_array,N,Q,clip=0.9999, round =False):
length = len(in_array)
in_array_r = np.real(in_array)
in_array_i = np.imag(in_array)
clipping = clip*(2**(N-Q-1))
input_clip_r = np.clip(in_array_r , a_min= -clipping, a_max = clipping)
input_clip_i = np.clip(in_array_i , a_min= -clipping, a_max = clipping)
input_Q_r = input_clip_r * (2 ** Q)
input_Q_i = input_clip_i * (2 ** Q)
if(round):
input_quant_r = np.round((input_Q_r)).astype(np.int32)
input_quant_i = np.round((input_Q_i)).astype(np.int32)
else:
input_quant_r = np.floor((input_Q_r)).astype(np.int32)
input_quant_i = np.floor((input_Q_i)).astype(np.int32)
input_core = np.zeros((2*length)).astype(np.uint32)
for i in range(length):
input_core[i*2 ]= input_quant_r[i] & (2**N -1)# input_quant_ns_r[i] | (sign_input_r[i]<< (N-1))
input_core[i*2+1]= input_quant_i[i] & (2**N -1)# input_quant_ns_i[i] | (sign_input_i[i]<< (N-1))
to_plot = np.zeros((2,length))
to_plot[0,:] = input_quant_r
to_plot[1,:] = input_quant_i
return (input_quant_r+1j*input_quant_i),to_plot, input_core
def read_IQ_outputs(file_path,N):
outputs_core_iq = np.loadtxt(file_path,delimiter=" ",converters={0:lambda s: int(s,16)}).astype(np.uint32)
Nfft = len(outputs_core_iq)
outputs_core = np.zeros((Nfft,), dtype=np.complex64)
code_part = 2**(N) -1
code_mag = ((2**(N-1))-1)
for i in range(Nfft):
first_part = np.bitwise_and((outputs_core_iq[i]>> N ), code_part)
sec_part = np.bitwise_and((outputs_core_iq[i] ), code_part)
first_sign = 1-2*(first_part >>(N -1))
sec_sign = 1-2*(sec_part >>(N -1))
a = np.bitwise_and(np.abs(first_part), code_mag)
b = np.bitwise_and(np.abs(sec_part ), code_mag)
outputs_core[i] = a * first_sign + 1j* b * sec_sign
return outputs_core
def read_IQ_outputs_interleaved(file_path,N):
outputs_core = np.loadtxt(file_path,delimiter=" ",converters={0:lambda s: int(s,16)}).astype(np.uint32)
outputs_core_iq = np.array([outputs_core[0::2], outputs_core[1::2]])
Nfft = np.shape(outputs_core_iq)[1]
outputs_core = np.zeros((Nfft,), dtype=np.complex64)
code_part = 2**(N) -1
code_mag = ((2**(N-1))-1)
for i in range(Nfft):
first_part = np.bitwise_and((outputs_core_iq[0,i] ), code_part)
sec_part = np.bitwise_and((outputs_core_iq[1,i] ), code_part)
first_sign = 1-2*(first_part >>(N -1))
sec_sign = 1-2*(sec_part >>(N -1))
a = np.bitwise_and(np.abs(first_part), code_mag)
b = np.bitwise_and(np.abs(sec_part ), code_mag)
outputs_core[i] = a * first_sign + 1j* b * sec_sign
return outputs_core
def sign_extend(value, bits):
sign_bit = 1 << (bits - 1)
return (value & (sign_bit - 1)) - (value & sign_bit)
def read_IQ_outputs_interleaved_2s(file_path,N):
outputs_core = np.loadtxt(file_path,delimiter=" ",converters={0:lambda s: int(s,16)}).astype(np.int32)
outputs_core_ext = sign_extend(outputs_core, N)
outputs_core_iq =outputs_core_ext[0::2] + 1j* outputs_core_ext[1::2]
return outputs_core_iq
def read_Mag_outputs(file_path,N):
outputs_core_mag = np.loadtxt(file_path,delimiter=" ",converters={0:lambda s: int(s,16)}).astype(np.uint64)
Nfft = len(outputs_core_mag)
outputs_core = np.zeros((Nfft,))
code_part = 2**(N) -1
code_mag = ((2**(N-1))-1)
for i in range(Nfft):
outputs_core[i] = outputs_core_mag[i]
return outputs_core
def read_rescaling(file_path):
rescaling = np.loadtxt(file_path,converters={0:lambda s: int(s)})
return rescaling
def read_maxidx(file_path):
maxidx = np.loadtxt(file_path,converters={0:lambda s: int(s)})
return maxidx
#def read_Mag_outputs(file_path,N):
# outputs_core_mag = np.loadtxt(file_path,delimiter=" ",converters={0:lambda s: int(s,16)}).astype(np.uint32)
# Nfft = len(outputs_core_mag)
# outputs_core = np.zeros((Nfft,), dtype=np.complex64)
#
# code_part = 2**(N) -1
# code_mag = ((2**(N-1))-1)
# for i in range(Nfft):
# outputs_core[i] = np.bitwise_and((outputs_core_mag[i]>> N ), code_part)
# return outputs_core
def read_FFT_core_outputs(file_path,NFFT,N):
outputs_core = read_IQ_outputs(file_path,N)
outputs_core_reorder = bitreverse_reorder(outputs_core,NFFT)
return outputs_core_reorder
def read_FFT_core_outputs_interleaved(file_path,NFFT,N):
outputs_core = read_IQ_outputs_interleaved(file_path,N)
outputs_core_reorder = bitreverse_reorder(outputs_core,NFFT)
return outputs_core_reorder
###########################################################################
def estimate_perf(input_data=[],input_symbols=[],NSNR_tested=1,NTRIALS_atSNR=1,NSF=128):
abs_signal = np.abs(input_data)
# Store symbols Peak Value (might be dominated by noise)
symbol_peak = np.zeros((NSNR_tested,NTRIALS_atSNR))
for i in range(NSNR_tested):
for j in range(NTRIALS_atSNR):
symbol_peak[i,j] = abs_signal[i,j,input_symbols[i,j]]
# Get the index of the maximum (the max observed)
ind_max_signal = np.argmax(abs_signal,axis=2)
# Remove the peak of the input symbol bin
sig_noPeak = np.copy(input_data)
# Noise variance of the output signal without the bin of the input symbol
noSig_var_trial = np.zeros((NSNR_tested,NTRIALS_atSNR))
# Output SNR of the output symbol bin on output noise variance
snr_trial = np.zeros((NSNR_tested,NTRIALS_atSNR))
for i in range(NSNR_tested):
for j in range(NTRIALS_atSNR):
sig_noPeak[i,j,input_symbols[i,j]] = 0
noise_var_thisTrial = np.mean(np.abs( sig_noPeak[i,j,:] - (np.sum(sig_noPeak[i,j,:])/(NSF-1)) )**2)
noSig_var_trial[i,j] = noise_var_thisTrial
snr_trial[i,j] = (symbol_peak[i,j]**2)/noise_var_thisTrial
symbol_mean_peak = np.mean(symbol_peak,axis=1)
noiseVar_estimated = np.mean(noSig_var_trial,axis=1)
outSNR_estimated = np.mean(snr_trial, axis=1)
outSNR_estimated = 10*np.log10(outSNR_estimated)
return symbol_mean_peak, noiseVar_estimated, outSNR_estimated
###########################################################################
def estimate_perf_SNR(input_data_pow=[],input_symbols=[],NSNR_tested=1,NTRIALS_atSNR=1,NSF=128):
abs_signal = (input_data_pow)
# Store symbols Peak Value (might be dominated by noise)
symbol_peak = np.zeros((NSNR_tested,NTRIALS_atSNR))
for i in range(NSNR_tested):
for j in range(NTRIALS_atSNR):
symbol_peak[i,j] = abs_signal[i,j,input_symbols[i,j]]
# Get the index of the maximum (the max observed)
ind_max_signal = np.argmax(abs_signal,axis=2)
# Remove the peak of the input symbol bin
sig_noPeak = np.copy(abs_signal)
# Noise variance of the output signal without the bin of the input symbol
noSig_var_trial = np.zeros((NSNR_tested,NTRIALS_atSNR))
# Output SNR of the output symbol bin on output noise variance
snr_trial = np.zeros((NSNR_tested,NTRIALS_atSNR))
for i in range(NSNR_tested):
for j in range(NTRIALS_atSNR):
sig_noPeak[i,j,input_symbols[i,j]] = 0
noise_var_thisTrial = (np.sum(sig_noPeak[i,j,:]**2)/(NSF-1))#np.mean(np.abs( sig_noPeak[i,j,:] - (np.sum(sig_noPeak[i,j,:])/(NSF-1)) )**2)
noSig_var_trial[i,j] = noise_var_thisTrial
snr_trial[i,j] = (symbol_peak[i,j]**2)/noise_var_thisTrial
symbol_mean_peak = np.mean(symbol_peak,axis=1)
noiseVar_estimated = np.mean(noSig_var_trial,axis=1)
outSNR_estimated = np.mean(snr_trial, axis=1)
outSNR_estimated = 10*np.log10(outSNR_estimated)
return symbol_mean_peak, noiseVar_estimated, outSNR_estimated
###########################################################################
def estimate_perf_NoiseStd(input_data_pow=[],NSNR_tested=1,NTRIALS_atStd=1,NSF=128):
abs_signal = np.abs(input_data_pow)
noise_trial = np.zeros((NSNR_tested,NTRIALS_atStd))
for i in range(NSNR_tested):
for j in range(NTRIALS_atStd):
noise_trial[i,j] = (np.sum(abs_signal[i,j,:]**2)/(NSF-1))#np.mean(np.abs( sig_noPeak[i,j,:] - (np.sum(sig_noPeak[i,j,:])/(NSF-1)) )**2)
return np.mean(noise_trial,axis=1)
###########################################################################
def estimate_perf_pureSig(input_data=[],input_symbols=[],NAMP_tested=1,NTRIALS_atA=1,NSF=128):
abs_signal = np.abs(input_data)
# Store symbols Peak Value (might be dominated by noise)
symbol_peak = np.zeros((NAMP_tested,NTRIALS_atA))
for i in range(NAMP_tested):
for j in range(NTRIALS_atA):
symbol_peak[i,j] = abs_signal[i,j,input_symbols[i,j]]
# Get the index of the maximum (the max observed)
ind_max_signal = np.argmax(abs_signal,axis=2)
# Remove the peak of the input symbol bin
sig_noPeak = np.copy(input_data)
# Noise variance of the output signal without the bin of the input symbol
noSig_var_trial = np.zeros((NAMP_tested,NTRIALS_atA))
# Output SNR of the output symbol bin on output noise variance
snr_trial = np.zeros((NAMP_tested,NTRIALS_atA))
for i in range(NAMP_tested):
for j in range(NTRIALS_atA):
sig_noPeak[i,j,input_symbols[i,j]] = 0
noise_var_thisTrial = np.mean(np.abs( sig_noPeak[i,j,:] - (np.sum(sig_noPeak[i,j,:])/(NSF-1)) )**2)
noSig_var_trial[i,j] = noise_var_thisTrial
snr_trial[i,j] = (symbol_peak[i,j]**2)/noise_var_thisTrial
symbol_mean_peak = np.mean(symbol_peak,axis=1)
noiseVar_estimated = np.mean(noSig_var_trial,axis=1)
outSNR_estimated = np.mean(snr_trial, axis=1)
outSNR_estimated = 10*np.log10(outSNR_estimated)
return symbol_mean_peak, noiseVar_estimated, outSNR_estimated
###########################################################################
def estimate_noiseVar_and_SNR(input_data=[],input_symbols=[],NSNR_tested=1,NTRIALS_atSNR=1,NSF=128):
abs_signal = np.abs(input_data)
# Store symbols Peak Value (might be dominated by noise)
symbol_peak = np.zeros((NSNR_tested,NTRIALS_atSNR))
for i in range(NSNR_tested):
for j in range(NTRIALS_atSNR):
symbol_peak[i,j] = abs_signal[i,j,input_symbols[i,j]]
# Get the index of the maximum (the max observed)
ind_max_signal = np.argmax(abs_signal,axis=2)
# Remove the peak of the input symbol bin
sig_noPeak = np.copy(input_data)
# Noise variance of the output signal without the bin of the input symbol
noSig_var_trial = np.zeros((NSNR_tested,NTRIALS_atSNR))
# Output SNR of the output symbol bin on output noise variance
snr_trial = np.zeros((NSNR_tested,NTRIALS_atSNR))
# Value of the maximum, not considering input symbol bin
max_noSig = np.zeros((NSNR_tested,NTRIALS_atSNR))
for i in range(NSNR_tested):
for j in range(NTRIALS_atSNR):
sig_noPeak[i,j,input_symbols[i,j]] = 0
noise_var_thisTrial = np.mean(np.abs( sig_noPeak[i,j,:] - (np.sum(sig_noPeak[i,j,:])/(NSF-1)) )**2)
noSig_var_trial[i,j] = noise_var_thisTrial
snr_trial[i,j] = (symbol_peak[i,j]**2)/noise_var_thisTrial
max_noSig[i,j] = np.max(np.abs( sig_noPeak[i,j,:]))
symbol_mean_peak = np.mean(symbol_peak,axis=1)
peak_dev = np.mean(symbol_peak/max_noSig,axis=1)
noiseVar_estimated = np.mean(noSig_var_trial,axis=1)
outSNR_estimated = np.mean(snr_trial, axis=1)
outSNR_estimated = 10*np.log10(outSNR_estimated)
inSNR_estimated = outSNR_estimated - 10*np.log10(NSF)
return symbol_mean_peak, peak_dev, noiseVar_estimated, inSNR_estimated, outSNR_estimated
###########################################################################
def LoRa_modulate(symbols = [],SF=7,fs=125e3,B=125e3, nonIdealRandom=False, theta=0, cfo =0, sto =0, sfo_ppm =0, A=1):
N = 2**SF
n_os = int(fs//B)
N_os = N*n_os
if nonIdealRandom:
cfo, sto, sfo, theta = LoRa_random_param(cfo_max = cfo, sto_max = sto, sfo_ppm_max =sfo_ppm, fs=125e3)
else :
sfo = sfo_ppm*fs/1e6
Ts = 1/fs
fs_sfo = fs + sfo
Ts_sfo = 1/fs_sfo
T_symbol = n_os*2**SF*Ts
T_symbol_sfo = n_os*2**SF*Ts_sfo
T_symbol_ratio = T_symbol_sfo/T_symbol
N_sample = len(symbols)*N_os
time_symbols = np.linspace(0,stop = N_sample*Ts_sfo,num = N_sample,endpoint=False)
init_timeVec = np.linspace(0,stop = N_sample*Ts_sfo,num = N_sample,endpoint=False)
received = np.zeros(N_sample , dtype=np.complex64)
upchirp_sto_sfo = np.zeros(N_sample , dtype=np.complex64)
ratio = fs/fs_sfo
increment = N_os*fs/fs_sfo
prev_index = 0
prev_value = 0
for s, symb in enumerate(symbols):
index_fold = int(np.floor( prev_value + (N-symb)*ratio*n_os ) )
prev_value = prev_value + increment
index = int(np.floor(prev_value))
time_symbols[prev_index : index_fold] = time_symbols[prev_index : index_fold] - s*T_symbol +(sto/B) + symb*Ts*n_os
time_symbols[index_fold : index] = time_symbols[index_fold : index] - s*T_symbol +(sto/B) + symb*Ts*n_os - T_symbol
upchirp_sto_sfo[prev_index : index] = A[s]*np.exp(2*1j*np.pi*B* ( (np.square(time_symbols[prev_index : index]))/(2*T_symbol) - 0.5*(time_symbols[prev_index : index]) ) , dtype=np.complex64)
prev_index = index
received[:] = upchirp_sto_sfo * np.exp(1j*theta) * np.exp(2*1j*np.pi*B* init_timeVec*(cfo)/N)
return received
###########################################################################
def LoRa_random_param(cfo_max = 0, sto_max = 0, sfo_ppm_max =0, fs=125e3):
cfo = ((cfo_max )*np.random.rand() )*np.sign(np.random.rand() - 0.5)
sto = ((sto_max )*np.random.rand() )*np.sign(np.random.rand() - 0.5)
sfo_tx = ((sfo_ppm_max )*np.random.rand() )*np.sign(np.random.rand() - 0.5)
sfo_rx = ((sfo_ppm_max )*np.random.rand() )*np.sign(np.random.rand() - 0.5)
sfo = (sfo_tx+sfo_rx)*fs/1e6
theta = 2*np.pi*np.random.rand(1)
return cfo, sto, sfo, theta
def LoRa_refchirp(SF=7,fs=125e3,B=125e3):
N = 2**SF
n_os = int(fs//B)
N_os = N*n_os
Ts = 1/fs
T_symbol = n_os*2**SF*Ts
received = np.zeros(N_os , dtype=np.complex64)
time_refchirp = np.linspace(0,stop = T_symbol,num = N_os,endpoint=False)
upchirp = np.exp(2*1j*np.pi*B* ( (np.square(time_refchirp))/(2*T_symbol) - 0.5*time_refchirp ))
return upchirp
###########################################################################
def create_noise(lenVec = 0, var_noise = 0):
noise = np.random.normal(0, np.sqrt(var_noise*2.0)/2.0, size=(lenVec, 2))
noisy_signal = noise[:,0] + 1j* noise[:,1]
return noisy_signal
def signal_power( var_noise=0, snr = []):
signal_pow = np.zeros((len(snr)))
for i,SNR in enumerate(snr):
snr_linear = 10 ** (snr[i] / 10)
signal_pow[i] = var_noise * snr_linear
return signal_pow
###########################################################################
def create_symbol_and_noise(snr_test, Ntrials_atSNR, noise_std, A, add_noise, fix_noise, noise_only, SF, NSF, N, Q, Ncore, Qcore, offset, fs, B, nonIdealRandom, theta, cfo, sto, sfo_ppm, symbolType="random"):
frac_fac = 2**Q
frac_fac_core = 2**Qcore
noise_var = 2*(noise_std**2)
Nsnr_test = len(snr_test)
snr = np.repeat(snr_test,repeats= Ntrials_atSNR)
Nsymbols = len(snr)
if isinstance(symbolType, int) :
symbols = np.ones(Nsymbols)*(symbolType%NSF)
elif symbolType == "upchirp":
symbols = np.zeros(Nsymbols)
else:
symbols = np.random.randint(0,NSF,Nsymbols) #32*np.ones(Nsymbols)
reshaped_symbols = np.reshape(symbols,newshape=(Nsnr_test,Ntrials_atSNR)).astype(int)
print("%i different SNRs will be tested"%Nsnr_test)
print("%i symbols will be demodulated"%len(symbols))
noise = create_noise(lenVec=Nsymbols*NSF*int(fs//B),var_noise=noise_var)
sig_pow = signal_power(var_noise=noise_var,snr=snr_test)
A_test = np.sqrt(sig_pow)
if fix_noise:
A = np.repeat(A_test,repeats= Ntrials_atSNR)
else:
A = A*np.ones(Nsymbols)
in_array= LoRa_modulate(symbols = symbols,SF=SF,fs=fs,B=B, nonIdealRandom=nonIdealRandom, theta=theta, cfo =cfo, sto =sto, sfo_ppm =sfo_ppm, A=A)
print(len(in_array))
if add_noise:
in_array = in_array +noise
if noise_only:
in_array = noise
downchirp = np.conjugate(LoRa_refchirp(SF=SF,fs=B,B=B))
initial_noise_var= np.var(noise)
print("Initial noise variance : %.3f "%initial_noise_var)
print("Initial noise std (real or imag) : %.3f\n"%np.std(np.real(noise)))
for i in range(Nsnr_test):
data = in_array[(i*Ntrials_atSNR*NSF) : ((i+1)*Ntrials_atSNR*NSF)]
all_data = np.array([np.real(data),np.imag(data)]).flatten()
max_in_array = np.max(all_data)
ratio_clipped = (np.shape(np.where(np.abs(all_data) >= 1))[1])/len(all_data)
data_std = np.std(all_data)
print("SNR [dB] : %i \t Std : %.2f \t Max val : %.2f \t Ratio clipping [%%]: %.4f"%(snr_test[i], data_std, max_in_array,100*ratio_clipped))
return initial_noise_var, noise_var, symbols, reshaped_symbols,downchirp, in_array
###########################################################################
def create_pureSymbol(A_test, Ntrials_at_A, SF, NSF, N, Q, Ncore, Qcore, offset, fs, B, nonIdealRandom, theta, cfo, sto, sfo_ppm):
frac_fac = 2**Q
frac_fac_core = 2**Qcore
Ntest = len(A_test)
snr = np.repeat(A_test, repeats= Ntrials_at_A)
Nsymbols = len(snr)
symbols = np.random.randint(0,NSF,Nsymbols) #32*np.ones(Nsymbols)
reshaped_symbols = np.reshape(symbols,newshape=(Ntest,Ntrials_at_A)).astype(int)
print("%i different amplitude will be tested"%Ntest)
print("%i symbols will be demodulated"%len(symbols))
A = np.repeat(A_test,repeats= Ntrials_at_A)
in_array = LoRa_modulate(symbols = symbols,SF=SF,fs=fs,B=B, nonIdealRandom=nonIdealRandom, theta=theta, cfo =cfo, sto =sto, sfo_ppm =sfo_ppm, A=A)
downchirp = np.conjugate(LoRa_refchirp(SF=SF,fs=fs,B=B))
for i in range(Ntest):
data = in_array[(i*Ntrials_at_A*NSF) : ((i+1)*Ntrials_at_A*NSF)]
all_data = np.array([np.real(data),np.imag(data)]).flatten()
ratio_clipped = (np.shape(np.where(np.abs(all_data) >= 1))[1])/len(all_data)
print("Pow [dB] : %i \t Ratio clipping [%%]: %.4f"%(20*np.log10(A_test[i]), 100*ratio_clipped))
return symbols, reshaped_symbols,downchirp, in_array
###########################################################################
def create_pureNoise(std_test, Ntrials_atStd, SF, NSF, N, Q, Ncore, Qcore, offset,fs, B):
frac_fac = 2**Q
frac_fac_core = 2**Qcore
noise_var = 2*(std_test**2)
Nstd = len(std_test)
noiseVar_vec = np.repeat(noise_var,repeats= Ntrials_atStd)
print("%i different SNRs will be tested"%Nstd)
print("%i symbols will be demodulated"%len(noiseVar_vec))
in_array = np.zeros((Ntrials_atStd*len(std_test)*NSF), dtype=np.complex64)
for i in range(Nstd):
in_array[i*Ntrials_atStd*NSF : (i+1)*Ntrials_atStd*NSF] = create_noise(lenVec=Ntrials_atStd*NSF,var_noise=noise_var[i])
downchirp = np.conjugate(LoRa_refchirp(SF=SF,fs=fs,B=B))
for i in range(Nstd):
data = in_array[(i*Ntrials_atStd*NSF) : ((i+1)*Ntrials_atStd*NSF)]
all_data = np.array([np.real(data),np.imag(data)]).flatten()
ratio_clipped = (np.shape(np.where(np.abs(all_data) >= 1))[1])/len(all_data)
data_std = np.std(all_data)
print("Std fixed : %.4f \t Std obs. : %.4f \t Ratio clipping [%%]: %.4f"%(std_test[i], data_std,100*ratio_clipped))
return noise_var, downchirp, in_array
###########################################################################
def quantify_downchirp(SF=7,fs=125e3,B=125e3,Ncore=16,Qcore=15,path_rtl_file="", signedMag = True):
NSF = 2**SF
n = np.arange(NSF)
downchirp = np.conjugate(LoRa_refchirp(SF=SF,fs=fs,B=B))
if signedMag :
downc_scaled, to_plot, downc_core = quantif_signedMag(downchirp,Ncore,Qcore,clip=0.999999999)
rtl_format = "'h"
else :
downc_scaled, to_plot, downc_core = quantif_twoComplement(downchirp,Ncore,Qcore,clip=((2**Qcore - 1))/(2**Qcore),round =True)
rtl_format = "'sh"
re = "parameter ["+str(Ncore-1)+":0] DOWNCHIRP_ARRAY_R [0:"+str((NSF//2)-1)+"] = {"
im = "parameter ["+str(Ncore-1)+":0] DOWNCHIRP_ARRAY_I [0:"+str((NSF//2)-1)+"] = {"
format = "%."+str(Ncore//4)+"x"
for i in range((NSF//2)-1):
re += (str(Ncore)+rtl_format+format+",")%(downc_core[2*i])
im += (str(Ncore)+rtl_format+format+",")%(downc_core[2*i+1])
re += (str(Ncore)+rtl_format+format+"};")%(downc_core[2*((NSF//2)-1)])
im += (str(Ncore)+rtl_format+format+"};")%(downc_core[2*((NSF//2)-1)+1])
with open(path_rtl_file, "w") as text_file:
text_file.write("%s\n\n" % re)
text_file.write("%s" % im)
text_file.close()
return downc_scaled, downc_core
###########################################################################
def quantify_downchirps(SF_min=7, SF_max=7,max_declared_SF=12,fs=125e3,B=125e3,Ncore=16,Qcore=15,path_rtl_file="", signedMag = True):
SFs = np.arange(SF_min,max_declared_SF+1,1)
with open(path_rtl_file, "w") as text_file:
for SF in SFs:
NSF = 2**SF
n = np.arange(NSF)
downchirp = np.conjugate(LoRa_refchirp(SF=SF,fs=fs,B=B))
if SF <= SF_max:
if signedMag :
downc_scaled, to_plot, downc_core = quantif_signedMag(downchirp,Ncore,Qcore,clip=0.999999999)
rtl_format = "'h"
else :
downc_scaled, to_plot, downc_core = quantif_twoComplement(downchirp,Ncore,Qcore,clip=((2**Qcore - 1))/(2**Qcore),round =True)
rtl_format = "'sh"
re = "parameter ["+str(Ncore-1)+":0] DOWNCHIRP_SF"+str(SF)+"_R [0:"+str((NSF//2)-1)+"] = {"
im = "parameter ["+str(Ncore-1)+":0] DOWNCHIRP_SF"+str(SF)+"_I [0:"+str((NSF//2)-1)+"] = {"
format = "%."+str(Ncore//4)+"x"
for i in range((NSF//2)-1):
re += (str(Ncore)+rtl_format+format+",")%(downc_core[2*i])
im += (str(Ncore)+rtl_format+format+",")%(downc_core[2*i+1])
re += (str(Ncore)+rtl_format+format+"};")%(downc_core[2*((NSF//2)-1)])
im += (str(Ncore)+rtl_format+format+"};")%(downc_core[2*((NSF//2)-1)+1])
text_file.write("%s\n" % re)
text_file.write("%s\n\n" % im)
else :
re = "parameter DOWNCHIRP_SF"+str(SF)+"_R = 0;"
im = "parameter DOWNCHIRP_SF"+str(SF)+"_I = 0;"
text_file.write("%s\n" % re)
text_file.write("%s\n\n" % im)
text_file.close()
return downc_scaled, downc_core
###########################################################################
def quantify_newTF(NFFT=128,Ncore=16,Qcore=15,path_rtl_file="", signedMag=True,divComplexCircle = 2,endPoint=False, clocked=False):
pointFFT=int(NFFT/divComplexCircle) + endPoint
n = np.arange(pointFFT)
TF = np.exp(-1j*2*np.pi*n/(NFFT))
if signedMag :
TF_scaled, to_plot, TF_core = quantif_signedMag(TF,Ncore,Qcore,clip = 0.99999999)
rtl_format = "'h"
else :
TF_scaled, to_plot, TF_core = quantif_twoComplement(TF,Ncore,Qcore,clip=((2**Qcore - 1))/(2**Qcore),round =True)
rtl_format = "'sh"
NbitsAddr = int(np.ceil(np.log2(pointFFT)))
print(NbitsAddr)
format_addr = "%."+str(int(np.ceil(NbitsAddr/4)))+"x"
buffer = ""
format = "%."+str((2*Ncore)//4)+"x"
for i in range(pointFFT):
buffer += ("\t\t\t"+str(NbitsAddr)+"'h"+format_addr+": DOUT <= "+str(2*Ncore)+rtl_format+format+";\n")%(int(n[i]),(2**Ncore)*TF_core[2*i] +TF_core[2*i+1])
if clocked:
with open(path_rtl_file, "w") as text_file:
text_file.write("module TF_newRom (\n")
text_file.write(" input [%i-1:0] ADDR,\n"%NbitsAddr)
text_file.write(" output reg [%i-1:0] DOUT, \n"%(2*Ncore))
text_file.write(" input CLK, \n")
text_file.write(" input RST \n")
text_file.write(");\n\n")
text_file.write("always @(posedge CLK or posedge RST) begin\n")
text_file.write("\tif (RST)\n")
text_file.write("\t\tDOUT <= 'b0;\n")
text_file.write("\telse\n")
text_file.write("\t\tcase (ADDR)\n")
text_file.write("%s" % buffer)
#text_file.write(("\t\tdefault : DOUT <= "+str(2*Ncore)+rtl_format+format+";\n")%(0))
text_file.write("\tendcase\n")
text_file.write("end\n")
text_file.write("endmodule\n")
text_file.close()
return TF_scaled, TF_core
else:
with open(path_rtl_file, "w") as text_file:
text_file.write("module TF_newRom (\n")
text_file.write(" input [%i-1:0] ADDR,\n"%NbitsAddr)
text_file.write(" output reg [%i-1:0] DOUT, \n"%(2*Ncore))
text_file.write(" input CLK, \n")
text_file.write(" input RST \n")
text_file.write(");\n\n")
text_file.write("always @(*) begin\n")
text_file.write("\tcase (ADDR)\n")
text_file.write("%s" % buffer)
text_file.write(("\tdefault : DOUT <= "+str(2*Ncore)+rtl_format+format+";\n")%(0))
text_file.write("\tendcase\n")
text_file.write("end\n")
text_file.write("endmodule\n")
text_file.close()
return TF_scaled, TF_core
###########################################################################
def quantify_twiddle_factor(NFFT=128,Ncore=16,Qcore=15,path_rtl_file="", signedMag=True,divComplexCircle = 2,endPoint=False):
pointFFT=int(NFFT/divComplexCircle) + endPoint
n = np.arange(pointFFT)
TF = np.exp(-1j*2*np.pi*n/(NFFT))
if signedMag :
TF_scaled, to_plot, TF_core = quantif_signedMag(TF,Ncore,Qcore,clip = 0.99999999)
rtl_format = "'h"
else :
TF_scaled, to_plot, TF_core = quantif_twoComplement(TF,Ncore,Qcore,clip=((2**Qcore - 1))/(2**Qcore),round =True)
rtl_format = "'sh"
re = "parameter ["+str(Ncore-1)+":0] TF_ARRAY_R [0:"+str(pointFFT-1)+"] = {"
im = "parameter ["+str(Ncore-1)+":0] TF_ARRAY_I [0:"+str(pointFFT-1)+"] = {"
format = "%."+str(Ncore//4)+"x"
for i in range(pointFFT-1):
re += (str(Ncore)+rtl_format+format+",")%(TF_core[2*i])
im += (str(Ncore)+rtl_format+format+",")%(TF_core[2*i+1])
re += (str(Ncore)+rtl_format+format+"};")%(TF_core[2*(pointFFT-1)])
im += (str(Ncore)+rtl_format+format+"};")%(TF_core[2*(pointFFT-1)+1])
with open(path_rtl_file, "w") as text_file:
text_file.write("%s\n\n" % re)
text_file.write("%s" % im)
text_file.close()
return TF_scaled, TF_core
###########################################################################
def quantify_fullTwiddle_factor(NFFT=128,Ncore=16,Qcore=15,path_rtl_file="", signedMag=True):
n = np.arange(int(NFFT))
TF = np.exp(-1j*2*np.pi*n/(NFFT))
if signedMag :
TF_scaled, to_plot, TF_core = quantif_signedMag(TF,Ncore,Qcore,clip = 0.99999999)
rtl_format = "'h"
else :
TF_scaled, to_plot, TF_core = quantif_twoComplement(TF,Ncore,Qcore,clip=((2**Qcore - 1))/(2**Qcore),round =True)
rtl_format = "'sh"
re = "parameter ["+str(Ncore-1)+":0] TF_ARRAY_R [0:"+str(NFFT-1)+"] = {"
im = "parameter ["+str(Ncore-1)+":0] TF_ARRAY_I [0:"+str(NFFT-1)+"] = {"
format = "%."+str(Ncore//4)+"x"
for i in range(NFFT-1):
re += (str(Ncore)+rtl_format+format+",")%(TF_core[2*i])
im += (str(Ncore)+rtl_format+format+",")%(TF_core[2*i+1])
re += (str(Ncore)+rtl_format+format+"};")%(TF_core[2*(NFFT-1)])
im += (str(Ncore)+rtl_format+format+"};")%(TF_core[2*(NFFT-1)+1])
with open(path_rtl_file, "w") as text_file:
text_file.write("%s\n\n" % re)
text_file.write("%s" % im)
text_file.close()
return TF_scaled, TF_core
###########################################################################
def launch_simu(fs=125e3,B=125e3,nonIdealRandom=False,theta=0,cfo=0,sto=0,sfo_ppm=0,SF_MAX=12,SF_MIN=6, SF= 6,add_noise = True,fix_noise = True,noise_only= False,
N= 12,Q =11,Ncore= 16,Qcore= 15,Nc=12,Qc=11,offset= 0,
noise_std=0.001, A=0.05,Ntrials_atSNR=2,snr_start=0,snr_stop=5,snr_step = 1,
path_workspace="",path_rtl="",dir_res_name="",
signedMag=1):
print("Are we using signed magnitude? %i"%signedMag)
################################################################
#Prepare
################################################################
Ts = 1 /fs
NSF = 2**SF
halfNSF = int(NSF/2)
snr_test = np.arange(start =snr_start,
stop =snr_stop,
step =snr_step)
################################################################
#INPUT DATA GENERATION
################################################################
initial_noise_var, noise_var, symbols, reshaped_symbols,downchirp, in_array= create_symbol_and_noise(snr_test, Ntrials_atSNR, noise_std, A, add_noise, fix_noise, noise_only, SF, NSF, N, Q, Ncore, Qcore, offset, fs, B, nonIdealRandom, theta, cfo, sto, sfo_ppm)
t= np.linspace(start = 0, stop=NSF*Ts, num=NSF, endpoint=False)
if signedMag:
in_scaled, to_plot, quantified = quantif_signedMag(in_array,N,Q)
else:
in_scaled, to_plot, quantified = quantif_twoComplement(in_array,N,Q)
format = "%."+str(N//4)+"x"
np.savetxt(path_workspace+"/testvec_files/wuRx_inputs.txt",quantified,fmt=format)
if signedMag:
inputs_real_mixed = read_IQ_outputs_interleaved(path_workspace+"/testvec_files/wuRx_inputs.txt",N)
else:
inputs_real_mixed = read_IQ_outputs_interleaved_2s(path_workspace+"/testvec_files/wuRx_inputs.txt",N)
################################################################
#DOWNCHIRP AND TF
################################################################
TF_scaled, TF_toCore = quantify_twiddle_factor(NFFT=NSF, Ncore=Nc,Qcore=Qc,path_rtl_file=path_rtl+"/fft_lib/TF_sim.inc", signedMag=signedMag)
quantify_twiddle_factor(NFFT=2**SF_MAX,Ncore=Nc,Qcore=Qc,path_rtl_file=path_rtl+"/fft_lib/TF_SFs_sim.inc", signedMag=signedMag)
downchirp_quant, downc_core = quantify_downchirp (SF=SF, fs=fs,B=B,Ncore=Nc,Qcore=Qc,path_rtl_file=path_rtl+"/dechirping/downchirp_sim.inc" , signedMag=signedMag)
quantify_downchirps(SF_min=SF_MIN,SF_max=SF_MAX,fs=fs,B=B,Ncore=Nc,Qcore=Qc,path_rtl_file=path_rtl+"/dechirping/downchirp_SFs_sim.inc" , signedMag=signedMag)
TF_full_quant = np.zeros(NSF, dtype =np.complex_)
TF_full_quant[0 :NSF//2] = TF_scaled[0:NSF//2]
TF_full_quant[NSF//2:NSF ] = -TF_scaled[0:NSF//2]
################################################################
#SIMULATION RESULTS
################################################################
# WITHOUT QUANTIZATION
simu_reshaped = np.reshape(in_array, newshape=(len(snr_test),Ntrials_atSNR,NSF))
simu_dechirped = simu_reshaped*downchirp
simu_FFT = fft(simu_dechirped,axis=2)
sim_PSD_woQ = np.abs(simu_FFT)
# WITH QUANTIZATION
in_reshaped = np.reshape(inputs_real_mixed, newshape=(len(snr_test),Ntrials_atSNR,NSF))
signal_dechirped = in_reshaped*downchirp_quant
signal_dechirped_quant = np.floor(np.real(signal_dechirped)/(2**Qc )) + 1j*np.floor(np.imag(signal_dechirped)/(2**Qc ))
signalFFT = fft((signal_dechirped_quant) /(2**Q ))
sim_PSD_wQ = np.abs(signalFFT)
signalHwPSD_R2p3 = np.zeros((len(snr_test),Ntrials_atSNR,NSF))
for i in range(0,len(snr_test),1):
for j in range(0,Ntrials_atSNR,1):
signalHwPSD_R2p3[i,j,:] = abs(dif_fft_2p3(signal_dechirped_quant[i,j,:] , TF_full_quant/(2**Qc), int(np.log2(NSF)) , rescaling = True, Qdata=Qcore)) /(2**(Q))
Dic_to_save = {"N": N, "Q":Q, "Ncore": Ncore, "Qcore": Qcore, "Nc": Nc, "Qc": Qc, "Offset": offset, "SF":SF,
"Ntrials" : Ntrials_atSNR, "SNR_tested": snr_test,
"res_woQ": sim_PSD_woQ, "res_wQ": sim_PSD_wQ, "res_emul": signalHwPSD_R2p3 }
scipy.io.savemat(path_workspace+"/"+dir_res_name+"/"+"N"+str(N)+"C"+str(Ncore)+"c"+str(Nc)+"_SF"+str(SF)+".mat", mdict=Dic_to_save)
return 1
def add_HDL_res(path_workspace="",dir_res_name="",
N= 12,Q =11,Ncore= 16,Qcore= 15,Nc=12,Qc=11,offset= 0,SF = 6, signedMag = True):
resSim = scipy.io.loadmat(path_workspace+"/"+dir_res_name+"/"+"N"+str(N)+"C"+str(Ncore)+"c"+str(Nc)+"_SF"+str(SF)+".mat")
snr_test = resSim["SNR_tested"][0]
Ntrials_atSNR = resSim["Ntrials"][0][0]
sim_PSD_woQ = resSim["res_woQ"] [:]
sim_PSD_wQ = resSim["res_wQ"] [:]
signalHwPSD_R2p3 = resSim["res_emul"] [:]
NSF = 2**SF
outputs_core_mag = read_Mag_outputs(path_workspace+"/testvec_files/wuRx_outputs.txt",2*Ncore)
outputs_core_mag = bitreverse_reorder(outputs_core_mag,2**SF)
outputs_core_mag = np.sqrt(np.reshape(outputs_core_mag, newshape=(len(snr_test),Ntrials_atSNR,NSF))/(2**(Q+offset+offset+Q)))
rescale = read_rescaling(path_workspace+"/testvec_files/wuRx_rescale.txt")
rescale = np.reshape(rescale , newshape=(len(snr_test),Ntrials_atSNR))
outputs_resc = np.zeros(shape = np.shape(outputs_core_mag), dtype=np.float64)
if signedMag :
factor_scale = 1
else:
factor_scale = np.sqrt(2)
for i in range(0,len(snr_test)):
for j in range(0,Ntrials_atSNR):
outputs_resc[i,j,:] = np.abs(outputs_core_mag[i,j,:])*(factor_scale* 2**rescale[i,j])
Dic_to_save = {"N": N, "Q":Q, "Ncore": Ncore, "Qcore": Qcore, "Nc": Nc, "Qc": Qc, "Offset": offset, "SF":SF,
"Ntrials" : Ntrials_atSNR, "SNR_tested": snr_test,
"res_woQ": sim_PSD_woQ, "res_wQ": sim_PSD_wQ , "res_emul": signalHwPSD_R2p3 , "res_HDL": outputs_resc }
scipy.io.savemat(path_workspace+"/"+dir_res_name+"/Res_"+"N"+str(N)+"C"+str(Ncore)+"c"+str(Nc)+"_SF"+str(SF)+".mat", mdict=Dic_to_save)
return 1
################################################################
#Plot Freq rep
################################################################
def plot_time(sig_list, label_list, fs):
plt.figure()
for sig,label in zip(sig_list, label_list):
plt.plot(np.arange(0,sig.shape[-1])/fs,sig,'.-',label=label)
plt.xlabel("Time [s]")
plt.ylabel("Amplitude")
plt.legend()
plt.draw()
def plot_freq(sig_list, label_list, fs, os=1, f_lim=None ):
f,ax = plt.subplots(2,1,sharex=True)
for sig,label in zip(sig_list, label_list):
freq = fftshift(fftfreq(sig.shape[-1]*os))*fs/1000
H = fftshift(fft(sig,n=sig.shape[-1]*os))
ax[0].plot(freq,20*np.log10(np.abs(H)),label=label)
ax[1].plot(freq,np.rad2deg(np.angle(H)),label=label)
ax[0].set_ylim([-100, 10])
ax[0].set_ylabel("Modulus (dB)")
ax[0].set_ylim(-120, 20)
if f_lim is not None:
ax[0].set_xlim(-f_lim/1000, f_lim/1000)
ax[1].set_xlim(-f_lim/1000, f_lim/1000)
ax[1].set_ylabel("Phase (deg)")
ax[1].set_xlabel("Frequency [kHz]")
plt.legend()
plt.draw()
################################################################
#HARDWARE EMULATION OF FFT
################################################################
################################################################
#HARDWARE EMULATION OF FFT
################################################################
def bracewell_buneman(xarray, length, log2length):
'''
bracewell-buneman bit reversal function
inputs: xarray is array; length is array length; log2length=log2(length).
output: bit reversed array xarray.
'''
muplus = int((log2length+1)//2)
mvar = 1
reverse = np.zeros(length, dtype = int)
upper_range = muplus+1
for _ in np.arange(1, upper_range):
for kvar in np.arange(0, mvar):
tvar = 2*reverse[kvar]
reverse[kvar] = tvar
reverse[kvar+mvar] = tvar+1
mvar = mvar+mvar
if (log2length & 0x01):
mvar = mvar//2
for qvar in np.arange(1, mvar):
nprime = qvar-mvar
rprimeprime = reverse[qvar]*mvar
for pvar in np.arange(0, reverse[qvar]):
nprime = nprime+mvar
rprime = rprimeprime+reverse[pvar]
temp = xarray[nprime]
xarray[nprime] = xarray[rprime]
xarray[rprime] = temp
return xarray
def dif_fft0 (xarray, twiddle, log2length,rescaling = False, Qdata = 12, verbose=False, rounding=False):
'''
radix-2 dif fft
'''
if rounding:
scale_f =lambda x:np.round(x)
else:
scale_f =lambda x:np.floor(x)
xarray = xarray.astype(np.complex_)
b_p = 1
rescale = 0
nvar_p = xarray.size
twiddle_step_size = 1
for _ in range(0, log2length): # pass loop
nvar_pp = int(nvar_p//2)
base_e = 0
for _ in range(0, b_p): # block loop
base_o = base_e+nvar_pp
for nvar in range(0, nvar_pp): # butterfly loop
evar = xarray[base_e+nvar]+xarray[base_o+nvar]
twiddle_factor = nvar*twiddle_step_size
ovar = (xarray[base_e+nvar] \
-xarray[base_o+nvar])*twiddle[twiddle_factor]
xarray[base_e+nvar] = evar
xarray[base_o+nvar] = ovar
base_e = base_e+nvar_p
if rescaling:
scaling_val = max(0, np.ceil( np.log2( max(np.max(np.abs(np.real(xarray[:]))), np.max(np.abs(np.imag(xarray[:]))))/(2**Qdata) ) ))
rescale = rescale + scaling_val
xarray[:] = ( scale_f(np.real(xarray[:])/ (2**scaling_val) ) + 1j*scale_f(np.imag(xarray[:])/ (2**scaling_val) ) )
b_p = b_p*2
nvar_p = nvar_p//2
twiddle_step_size = 2*twiddle_step_size
if rescaling:
if verbose:
print("Rescale required : %i"%(2**rescale))
xarray = bracewell_buneman(xarray*(2**rescale), xarray.size, log2length)
return xarray
def bf16_R2p4_wTF(arr16, TF, n,rescaling,scale_f):
c1 = 0.70703125 - 1j*0.70703125
c2 =-0.70703125 - 1j*0.70703125
c3 = 0.923828125- 1j*0.3828125
c4 = 0.3828125 - 1j*0.923828125
c5 =-0.3828125 - 1j*0.923828125
c6 =-0.923828125- 1j*0.3828125 #
TF0 = np.array([ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,-1j,-1j,-1j,-1j])
TF1 = np.array([ 1, 1, 1, 1, 1, 1,-1j,-1j, 1, 1, c1, c1, 1, 1, c2, c2])
TF2 = np.array([ 1, 1, 1,-1j, 1, c1, 1, c2, 1, c3, 1, c5, 1, c4, 1, c6])
TFi = np.array([ 0, 8, 4, 12, 2, 10, 6, 14, 1, 9, 5, 13, 3, 11, 7, 15])
base = 0
for i in range(8):
a = arr16[i]
b = arr16[i+8]
arr16[i] = (a+b) *TF0[i]
arr16[i+8] = (a-b) *TF0[i+8]
if rescaling:
arr16 = scale_f(np.real(arr16))+ 1j*scale_f(np.imag(arr16))
for i in range(2):
for j in range(4):
a = arr16[i*8 + j]
b = arr16[i*8 + j +4]
arr16[i*8 + j] = (a+b) *TF1[i*8 + j]
arr16[i*8 + j +4] = (a-b) *TF1[i*8 + j + 4]
if rescaling:
arr16 = scale_f(np.real(arr16))+ 1j*scale_f(np.imag(arr16))
for i in range(4):
for j in range(2):
a = arr16[i*4 + j]
b = arr16[i*4 + j +2]
arr16[i*4 + j] = (a+b) *TF2[i*4 + j]
arr16[i*4 + j +2] = (a-b) *TF2[i*4 + j + 2]
if rescaling:
arr16 = scale_f(np.real(arr16))+ 1j*scale_f(np.imag(arr16))
for i in range(8):
a = arr16[i*2]
b = arr16[i*2+1]
arr16[i*2] = (a+b) *TF[n*TFi[i*2]]
arr16[i*2+1] = (a-b) *TF[n*TFi[i*2+1]]
if rescaling:
arr16 = scale_f(np.real(arr16))+ 1j*scale_f(np.imag(arr16))
return arr16
def bf8_R2p3_wTF(arr8, TF, n,rescaling,scale_f):
c1 = 0.70703125 - 1j*0.70703125
c2 =-0.70703125 - 1j*0.70703125
TF0 = np.array([ 1, 1, 1, 1, 1, 1, -1j,-1j])
TF1 = np.array([ 1, 1, 1,-1j, 1, c1, 1, c2])
TFi = np.array([ 0, 4, 2, 6, 1, 5, 3, 7])
base = 0
for i in range(4):
a = arr8[i]
b = arr8[i+4]
arr8[i] = (a+b) *TF0[i]
arr8[i+4] = (a-b) *TF0[i+4]
if rescaling:
arr8 = scale_f(np.real(arr8))+ 1j*scale_f(np.imag(arr8))
for i in range(2):
for j in range(2):
a = arr8[i*4 + j]
b = arr8[i*4 + j +2]
arr8[i*4 + j] = (a+b) *TF1[i*4 + j]
arr8[i*4 + j +2] = (a-b) *TF1[i*4 + j + 2]
if rescaling:
arr8 = scale_f(np.real(arr8))+ 1j*scale_f(np.imag(arr8))
for i in range(4):
a = arr8[i*2]
b = arr8[i*2+1]
arr8[i*2] = (a+b) *TF[n*TFi[i*2]]
arr8[i*2+1] = (a-b) *TF[n*TFi[i*2+1]]
if rescaling:
arr8 = scale_f(np.real(arr8))+ 1j*scale_f(np.imag(arr8))
return arr8
def bf4_R2p2_wTF(arr4, TF, n,rescaling,scale_f):
TF0 = np.array([ 1, 1, 1,-1j])
TFi = np.array([ 0, 2, 1, 3])
base = 0
for i in range(2):
a = arr4[i]
b = arr4[i+2]
arr4[i] = (a+b) *TF0[i]
arr4[i+2] = (a-b) *TF0[i+2]
if rescaling:
arr4 = scale_f(np.real(arr4))+ 1j*scale_f(np.imag(arr4))
for i in range(2):
a = arr4[i*2]
b = arr4[i*2+1]
arr4[i*2] = (a+b) *TF[n*TFi[i*2]]
arr4[i*2+1] = (a-b) *TF[n*TFi[i*2+1]]
if rescaling:
arr4 = scale_f(np.real(arr4))+ 1j*scale_f(np.imag(arr4))
return arr4
def bf2_wTF(arr2, TF, n,rescaling,scale_f):
a = arr2[0]
b = arr2[1]
arr2[0] = (a + b)
arr2[1] = (a - b) *TF[n]
if rescaling:
arr2 = scale_f(np.real(arr2))+ 1j*scale_f(np.imag(arr2))
return arr2
def dif_fft_2pk (xarray, twiddle, log2length,P = 2,rescaling = False, Qdata = 12, rounding=False, write_res_stg = False, path_res_stg="./path_res_stg", verbose=False):
'''
radix-2 dif fft
'''
if rounding:
scale_f =lambda x:np.round(x)
else:
scale_f =lambda x:np.floor(x)
'''
radix-2 dif fft
'''
remaining_stages = log2length
nstg_perf = P
radix = int(2**P)
perf_stg = P
rem_2stage= log2length
NFFT = 2**log2length
xarray = xarray.astype(np.complex_)
b_p = 1
rescale = 0
nvar_p = xarray.size
twiddle_step_size = 1
if write_res_stg:
text_file = open(path_res_stg, "w")
while (rem_2stage != 0):
#print("Perf stage %i" %perf_stg)
nvar_pp = int(nvar_p//radix)
base_e = 0
rem_2stage = rem_2stage - perf_stg
if rem_2stage ==0 :
twiddle = np.ones(NFFT//2)
for b in range(0, b_p): # block loop
for nvar in range(0, nvar_pp): # butterfly loop
base = base_e + nvar
if perf_stg == 4:
index_data = np.arange(start=base, stop=base+nvar_pp*(radix), step=nvar_pp)
xarray[index_data] = bf16_R2p4_wTF(arr16= xarray[index_data],TF=twiddle,n=nvar*b_p, rescaling=rescaling,scale_f=scale_f)
elif perf_stg == 3:
index_data = np.arange(start=base, stop=base+nvar_pp*(radix), step=nvar_pp)
xarray[index_data] = bf8_R2p3_wTF(arr8= xarray[index_data],TF=twiddle,n=nvar*b_p,rescaling=rescaling, scale_f=scale_f)
elif perf_stg == 2:
index_data = np.arange(start=base, stop=base+nvar_pp*(radix), step=nvar_pp)
xarray[index_data] = bf4_R2p2_wTF(arr4= xarray[index_data],TF=twiddle,n=nvar*b_p,rescaling=rescaling, scale_f=scale_f)
elif perf_stg == 1:
index_data = np.arange(start=base, stop=base+nvar_pp*(radix), step=nvar_pp)
xarray[index_data] = bf2_wTF(arr2= xarray[index_data],TF=twiddle,n=nvar*b_p, rescaling=rescaling,scale_f=scale_f)
if write_res_stg:
for i in index_data:
text_file.write("%10.0f \t %10.0f\n" %((xarray[i]).real,(xarray[i]).imag))
base_e = base_e+nvar_p
if rescaling:
scaling_val = max(0, np.ceil( np.log2( max(np.max(np.abs(np.real(xarray[:]))), np.max(np.abs(np.imag(xarray[:]))))/(2**Qdata) ) ))
rescale = rescale + scaling_val
xarray[:] = (scale_f(np.real(xarray[:])/ (2**scaling_val))) + 1j*(scale_f(np.imag(xarray[:])/ (2**scaling_val)))
b_p = b_p*radix
nvar_p = nvar_p//radix
perf_stg = min(P,rem_2stage)
radix = int(2**perf_stg)
if write_res_stg:
text_file.write("############# REM STAGE %i ###########\n"%rem_2stage)
if rescaling:
if verbose:
print("Rescale required : %i"%(2**rescale))
xarray = bracewell_buneman(xarray*(2**rescale), xarray.size, log2length)
return xarray
def arm_cfft_emul (xarray, twiddle, log2length, Qdata = 12, rounding=False,write_res_stg = False, path_res_stg="./path_res_stg", verbose=False):
'''
radix-2 dif fft
'''
if rounding:
scale_f =lambda x:np.round(x)
else:
scale_f =lambda x:np.floor(x)
P = 2
remaining_stages = log2length
nstg_perf = P
radix = int(2**P)
perf_stg = P
rem_2stage= log2length
NFFT = 2**log2length
xarray = xarray.astype(np.complex_)
b_p = 1
rescale = 0
nvar_p = xarray.size
twiddle_step_size = 1
scale = 4
if write_res_stg:
text_file = open(path_res_stg, "w")
xarray[:] = (scale_f(np.real(xarray[:]))/ 2 )+ 1j*(scale_f(np.imag(xarray[:]))/ 2 )
while (rem_2stage != 0):
#print("Perf stage %i" %perf_stg)
nvar_pp = int(nvar_p//radix)
base_e = 0
rem_2stage = rem_2stage - perf_stg
if rem_2stage ==0 :
scale = perf_stg
twiddle = np.ones(NFFT//2)
for b in range(0, b_p): # block loop
for nvar in range(0, nvar_pp): # butterfly loop
base = base_e + nvar
if perf_stg == 2:
index_data = np.arange(start=base, stop=base+nvar_pp*(radix), step=nvar_pp)
xarray[index_data] = bf4_R2p2_wTF(arr4= xarray[index_data],TF=twiddle,n=nvar*b_p,rescaling=True,scale_f=scale_f)
elif perf_stg == 1:
index_data = np.arange(start=base, stop=base+nvar_pp*(radix), step=nvar_pp)
xarray[index_data] = bf2_wTF(arr2= xarray[index_data],TF=twiddle,n=nvar*b_p,rescaling=True,scale_f=scale_f)
if write_res_stg:
for i in index_data:
text_file.write("%10.0f \t %10.0f\n" %((xarray[i]).real,(xarray[i]).imag))
base_e = base_e+nvar_p
xarray[:] = (scale_f(np.real(xarray[:])/ scale )) + 1j*(scale_f(np.imag(xarray[:])/ scale) )
b_p = b_p*radix
nvar_p = nvar_p//radix
perf_stg = min(P,rem_2stage)
radix = int(2**perf_stg)
if write_res_stg:
text_file.write("############# REM STAGE %i ###########\n"%rem_2stage)
xarray = bracewell_buneman(xarray, xarray.size, log2length)
return xarray