UOMOP
Conventional_20230504 본문
import cv2
import math
import time
import random
import graycode
import numpy as np
from PIL import Image
from numpy import sqrt
from tqdm import trange
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from pyldpc import make_ldpc, encode, decode, get_message
class For_Image:
def image2bit(path):
gray_bit = list()
img_gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
img_gray_flat = img_gray.flatten()
for i in range(len(img_gray_flat)):
gray_bit.append(format(img_gray_flat[i], 'b').zfill(8))
gray_stream = ''.join(gray_bit)
gray_output = list()
for i in range(len(gray_stream)):
gray_output.append(int(gray_stream[i]))
return gray_output
def Restore_image(input_sig):
channel = input_sig
str_1 = list()
for i in range(len(channel)):
str_1.append(str(channel[i]))
pixel_1 = list()
for i in range(int(len(channel) / 8)):
pixel_1.append(int(''.join(str_1[8 * i:8 * (i + 1)]), 2))
array = np.reshape((np.array(pixel_1, dtype=np.uint8)), (int(sqrt(len(pixel_1))), -1))
return array
def Compare_SNR(origin_img_name, path, max_snr, mod_mode, chcode_mode, rayleigh):
title_list = list(["original"])
BER_list = list([0])
image_path = path
gray_stream = For_Image.image2bit(image_path)
for i in range(max_snr):
SNR = i + 1
title_input = "SNR=" + str(SNR)
title_list.append(title_input)
input_sig, output_sig, BER = forward(gray_stream, SNR=i, mod_mode=mod_mode, rayleigh=rayleigh,
chcode_mode=chcode_mode)
BER_list.append(BER)
image_arr = For_Image.Restore_image(input_sig=output_sig)
img = Image.fromarray(image_arr)
img.save(title_input + ".jpg")
print("Complete : SNR = " + str(SNR))
plt.figure(figsize=(22, 6))
plt.subplot(1, max_snr + 1, 1)
img = cv2.imread(origin_img_name, cv2.IMREAD_GRAYSCALE)
plt.imshow(img)
plt.title(title_list[0])
for i in range(max_snr):
plt.subplot(1, max_snr + 1, i + 2)
img = cv2.imread(title_list[i + 1] + ".jpg")
plt.imshow(img)
titles = title_list[i + 1] + " (BER=" + str(round(BER_list[i + 1], 4)) + ")"
plt.title(titles)
def Bit_Gen(how_many):
return (np.random.randint(2, size=how_many)).tolist()
def BER_Check(list_1, list_2):
if (len(list_1) != len(list_2)): print("Lengths of the two streams are different.")
error = 0
print("BER ERROR Checking start")
for i in trange(len(list_1)):
if list_1[i] != list_2[i]:
error += 1
BER = round((error / len(list_1)), 7)
#print("BER : {}".format(BER))
print()
return BER
class Tx:
def Block_Encode(input_sig):
n = 6
k = 3
redund = int(len(input_sig)) % 3
padding_num_Block = k - redund
for i in range(padding_num_Block):
input_sig.append(0)
Gen_Mat = np.matrix([[1, 1, 0, 1, 0, 0],
[0, 1, 1, 0, 1, 0],
[1, 0, 1, 0, 0, 1]])
input_sig_list = list()
output_list = list()
result = list()
for i in range(int(len(input_sig) / k)):
for j in range(k):
input_sig_list.append(int(input_sig[i * k + j]))
print("(6, 3) Linear Block Coding start")
for i in trange(int(len(input_sig_list) / k)):
Data_Mat = np.matrix(input_sig_list[i * k: i * k + k])
a = (Data_Mat * Gen_Mat).A1
for j in range(len(a)):
if a[j] % 2 == 0:
a[j] = 0
elif (a[j] != 1) and (a[j] % 2 == 1):
a[j] = 1
output_list.append(a[j])
for i in range(padding_num_Block):
input_sig.pop()
print()
return output_list, padding_num_Block
def LDPC_Encode(input_sig, coding_rate):
n = coding_rate[0]
if coding_rate == (12, 9) :
d_c = 6
d_v = 2
elif coding_rate == (14, 11) :
d_c = 7
d_v = 2
elif coding_rate == (10, 7) :
d_c = 5
d_v = 2
elif coding_rate == (15, 10) :
d_c = 5
d_v = 2
elif coding_rate == (14, 10) :
d_c = 7
d_v = 3
snr = 10000000000000
H, G = make_ldpc(n, d_v, d_c, systematic=True, sparse=True)
k = G.shape[1]
print("({}, {})LDPC Channel Coding".format(n, k))
redund = int(len(input_sig)) % k
padding_num_ldpc = k - redund
for i in range(padding_num_ldpc):
input_sig.append(0)
coded_list = list()
input_arr = np.array(input_sig)
print("{} LDPC Channel Coding start".format(coding_rate))
for i in trange(int(len(input_arr) / k)):
result = encode(G, input_arr[k * i: k * (i + 1)], snr).tolist()
for j in range(len(result)):
coded_list.append(result[j])
for i in range(len(coded_list)):
coded_list[i] = int(coded_list[i])
if coded_list[i] < 0:
coded_list[i] = int(0)
for i in range(padding_num_ldpc):
input_sig.pop()
print()
return coded_list, padding_num_ldpc
def Modulation(input_sig, mode):
# mode : {M-ASK, M-FSK, M-PSK}
if mode[-3] == 'P':
M = int(mode[0])
k = int(math.log2(M))
redund = int(len(input_sig)) % k
padding_num_for_psk = k - redund
for i in range(padding_num_for_psk):
input_sig.append(0)
start_phase = np.pi * (1 / M)
phase_list = list([start_phase])
phase_step = start_phase * 2
for i in range(int(M / 2) - 1):
start_phase = phase_list[i] + phase_step
phase_list.append(start_phase)
phase_list_rev = list(reversed(phase_list))
for i in range(len(phase_list_rev)):
phase_list_rev[i] *= -1
phase_list = phase_list + phase_list_rev
result = list()
table_for_demod = list()
for i in range(M):
inphase = np.cos(phase_list[i])
quadrature = np.sin(phase_list[i])
table_for_demod.append(complex(inphase, quadrature))
print("{} Modulation start".format(mode))
for i in trange(int(len(input_sig) / k)):
bin_sig = "".join(map(str, input_sig[k * i: k * i + k]))
# print('0b' + bin_sig)
sig = int('0b' + bin_sig, 2)
inphase = np.cos(phase_list[sig])
quadrature = np.sin(phase_list[sig])
result.append(complex(inphase, quadrature))
for i in range(padding_num_for_psk):
input_sig.pop()
print()
return result, padding_num_for_psk
elif mode[-3] == 'Q':
if mode[0] == '1':
M = 16
elif mode[0] == '6':
M = 64
k = int(math.log2(M))
redund = int(len(input_sig)) % k
padding_num_for_qam = k - redund
for i in range(padding_num_for_qam):
input_sig.append(0)
list_gray = list()
list_bbb = list()
output_sig = list()
for i in range(int(sqrt(M))):
list_gray.append(format(graycode.tc_to_gray_code(i), 'b').zfill(int(k / 2)))
for i in range(int(sqrt(M))):
start_num = 1 - int(sqrt(M))
list_bbb.append(start_num + 2 * i)
print("{} Modulation start".format(mode))
for i in trange(int(len(input_sig) / k)):
cut_sig = input_sig[k * i: k * (i + 1)]
inphase_val = str("")
quadrature_val = str("")
for j in range(int(len(cut_sig) / 2)):
inphase_val = inphase_val + str(cut_sig[j])
quadrature_val = quadrature_val + str(cut_sig[j + int(len(cut_sig) / 2)])
check_inphase = 0
check_quadrature = 0
for t in range(int(sqrt(M))):
if inphase_val == list_gray[t]:
inphase = list_bbb[t]
if quadrature_val == list_gray[t]:
quadrature = list_bbb[t]
output_sig.append(complex(inphase, quadrature))
for i in range(padding_num_for_qam):
input_sig.pop()
print()
return output_sig, padding_num_for_qam
class Fading_Channel:
def rayleigh(input_sig):
output = list()
for i in range(len(input_sig)):
channel_coef = complex(random.gauss(0, 1), random.gauss(0, 1)) / sqrt(2)
output.append(abs(channel_coef) * input_sig[i])
return output
class Noise:
def AWGN(input_sig, SNR_dB):
target_snr_db = SNR_dB
sum_abs = 0
abs_list = list()
noise_out = list()
for i in range(len(input_sig)):
sum_abs = sum_abs + abs(input_sig[i])
for i in range(len(input_sig)):
abs_list.append(abs(input_sig[i]))
SNR_linear = 10 ** (SNR_dB / 10)
input_sig_power = np.mean(abs_list)
N0 = input_sig_power / SNR_linear
for i in range(len(input_sig)):
noise = sqrt(N0 / 2) * complex(random.gauss(0, 1), random.gauss(0, 1))
noise_out.append(input_sig[i] + noise)
return noise_out
class Rx:
def DeModulation(input_sig, mode, padding_num_for_demod):
if mode[2] == 'P':
M = int(mode[0])
k = int(math.log2(M))
bit_list = list()
start_phase = np.pi * (1 / M)
phase_list = list([start_phase])
phase_step = start_phase * 2
for i in range(int(M / 2) - 1):
start_phase = phase_list[i] + phase_step
phase_list.append(start_phase)
phase_list_rev = list(reversed(phase_list))
for i in range(len(phase_list_rev)):
phase_list_rev[i] *= -1
phase_list = phase_list + phase_list_rev
result = list()
table_for_demod = list()
for i in range(M):
inphase = np.cos(phase_list[i])
quadrature = np.sin(phase_list[i])
table_for_demod.append(complex(inphase, quadrature))
print("{} DeModulation start".format(mode))
for i in trange(len(input_sig)):
input_sig_x = float(input_sig[i].real)
input_sig_y = float(input_sig[i].imag)
distance = 1000
output = 0
for j in range(M):
if distance > sqrt(
(input_sig_x - table_for_demod[j].real) ** 2 + (
input_sig_y - table_for_demod[j].imag) ** 2):
distance = sqrt(
(input_sig_x - table_for_demod[j].real) ** 2 + (input_sig_y - table_for_demod[j].imag) ** 2)
output = j
bin_output = format(output, 'b').zfill(k)
for q in range(k):
bit_list.append(int(bin_output[q]))
for i in range(padding_num_for_demod):
bit_list.pop()
print()
return bit_list
elif mode[-3] == 'Q':
if mode[0] == '1':
M = 16
elif mode[0] == '6':
M = 64
k = int(math.log2(M))
list_gray = list()
list_bbb = list()
output_sig = list()
all_candidate = list()
demodulated_list = list()
for i in range(int(sqrt(M))):
list_gray.append(format(graycode.tc_to_gray_code(i), 'b').zfill(int(k / 2)))
for i in range(int(sqrt(M))):
start_num = 1 - int(sqrt(M))
list_bbb.append(start_num + 2 * i)
for i in range(len(list_bbb)):
for j in range(len(list_bbb)):
all_candidate.append(complex(list_bbb[i], list_bbb[j]))
print("{} DeModulation start".format(mode))
for i in trange(len(input_sig)):
d_min = 100
save_index = 0
for j in range(len(all_candidate)):
if sqrt(((float(input_sig[i].real) - float(all_candidate[j].real)) ** 2) + (
(float(input_sig[i].imag) - float(all_candidate[j].imag)) ** 2)) < d_min:
d_min = sqrt(((float(input_sig[i].real) - float(all_candidate[j].real)) ** 2) + (
(float(input_sig[i].imag) - float(all_candidate[j].imag)) ** 2))
inphase_val = int(all_candidate[j].real)
quadrature_val = int(all_candidate[j].imag)
for k in range(len(list_bbb)):
if (inphase_val == list_bbb[k]):
output_first = list_gray[k]
if (quadrature_val == list_bbb[k]):
output_second = list_gray[k]
result = output_first + output_second
for q in range(len(result)):
demodulated_list.append(int(result[q]))
for i in range(padding_num_for_demod):
demodulated_list.pop()
print()
return demodulated_list
def Block_Decode(input_sig, padding_num_Block):
n = 6
k = 3
Parity_Check_Mat = np.matrix([[1, 0, 0],
[0, 1, 0],
[0, 0, 1],
[1, 1, 0],
[0, 1, 1],
[1, 0, 1]])
syndrom_table = np.matrix([[0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1],
[0, 0, 0, 0, 1, 0],
[0, 0, 0, 1, 0, 0],
[0, 0, 1, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
[1, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 1]])
output_list = list()
print("(6, 3) Linear Block Decoding start")
for i in trange(int(len(input_sig) / n)):
r_list = list()
cut_input_sig = input_sig[n * i: n * i + n]
for j in range(len(cut_input_sig)):
r_list.append(cut_input_sig[j])
r_Mat = np.matrix(r_list)
syndrom = r_Mat * Parity_Check_Mat
sum_syn = 0
for w in range(3):
if syndrom[0, w] % 2 == 0:
syndrom[0, w] = 0
elif (syndrom[0, w] != 1) and (syndrom[0, w] % 2 == 1):
syndrom[0, w] = 1
for q in range(3):
sum_syn += syndrom[0, q] * (2 ** q)
error_cor = np.matrix(cut_input_sig) + syndrom_table[sum_syn]
for p in range(k):
if error_cor[0, p + 3] % 2 == 0:
error_cor[0, p + 3] = 0
elif (error_cor[0, p + 3] != 1) and (error_cor[0, p + 3] % 2 == 1):
error_cor[0, p + 3] = 1
output_list.append(error_cor[0, p + 3])
for i in range(padding_num_Block):
output_list.pop()
print()
return output_list
def LDPC_Decode(input_sig, coding_rate, padding_num_ldpc):
n = coding_rate[0]
if coding_rate == (12, 9) :
d_c = 6
d_v = 2
elif coding_rate == (14, 11) :
d_c = 7
d_v = 2
elif coding_rate == (10, 7) :
d_c = 5
d_v = 2
elif coding_rate == (15, 10) :
d_c = 5
d_v = 2
elif coding_rate == (14, 10) :
d_c = 7
d_v = 3
for i in range(len(input_sig)):
input_sig[i] = float(input_sig[i])
if input_sig[i] == 0:
input_sig[i] = float(-1)
snr = 10000000000000
H, G = make_ldpc(n, d_v, d_c, systematic=True, sparse=True)
k = G.shape[1]
input_sig = np.array(input_sig)
decoded_list = list()
print("{} LDPC Decoding start".format(coding_rate))
for i in trange(int(len(input_sig) / n)):
result = decode(H, input_sig[n * i: n * (i + 1)], snr)
restored_msg = get_message(G, result)
for j in range(len(restored_msg)):
decoded_list.append(restored_msg[j])
for i in range(padding_num_ldpc):
decoded_list.pop()
print()
return decoded_list
def forward(input_sig, SNR, mod_mode, rayleigh, chcode_mode):
error = 0
if chcode_mode == "LDPC":
encoded_sig, padding_num_LDPC = Tx.LDPC_Encode(input_sig, (12, 9))
elif chcode_mode == "Block":
encoded_sig, padding_num_Block = Tx.Block_Encode(input_sig)
modulated_sig, padding_num_demod = Tx.Modulation(input_sig=encoded_sig, mode=mod_mode)
if rayleigh == "Yes":
AWGN_input = Fading_Channel.rayleigh(modulated_sig)
elif rayleigh == "No":
AWGN_input = modulated_sig
AWGN_output = Noise.AWGN(AWGN_input, SNR_dB=SNR)
demodulated_sig = Rx.DeModulation(AWGN_output, mod_mode, padding_num_demod)
if chcode_mode == "LDPC":
decoded_sig = Rx.LDPC_Decode(demodulated_sig, (12, 9), padding_num_LDPC)
elif chcode_mode == "Block":
decoded_sig = Rx.Block_Decode(demodulated_sig, padding_num_Block)
BER = BER_Check(input_sig, decoded_sig)
print("BER : {}".format(BER))
return input_sig, decoded_sig, BER
class BER_Graph:
def Compare_rayleigh(input_sig, snr_range, snr_step, mod_mode, chcode_mode):
BER_list = list()
BER_list_Ray = list()
SNR_list = list()
Block_level = 100 # 임의의 정수로 잡아둔다
num_bit = len(input_sig)
print("Start the code without rayleigh")
for i in np.arange(0, snr_range + 1, snr_step):
error = 0
input_sig, decoded_sig, BER = forward(input_sig=input_sig, SNR=i, mod_mode=mod_mode, rayleigh="No",
chcode_mode=chcode_mode)
BER_list.append(BER)
SNR_list.append(i)
print("SNR = {}, Complete!\tBER : {}".format(i, BER))
print("\n\nStart the code with rayleigh")
for i in np.arange(0, snr_range + 1, snr_step):
error = 0
input_sig, decoded_sig, BER = forward(input_sig=input_sig, SNR=i, mod_mode=mod_mode, rayleigh="Yes",
chcode_mode=chcode_mode)
BER_list_Ray.append(BER)
print("SNR = {}, Complete!\tBER : {}".format(i, BER))
plt.plot(SNR_list, BER_list, marker='o', linestyle='dashed', color='blue', label='No rayleigh')
plt.plot(SNR_list, BER_list_Ray, marker='x', linestyle='dotted', color='red', label='with rayleigh')
plt.axis([0, SNR_list[-1] + 1, 1e-6, 1])
plt.xscale('linear')
plt.yscale('log')
plt.xlabel('EbNo(dB)')
plt.ylabel('BER')
plt.grid(True)
plt.legend()
plt.show()
def Compare_chcode(input_sig, snr_range, snr_step, mod_mode, rayleigh):
BER_list_Block = list()
BER_list_LDPC = list()
SNR_list = list()
print("Start the code with LDPC")
for i in np.arange(0, snr_range + 1, snr_step):
error = 0
input_sig, decoded_sig, BER = forward(input_sig=input_sig, SNR=i, mod_mode=mod_mode, rayleigh=rayleigh,
chcode_mode="LDPC")
BER_list_LDPC.append(BER)
SNR_list.append(i)
print("SNR = {}, Complete!\tBER : {}".format(i, BER))
print("\n\nStart the code with Linear Block Coding")
for i in np.arange(0, snr_range + 1, snr_step):
error = 0
input_sig, decoded_sig, BER = forward(input_sig=input_sig, SNR=i, mod_mode=mod_mode, rayleigh=rayleigh,
chcode_mode="Block")
BER_list_Block.append(BER)
print("SNR = {}, Complete!\tBER : {}".format(i, BER))
plt.plot(SNR_list, BER_list_LDPC, marker='o', linestyle='dashed', color='blue', label='LDPC')
plt.plot(SNR_list, BER_list_Block, marker='x', linestyle='dotted', color='red', label='Block Coding')
plt.axis([0, SNR_list[-1] + 1, 1e-6, 1])
plt.xscale('linear')
plt.yscale('log')
plt.xlabel('EbNo(dB)')
plt.ylabel('BER')
plt.grid(True)
plt.legend()
plt.show()
##################################################################################################################################
input_sig = Bit_Gen(1000000)
SNR = 7
mod_mode = "64-QAM"
rayleigh = "No"
chcode_mode = "LDPC"
input_sig, output_sig, BER = forward(input_sig, SNR, mod_mode, rayleigh, chcode_mode)
'Wireless Comm. > Python' 카테고리의 다른 글
AutoEncoder for Fashion MNIST (0) | 2023.05.11 |
---|---|
Image PSNR Check_20230508 (0) | 2023.05.08 |
save2 (0) | 2023.05.03 |
LPDC (n, k) comparison (0) | 2023.05.03 |
save2 (0) | 2023.05.03 |
Comments