UOMOP
LPDC (n, k) comparison 본문
import cv2
import math
import time
import random
import graycode
import numpy as np
from PIL import Image
from numpy import sqrt
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from pyldpc import make_ldpc, encode, decode, get_message
class For_Image:
def image2bit(path):
gray_bit = list()
img_gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
img_gray_flat = img_gray.flatten()
for i in range(len(img_gray_flat)):
gray_bit.append(format(img_gray_flat[i], 'b').zfill(8))
gray_stream = ''.join(gray_bit)
gray_output = list()
for i in range(len(gray_stream)):
gray_output.append(int(gray_stream[i]))
return gray_output
def Restore_image(input_sig):
channel = input_sig
str_1 = list()
for i in range(len(channel)):
str_1.append(str(channel[i]))
pixel_1 = list()
for i in range(int(len(channel) / 8)):
pixel_1.append(int(''.join(str_1[8 * i:8 * (i + 1)]), 2))
array = np.reshape((np.array(pixel_1, dtype=np.uint8)), (int(sqrt(len(pixel_1))), -1))
return array
def Compare_SNR(origin_img_name, path, max_snr, mod_mode, chcode_mode, rayleigh):
title_list = list(["original"])
BER_list = list([0])
image_path = path
gray_stream = For_Image.image2bit(image_path)
for i in range(max_snr):
SNR = i + 1
title_input = "SNR=" + str(SNR)
title_list.append(title_input)
input_sig, output_sig, BER = forward(gray_stream, SNR=i, mod_mode=mod_mode, rayleigh=rayleigh,
chcode_mode=chcode_mode)
BER_list.append(BER)
image_arr = For_Image.Restore_image(input_sig=output_sig)
img = Image.fromarray(image_arr)
img.save(title_input + ".jpg")
print("Complete : SNR = " + str(SNR))
plt.figure(figsize=(22, 6))
plt.subplot(1, max_snr + 1, 1)
img = cv2.imread(origin_img_name, cv2.IMREAD_GRAYSCALE)
plt.imshow(img)
plt.title(title_list[0])
for i in range(max_snr):
plt.subplot(1, max_snr + 1, i + 2)
img = cv2.imread(title_list[i + 1] + ".jpg")
plt.imshow(img)
titles = title_list[i + 1] + " (BER=" + str(round(BER_list[i + 1], 4)) + ")"
plt.title(titles)
def Bit_Gen(how_many):
return (np.random.randint(2, size=how_many)).tolist()
def BER_Check(list_1, list_2):
if (len(list_1) != len(list_2)): print("Lengths of the two streams are different.")
error = 0
for i in range(len(list_1)):
if list_1[i] != list_2[i]:
error += 1
BER = error / len(list_1)
print("BER : {}".format(BER))
return BER
class Tx:
def Block_Encode(input_sig):
n = 6
k = 3
redund = int(len(input_sig)) % 3
padding_num_Block = k - redund
for i in range(padding_num_Block):
input_sig.append(0)
Gen_Mat = np.matrix([[1, 1, 0, 1, 0, 0],
[0, 1, 1, 0, 1, 0],
[1, 0, 1, 0, 0, 1]])
input_sig_list = list()
output_list = list()
result = list()
for i in range(int(len(input_sig) / k)):
for j in range(k):
input_sig_list.append(int(input_sig[i * k + j]))
for i in range(int(len(input_sig_list) / k)):
Data_Mat = np.matrix(input_sig_list[i * k: i * k + k])
a = (Data_Mat * Gen_Mat).A1
for j in range(len(a)):
if a[j] % 2 == 0:
a[j] = 0
elif (a[j] != 1) and (a[j] % 2 == 1):
a[j] = 1
output_list.append(a[j])
for i in range(padding_num_Block):
input_sig.pop()
return output_list, padding_num_Block
def LDPC_Encode(input_sig, n, d_c, d_v):
snr = 10000000000000
H, G = make_ldpc(n, d_v, d_c, systematic=True, sparse=True)
k = G.shape[1]
print("({}, {})LDPC Channel Coding".format(n, k))
redund = int(len(input_sig)) % k
padding_num_ldpc = k - redund
for i in range(padding_num_ldpc):
input_sig.append(0)
coded_list = list()
input_arr = np.array(input_sig)
for i in range(int(len(input_arr) / k)):
result = encode(G, input_arr[k * i: k * (i + 1)], snr).tolist()
for j in range(len(result)):
coded_list.append(result[j])
for i in range(len(coded_list)):
coded_list[i] = int(coded_list[i])
if coded_list[i] < 0:
coded_list[i] = int(0)
for i in range(padding_num_ldpc):
input_sig.pop()
return coded_list, padding_num_ldpc
def Modulation(input_sig, mode):
# mode : {M-ASK, M-FSK, M-PSK}
if mode[-3] == 'P':
M = int(mode[0])
k = int(math.log2(M))
redund = int(len(input_sig)) % k
padding_num_for_psk = k - redund
for i in range(padding_num_for_psk):
input_sig.append(0)
start_phase = np.pi * (1 / M)
phase_list = list([start_phase])
phase_step = start_phase * 2
for i in range(int(M / 2) - 1):
start_phase = phase_list[i] + phase_step
phase_list.append(start_phase)
phase_list_rev = list(reversed(phase_list))
for i in range(len(phase_list_rev)):
phase_list_rev[i] *= -1
phase_list = phase_list + phase_list_rev
result = list()
table_for_demod = list()
for i in range(M):
inphase = np.cos(phase_list[i])
quadrature = np.sin(phase_list[i])
table_for_demod.append(complex(inphase, quadrature))
for i in range(int(len(input_sig) / k)):
bin_sig = "".join(map(str, input_sig[k * i: k * i + k]))
# print('0b' + bin_sig)
sig = int('0b' + bin_sig, 2)
inphase = np.cos(phase_list[sig])
quadrature = np.sin(phase_list[sig])
result.append(complex(inphase, quadrature))
for i in range(padding_num_for_psk):
input_sig.pop()
return result, padding_num_for_psk
elif mode[-3] == 'Q':
if mode[0] == '1':
M = 16
elif mode[0] == '6':
M = 64
k = int(math.log2(M))
redund = int(len(input_sig)) % k
padding_num_for_qam = k - redund
for i in range(padding_num_for_qam):
input_sig.append(0)
list_gray = list()
list_bbb = list()
output_sig = list()
for i in range(int(sqrt(M))):
list_gray.append(format(graycode.tc_to_gray_code(i), 'b').zfill(int(k / 2)))
for i in range(int(sqrt(M))):
start_num = 1 - int(sqrt(M))
list_bbb.append(start_num + 2 * i)
for i in range(int(len(input_sig) / k)):
cut_sig = input_sig[k * i: k * (i + 1)]
inphase_val = str("")
quadrature_val = str("")
for j in range(int(len(cut_sig) / 2)):
inphase_val = inphase_val + str(cut_sig[j])
quadrature_val = quadrature_val + str(cut_sig[j + int(len(cut_sig) / 2)])
check_inphase = 0
check_quadrature = 0
for t in range(int(sqrt(M))):
if inphase_val == list_gray[t]:
inphase = list_bbb[t]
if quadrature_val == list_gray[t]:
quadrature = list_bbb[t]
output_sig.append(complex(inphase, quadrature))
for i in range(padding_num_for_qam):
input_sig.pop()
return output_sig, padding_num_for_qam
class Fading_Channel:
def rayleigh(input_sig):
output = list()
for i in range(len(input_sig)):
channel_coef = complex(random.gauss(0, 1), random.gauss(0, 1)) / sqrt(2)
output.append(abs(channel_coef) * input_sig[i])
return output
class Noise:
def AWGN(input_sig, SNR_dB):
target_snr_db = SNR_dB
sum_abs = 0
abs_list = list()
noise_out = list()
for i in range(len(input_sig)):
sum_abs = sum_abs + abs(input_sig[i])
for i in range(len(input_sig)):
abs_list.append(abs(input_sig[i]))
SNR_linear = 10 ** (SNR_dB / 10)
input_sig_power = np.mean(abs_list)
N0 = input_sig_power / SNR_linear
for i in range(len(input_sig)):
noise = sqrt(N0 / 2) * complex(random.gauss(0, 1), random.gauss(0, 1))
noise_out.append(input_sig[i] + noise)
return noise_out
class Rx:
def DeModulation(input_sig, mode, padding_num_for_demod):
if mode[2] == 'P':
M = int(mode[0])
k = int(math.log2(M))
bit_list = list()
start_phase = np.pi * (1 / M)
phase_list = list([start_phase])
phase_step = start_phase * 2
for i in range(int(M / 2) - 1):
start_phase = phase_list[i] + phase_step
phase_list.append(start_phase)
phase_list_rev = list(reversed(phase_list))
for i in range(len(phase_list_rev)):
phase_list_rev[i] *= -1
phase_list = phase_list + phase_list_rev
result = list()
table_for_demod = list()
for i in range(M):
inphase = np.cos(phase_list[i])
quadrature = np.sin(phase_list[i])
table_for_demod.append(complex(inphase, quadrature))
for i in range(len(input_sig)):
input_sig_x = float(input_sig[i].real)
input_sig_y = float(input_sig[i].imag)
distance = 1000
output = 0
for j in range(M):
if distance > sqrt(
(input_sig_x - table_for_demod[j].real) ** 2 + (
input_sig_y - table_for_demod[j].imag) ** 2):
distance = sqrt(
(input_sig_x - table_for_demod[j].real) ** 2 + (input_sig_y - table_for_demod[j].imag) ** 2)
output = j
bin_output = format(output, 'b').zfill(k)
for q in range(k):
bit_list.append(int(bin_output[q]))
for i in range(padding_num_for_demod):
bit_list.pop()
return bit_list
elif mode[-3] == 'Q':
if mode[0] == '1':
M = 16
elif mode[0] == '6':
M = 64
k = int(math.log2(M))
list_gray = list()
list_bbb = list()
output_sig = list()
all_candidate = list()
demodulated_list = list()
for i in range(int(sqrt(M))):
list_gray.append(format(graycode.tc_to_gray_code(i), 'b').zfill(int(k / 2)))
for i in range(int(sqrt(M))):
start_num = 1 - int(sqrt(M))
list_bbb.append(start_num + 2 * i)
for i in range(len(list_bbb)):
for j in range(len(list_bbb)):
all_candidate.append(complex(list_bbb[i], list_bbb[j]))
for i in range(len(input_sig)):
d_min = 100
save_index = 0
for j in range(len(all_candidate)):
if sqrt(((float(input_sig[i].real) - float(all_candidate[j].real)) ** 2) + (
(float(input_sig[i].imag) - float(all_candidate[j].imag)) ** 2)) < d_min:
d_min = sqrt(((float(input_sig[i].real) - float(all_candidate[j].real)) ** 2) + (
(float(input_sig[i].imag) - float(all_candidate[j].imag)) ** 2))
inphase_val = int(all_candidate[j].real)
quadrature_val = int(all_candidate[j].imag)
for k in range(len(list_bbb)):
if (inphase_val == list_bbb[k]):
output_first = list_gray[k]
if (quadrature_val == list_bbb[k]):
output_second = list_gray[k]
result = output_first + output_second
for q in range(len(result)):
demodulated_list.append(int(result[q]))
for i in range(padding_num_for_demod):
demodulated_list.pop()
return demodulated_list
def Block_Decode(input_sig, padding_num_Block):
n = 6
k = 3
Parity_Check_Mat = np.matrix([[1, 0, 0],
[0, 1, 0],
[0, 0, 1],
[1, 1, 0],
[0, 1, 1],
[1, 0, 1]])
syndrom_table = np.matrix([[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1],
[0, 0, 0, 0, 1, 0],
[0, 0, 0, 1, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
[1, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 1]])
output_list = list()
for i in range(int(len(input_sig) / n)):
r_list = list()
cut_input_sig = input_sig[n * i: n * i + n]
for j in range(len(cut_input_sig)):
r_list.append(cut_input_sig[j])
r_Mat = np.matrix(r_list)
syndrom = r_Mat * Parity_Check_Mat
sum_syn = 0
for w in range(3):
if syndrom[0, w] % 2 == 0:
syndrom[0, w] = 0
elif (syndrom[0, w] != 1) and (syndrom[0, w] % 2 == 1):
syndrom[0, w] = 1
for q in range(3):
sum_syn += syndrom[0, q] * (2 ** q)
error_cor = np.matrix(cut_input_sig) + syndrom_table[sum_syn]
for p in range(k):
if error_cor[0, p + 3] % 2 == 0:
error_cor[0, p + 3] = 0
elif (error_cor[0, p + 3] != 1) and (error_cor[0, p + 3] % 2 == 1):
error_cor[0, p + 3] = 1
output_list.append(error_cor[0, p + 3])
for i in range(padding_num_Block):
output_list.pop()
return output_list
def LDPC_Decode(input_sig, n, d_c, d_v, padding_num_ldpc):
for i in range(len(input_sig)):
input_sig[i] = float(input_sig[i])
if input_sig[i] == 0:
input_sig[i] = float(-1)
snr = 10000000000000
H, G = make_ldpc(n, d_v, d_c, systematic=True, sparse=True)
k = G.shape[1]
input_sig = np.array(input_sig)
decoded_list = list()
for i in range(int(len(input_sig) / n)):
result = decode(H, input_sig[n * i: n * (i + 1)], snr)
restored_msg = get_message(G, result)
for j in range(len(restored_msg)):
decoded_list.append(restored_msg[j])
for i in range(padding_num_ldpc):
decoded_list.pop()
return decoded_list
def forward(input_sig, SNR, mod_mode, rayleigh, chcode_mode):
error = 0
if chcode_mode == "LDPC":
encoded_sig, padding_num_LDPC = Tx.LDPC_Encode(input_sig)
elif chcode_mode == "Block":
encoded_sig, padding_num_Block = Tx.Block_Encode(input_sig)
modulated_sig, padding_num_demod = Tx.Modulation(input_sig=encoded_sig, mode=mod_mode)
if rayleigh == "Yes":
AWGN_input = Fading_Channel.rayleigh(modulated_sig)
elif rayleigh == "No":
AWGN_input = modulated_sig
AWGN_output = Noise.AWGN(AWGN_input, SNR_dB=SNR)
demodulated_sig = Rx.DeModulation(AWGN_output, mod_mode, padding_num_demod)
if chcode_mode == "LDPC":
decoded_sig = Rx.LDPC_Decode(demodulated_sig, padding_num_LDPC)
elif chcode_mode == "Block":
decoded_sig = Rx.Block_Decode(demodulated_sig, padding_num_Block)
BER = BER_Check(input_sig, decoded_sig)
return input_sig, decoded_sig, BER
n_list = list([9, 10, 10, 12, 12, 12, 12, 12, 14, 14, 14, 14, 14, 15, 15, 15])
d_c_list = list([3, 5, 5, 4, 4, 6, 6, 6, 7, 7, 7, 7, 7, 5, 5, 5])
d_v_list = list([2, 2, 3, 2, 3, 2, 4, 5, 2, 3, 4, 5, 6, 4, 3, 2])
time_list = list()
BER_list = list()
SNR = 1
input_sig = Bit_Gen(10000)
for i in range(len(n_list)) :
start = time.time()
encoded_sig, padnum_chcod = Tx.LDPC_Encode(input_sig, n_list[i], d_c_list[i], d_v_list[i])
modulated_sig, padnum_demod = Tx.Modulation(encoded_sig, "4-PSK")
AWGN_output = Noise.AWGN(modulated_sig, SNR_dB = SNR)
demodulated_sig = Rx.DeModulation(AWGN_output, "4-PSK", padnum_demod)
decoded_sig = Rx.LDPC_Decode(demodulated_sig, n_list[i],d_c_list[i], d_v_list[i], padnum_chcod)
BER = BER_Check(input_sig, decoded_sig)
time_list.append( round((time.time()-start), 3) )
BER_list.append(BER)
plt.figure(figsize = (17, 8))
plt.subplot(2, 1, 1)
#plt.figure(figsize = (10, 5))
plt.bar(x_list, time_list)
plt.title("Time Comparison(LDPC Coding)")
plt.ylabel("time(sec)")
plt.subplot(2, 1, 2)
#plt.figure(figsize = (10, 5))
plt.bar(x_list, BER_list)
plt.title("BER Comparison(LDPC Coding)")
plt.ylabel("Bit Error Rate")
plt.show()
SNR = 1
SNR = 5
코드 실행 시간이 짧은 순은 (12, 9), (14, 11) >= (10, 7) > (15, 10)
코드 실행 시간이 긴 순은 (14, 7), (15, 6) >= (12, 6) > (14, 8)
10000개 랜덤 비트를 사용하였음.
100000개 이상의 데이터를 보내야 그래프 신뢰도가 높아질 것으로 보임.
Data Rate이 적을수록 확실히 BER 성능은 좋지만, 시간이 오래 걸린다는 것을 확인할 수 있음.
'Wireless Comm. > Python' 카테고리의 다른 글
Conventional_20230504 (0) | 2023.05.04 |
---|---|
save2 (0) | 2023.05.03 |
save2 (0) | 2023.05.03 |
save1 (0) | 2023.05.02 |
16/64QAM (0) | 2023.05.02 |
Comments