UOMOP

LPDC (n, k) comparison 본문

Wireless Comm./Python

LPDC (n, k) comparison

Happy PinGu 2023. 5. 3. 17:14
import cv2
import math
import time
import random
import graycode
import numpy as np
from PIL import Image
from numpy import sqrt
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from pyldpc import make_ldpc, encode, decode, get_message

class For_Image:

    def image2bit(path):

        gray_bit = list()
        img_gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        img_gray_flat = img_gray.flatten()

        for i in range(len(img_gray_flat)):
            gray_bit.append(format(img_gray_flat[i], 'b').zfill(8))
        gray_stream = ''.join(gray_bit)

        gray_output = list()

        for i in range(len(gray_stream)):
            gray_output.append(int(gray_stream[i]))

        return gray_output

    def Restore_image(input_sig):

        channel = input_sig

        str_1 = list()

        for i in range(len(channel)):
            str_1.append(str(channel[i]))

        pixel_1 = list()

        for i in range(int(len(channel) / 8)):
            pixel_1.append(int(''.join(str_1[8 * i:8 * (i + 1)]), 2))

        array = np.reshape((np.array(pixel_1, dtype=np.uint8)), (int(sqrt(len(pixel_1))), -1))

        return array

    def Compare_SNR(origin_img_name, path, max_snr, mod_mode, chcode_mode, rayleigh):
        title_list = list(["original"])
        BER_list = list([0])

        image_path = path
        gray_stream = For_Image.image2bit(image_path)

        for i in range(max_snr):
            SNR = i + 1
            title_input = "SNR=" + str(SNR)
            title_list.append(title_input)

            input_sig, output_sig, BER = forward(gray_stream, SNR=i, mod_mode=mod_mode, rayleigh=rayleigh,
                                                 chcode_mode=chcode_mode)
            BER_list.append(BER)

            image_arr = For_Image.Restore_image(input_sig=output_sig)
            img = Image.fromarray(image_arr)
            img.save(title_input + ".jpg")
            print("Complete : SNR = " + str(SNR))

        plt.figure(figsize=(22, 6))

        plt.subplot(1, max_snr + 1, 1)
        img = cv2.imread(origin_img_name, cv2.IMREAD_GRAYSCALE)
        plt.imshow(img)
        plt.title(title_list[0])

        for i in range(max_snr):
            plt.subplot(1, max_snr + 1, i + 2)
            img = cv2.imread(title_list[i + 1] + ".jpg")
            plt.imshow(img)
            titles = title_list[i + 1] + "  (BER=" + str(round(BER_list[i + 1], 4)) + ")"
            plt.title(titles)


def Bit_Gen(how_many):
    return (np.random.randint(2, size=how_many)).tolist()


def BER_Check(list_1, list_2):

    if (len(list_1) != len(list_2)): print("Lengths of the two streams are different.")
    error = 0
    for i in range(len(list_1)):
        if list_1[i] != list_2[i]:
            error += 1

    BER = error / len(list_1)
    print("BER : {}".format(BER))

    return BER


class Tx:
    def Block_Encode(input_sig):

        n = 6
        k = 3

        redund = int(len(input_sig)) % 3
        padding_num_Block = k - redund

        for i in range(padding_num_Block):
            input_sig.append(0)

        Gen_Mat = np.matrix([[1, 1, 0, 1, 0, 0],
                             [0, 1, 1, 0, 1, 0],
                             [1, 0, 1, 0, 0, 1]])

        input_sig_list = list()
        output_list = list()
        result = list()

        for i in range(int(len(input_sig) / k)):
            for j in range(k):
                input_sig_list.append(int(input_sig[i * k + j]))

        for i in range(int(len(input_sig_list) / k)):
            Data_Mat = np.matrix(input_sig_list[i * k: i * k + k])

            a = (Data_Mat * Gen_Mat).A1

            for j in range(len(a)):
                if a[j] % 2 == 0:
                    a[j] = 0

                elif (a[j] != 1) and (a[j] % 2 == 1):
                    a[j] = 1
                output_list.append(a[j])

        for i in range(padding_num_Block):
            input_sig.pop()

        return output_list, padding_num_Block



    def LDPC_Encode(input_sig, n, d_c, d_v):

        snr = 10000000000000
        H, G = make_ldpc(n, d_v, d_c, systematic=True, sparse=True)
        k = G.shape[1]

        print("({}, {})LDPC Channel Coding".format(n, k))

        redund = int(len(input_sig)) % k
        padding_num_ldpc = k - redund

        for i in range(padding_num_ldpc):
            input_sig.append(0)

        coded_list = list()

        input_arr = np.array(input_sig)

        for i in range(int(len(input_arr) / k)):

            result = encode(G, input_arr[k * i: k * (i + 1)], snr).tolist()

            for j in range(len(result)):
                coded_list.append(result[j])

        for i in range(len(coded_list)):
            coded_list[i] = int(coded_list[i])
            if coded_list[i] < 0:
                coded_list[i] = int(0)

        for i in range(padding_num_ldpc):
            input_sig.pop()

        return coded_list, padding_num_ldpc



    def Modulation(input_sig, mode):
        # mode : {M-ASK, M-FSK, M-PSK}

        if mode[-3] == 'P':

            M = int(mode[0])
            k = int(math.log2(M))

            redund = int(len(input_sig)) % k
            padding_num_for_psk = k - redund

            for i in range(padding_num_for_psk):
                input_sig.append(0)

            start_phase = np.pi * (1 / M)

            phase_list = list([start_phase])
            phase_step = start_phase * 2

            for i in range(int(M / 2) - 1):
                start_phase = phase_list[i] + phase_step
                phase_list.append(start_phase)

            phase_list_rev = list(reversed(phase_list))

            for i in range(len(phase_list_rev)):
                phase_list_rev[i] *= -1

            phase_list = phase_list + phase_list_rev

            result = list()
            table_for_demod = list()

            for i in range(M):
                inphase = np.cos(phase_list[i])
                quadrature = np.sin(phase_list[i])

                table_for_demod.append(complex(inphase, quadrature))

            for i in range(int(len(input_sig) / k)):
                bin_sig = "".join(map(str, input_sig[k * i: k * i + k]))
                # print('0b' + bin_sig)

                sig = int('0b' + bin_sig, 2)

                inphase = np.cos(phase_list[sig])
                quadrature = np.sin(phase_list[sig])

                result.append(complex(inphase, quadrature))

            for i in range(padding_num_for_psk):
                input_sig.pop()

            return result, padding_num_for_psk


        elif mode[-3] == 'Q':

            if mode[0] == '1':
                M = 16
            elif mode[0] == '6':
                M = 64

            k = int(math.log2(M))

            redund = int(len(input_sig)) % k
            padding_num_for_qam = k - redund

            for i in range(padding_num_for_qam):
                input_sig.append(0)

            list_gray = list()
            list_bbb = list()
            output_sig = list()

            for i in range(int(sqrt(M))):
                list_gray.append(format(graycode.tc_to_gray_code(i), 'b').zfill(int(k / 2)))

            for i in range(int(sqrt(M))):
                start_num = 1 - int(sqrt(M))
                list_bbb.append(start_num + 2 * i)

            for i in range(int(len(input_sig) / k)):
                cut_sig = input_sig[k * i: k * (i + 1)]
                inphase_val = str("")
                quadrature_val = str("")

                for j in range(int(len(cut_sig) / 2)):
                    inphase_val = inphase_val + str(cut_sig[j])
                    quadrature_val = quadrature_val + str(cut_sig[j + int(len(cut_sig) / 2)])

                check_inphase = 0
                check_quadrature = 0

                for t in range(int(sqrt(M))):
                    if inphase_val == list_gray[t]:
                        inphase = list_bbb[t]
                    if quadrature_val == list_gray[t]:
                        quadrature = list_bbb[t]

                output_sig.append(complex(inphase, quadrature))

            for i in range(padding_num_for_qam):
                input_sig.pop()

            return output_sig, padding_num_for_qam


class Fading_Channel:
    def rayleigh(input_sig):
        output = list()

        for i in range(len(input_sig)):
            channel_coef = complex(random.gauss(0, 1), random.gauss(0, 1)) / sqrt(2)

            output.append(abs(channel_coef) * input_sig[i])

        return output


class Noise:
    def AWGN(input_sig, SNR_dB):

        target_snr_db = SNR_dB

        sum_abs = 0
        abs_list = list()
        noise_out = list()

        for i in range(len(input_sig)):
            sum_abs = sum_abs + abs(input_sig[i])

        for i in range(len(input_sig)):
            abs_list.append(abs(input_sig[i]))

        SNR_linear = 10 ** (SNR_dB / 10)

        input_sig_power = np.mean(abs_list)

        N0 = input_sig_power / SNR_linear

        for i in range(len(input_sig)):
            noise = sqrt(N0 / 2) * complex(random.gauss(0, 1), random.gauss(0, 1))
            noise_out.append(input_sig[i] + noise)

        return noise_out


class Rx:
    def DeModulation(input_sig, mode, padding_num_for_demod):

        if mode[2] == 'P':
            M = int(mode[0])
            k = int(math.log2(M))
            bit_list = list()

            start_phase = np.pi * (1 / M)

            phase_list = list([start_phase])
            phase_step = start_phase * 2

            for i in range(int(M / 2) - 1):
                start_phase = phase_list[i] + phase_step
                phase_list.append(start_phase)

            phase_list_rev = list(reversed(phase_list))

            for i in range(len(phase_list_rev)):
                phase_list_rev[i] *= -1

            phase_list = phase_list + phase_list_rev

            result = list()
            table_for_demod = list()

            for i in range(M):
                inphase = np.cos(phase_list[i])
                quadrature = np.sin(phase_list[i])

                table_for_demod.append(complex(inphase, quadrature))

            for i in range(len(input_sig)):
                input_sig_x = float(input_sig[i].real)
                input_sig_y = float(input_sig[i].imag)

                distance = 1000
                output = 0

                for j in range(M):
                    if distance > sqrt(
                            (input_sig_x - table_for_demod[j].real) ** 2 + (
                                    input_sig_y - table_for_demod[j].imag) ** 2):
                        distance = sqrt(
                            (input_sig_x - table_for_demod[j].real) ** 2 + (input_sig_y - table_for_demod[j].imag) ** 2)
                        output = j

                bin_output = format(output, 'b').zfill(k)

                for q in range(k):
                    bit_list.append(int(bin_output[q]))

            for i in range(padding_num_for_demod):
                bit_list.pop()

            return bit_list


        elif mode[-3] == 'Q':

            if mode[0] == '1':
                M = 16
            elif mode[0] == '6':
                M = 64

            k = int(math.log2(M))

            list_gray = list()
            list_bbb = list()
            output_sig = list()
            all_candidate = list()
            demodulated_list = list()

            for i in range(int(sqrt(M))):
                list_gray.append(format(graycode.tc_to_gray_code(i), 'b').zfill(int(k / 2)))

            for i in range(int(sqrt(M))):
                start_num = 1 - int(sqrt(M))
                list_bbb.append(start_num + 2 * i)

            for i in range(len(list_bbb)):
                for j in range(len(list_bbb)):
                    all_candidate.append(complex(list_bbb[i], list_bbb[j]))

            for i in range(len(input_sig)):

                d_min = 100
                save_index = 0

                for j in range(len(all_candidate)):
                    if sqrt(((float(input_sig[i].real) - float(all_candidate[j].real)) ** 2) + (
                            (float(input_sig[i].imag) - float(all_candidate[j].imag)) ** 2)) < d_min:
                        d_min = sqrt(((float(input_sig[i].real) - float(all_candidate[j].real)) ** 2) + (
                                    (float(input_sig[i].imag) - float(all_candidate[j].imag)) ** 2))
                        inphase_val = int(all_candidate[j].real)
                        quadrature_val = int(all_candidate[j].imag)

                for k in range(len(list_bbb)):
                    if (inphase_val == list_bbb[k]):
                        output_first = list_gray[k]

                    if (quadrature_val == list_bbb[k]):
                        output_second = list_gray[k]

                result = output_first + output_second

                for q in range(len(result)):
                    demodulated_list.append(int(result[q]))

            for i in range(padding_num_for_demod):
                demodulated_list.pop()

            return demodulated_list

    def Block_Decode(input_sig, padding_num_Block):

        n = 6
        k = 3
        Parity_Check_Mat = np.matrix([[1, 0, 0],
                                      [0, 1, 0],
                                      [0, 0, 1],
                                      [1, 1, 0],
                                      [0, 1, 1],
                                      [1, 0, 1]])

        syndrom_table = np.matrix([[0, 0, 0, 0, 0, 0],
                                   [0, 0, 0, 0, 0, 1],
                                   [0, 0, 0, 0, 1, 0],
                                   [0, 0, 0, 1, 0, 0],
                                   [0, 0, 1, 0, 0, 0],
                                   [0, 1, 0, 0, 0, 0],
                                   [1, 0, 0, 0, 0, 0],
                                   [0, 1, 0, 0, 0, 1]])

        output_list = list()

        for i in range(int(len(input_sig) / n)):
            r_list = list()
            cut_input_sig = input_sig[n * i: n * i + n]

            for j in range(len(cut_input_sig)):
                r_list.append(cut_input_sig[j])

            r_Mat = np.matrix(r_list)
            syndrom = r_Mat * Parity_Check_Mat
            sum_syn = 0

            for w in range(3):
                if syndrom[0, w] % 2 == 0:
                    syndrom[0, w] = 0
                elif (syndrom[0, w] != 1) and (syndrom[0, w] % 2 == 1):
                    syndrom[0, w] = 1

            for q in range(3):
                sum_syn += syndrom[0, q] * (2 ** q)

            error_cor = np.matrix(cut_input_sig) + syndrom_table[sum_syn]

            for p in range(k):
                if error_cor[0, p + 3] % 2 == 0:
                    error_cor[0, p + 3] = 0
                elif (error_cor[0, p + 3] != 1) and (error_cor[0, p + 3] % 2 == 1):
                    error_cor[0, p + 3] = 1

                output_list.append(error_cor[0, p + 3])

        for i in range(padding_num_Block):
            output_list.pop()

        return output_list

    def LDPC_Decode(input_sig, n, d_c, d_v, padding_num_ldpc):

        for i in range(len(input_sig)):
            input_sig[i] = float(input_sig[i])
            if input_sig[i] == 0:
                input_sig[i] = float(-1)

        snr = 10000000000000
        H, G = make_ldpc(n, d_v, d_c, systematic=True, sparse=True)
        k = G.shape[1]

        input_sig = np.array(input_sig)

        decoded_list = list()

        for i in range(int(len(input_sig) / n)):
            result = decode(H, input_sig[n * i: n * (i + 1)], snr)
            restored_msg = get_message(G, result)

            for j in range(len(restored_msg)):
                decoded_list.append(restored_msg[j])

        for i in range(padding_num_ldpc):
            decoded_list.pop()

        return decoded_list


def forward(input_sig, SNR, mod_mode, rayleigh, chcode_mode):
    error = 0

    if chcode_mode == "LDPC":
        encoded_sig, padding_num_LDPC = Tx.LDPC_Encode(input_sig)
    elif chcode_mode == "Block":
        encoded_sig, padding_num_Block = Tx.Block_Encode(input_sig)

    modulated_sig, padding_num_demod = Tx.Modulation(input_sig=encoded_sig, mode=mod_mode)

    if rayleigh == "Yes":
        AWGN_input = Fading_Channel.rayleigh(modulated_sig)
    elif rayleigh == "No":
        AWGN_input = modulated_sig

    AWGN_output = Noise.AWGN(AWGN_input, SNR_dB=SNR)
    demodulated_sig = Rx.DeModulation(AWGN_output, mod_mode, padding_num_demod)

    if chcode_mode == "LDPC":
        decoded_sig = Rx.LDPC_Decode(demodulated_sig, padding_num_LDPC)
    elif chcode_mode == "Block":
        decoded_sig = Rx.Block_Decode(demodulated_sig, padding_num_Block)

    BER = BER_Check(input_sig, decoded_sig)

    return input_sig, decoded_sig, BER
n_list = list([9, 10, 10, 12, 12, 12, 12, 12, 14, 14, 14, 14, 14, 15, 15, 15])
d_c_list = list([3, 5, 5, 4, 4, 6, 6, 6, 7, 7, 7, 7, 7, 5, 5, 5])
d_v_list = list([2, 2, 3, 2, 3, 2, 4, 5, 2, 3, 4, 5, 6, 4, 3, 2])

time_list = list()
BER_list = list()

SNR = 1
input_sig = Bit_Gen(10000)

for i in range(len(n_list)) :
    start = time.time()
    encoded_sig, padnum_chcod = Tx.LDPC_Encode(input_sig, n_list[i], d_c_list[i], d_v_list[i])
    modulated_sig, padnum_demod = Tx.Modulation(encoded_sig, "4-PSK")

    AWGN_output = Noise.AWGN(modulated_sig, SNR_dB = SNR)

    demodulated_sig = Rx.DeModulation(AWGN_output, "4-PSK", padnum_demod)
    decoded_sig              = Rx.LDPC_Decode(demodulated_sig, n_list[i],d_c_list[i], d_v_list[i], padnum_chcod)
    BER = BER_Check(input_sig, decoded_sig)
    time_list.append( round((time.time()-start), 3) )
    BER_list.append(BER)
    
plt.figure(figsize = (17, 8))

plt.subplot(2, 1, 1)
#plt.figure(figsize = (10, 5))
plt.bar(x_list, time_list)
plt.title("Time Comparison(LDPC Coding)")
plt.ylabel("time(sec)")

plt.subplot(2, 1, 2)
#plt.figure(figsize = (10, 5))
plt.bar(x_list, BER_list)
plt.title("BER Comparison(LDPC Coding)")
plt.ylabel("Bit Error Rate")

plt.show()

 

 

SNR = 1

 

 

SNR = 5

 

 

코드 실행 시간이 짧은 순은 (12, 9), (14, 11) >= (10, 7) > (15, 10)
코드 실행 시간이 긴 순은 (14, 7), (15, 6) >= (12, 6) > (14, 8)

10000개 랜덤 비트를 사용하였음.
100000개 이상의 데이터를 보내야 그래프 신뢰도가 높아질 것으로 보임.
Data Rate이 적을수록 확실히 BER 성능은 좋지만, 시간이 오래 걸린다는 것을 확인할 수 있음.

'Wireless Comm. > Python' 카테고리의 다른 글

Conventional_20230504  (0) 2023.05.04
save2  (0) 2023.05.03
save2  (0) 2023.05.03
save1  (0) 2023.05.02
16/64QAM  (0) 2023.05.02
Comments