본문 바로가기
  • 머킹이의 머신로그
AI/코드 실습하기

[코드 실습]Transformer 직접 코딩해보기

by 머킹 2023. 9. 25.
728x90

The Annotated Transformer 보다 친절한 트랜스포머 튜토리얼, 2023 최신 오류 수정 버전

 

안녕하세요 머킹입니다.

오늘은 트랜스포머 튜토리얼을 직접 코딩해 보면서 발견한 오류를 수정하며

기록을 남겨보려고 합니다.

 

일단 저는 VS code를 사용했고

사용 데이터는  AI Hub에서 한국어-영어 번역 말뭉치 데이터 다운로드를 해야 합니다.

 

cf. 현재 AI Hub에서는 다양한 한영 번역 데이터셋을 구축해 총 160만 쌍의 데이터를 제공해주고 있지만, 모든 문장을 훈련시키기에는 데이터가 과도하므로 본 튜토리얼에서는 구어체 데이터 1 & 2만을 사용하도록 합니다.

 

코드 실습

import pandas as pd
import openpyxl

xls_a = pd.read_excel('C:/Users/user/Desktop/vs/한국어_영어번역(병렬)말뭉치/1_구어체(1).xlsx', index_col=None)
xls_b = pd.read_excel('C:/Users/user/Desktop/vs/한국어_영어번역(병렬)말뭉치/1_구어체(2).xlsx', index_col=None)
# index_col=None으로 설정하면 첫 번째 열을 인덱스로 사용하지 않고 데이터프레임을 생성

xls_a.to_csv('C:/Users/user/Desktop/vs/한국어_영어번역(병렬)말뭉치/1_구어체(1).xlsx', encoding='utf-8', index=False)
xls_b.to_csv('C:/Users/user/Desktop/vs/한국어_영어번역(병렬)말뭉치/1_구어체(2).xlsx', encoding='utf-8', index=False)

저는 이 코드를 실행하고 나서 밑에서 다시 코드를 돌리면 꼭 파일이 깨지는 오류가 발생했는데요.

그럴 때는 다시 파일을 삭제하고 다시 다운로드하니 해결이 잘 되었습니다.

그리고 경로 설정을 꼭 잘해주셔야 합니다!

 

- 이제, CSV로 변환한 말뭉치 파일을 pandas 라이브러리를 이용해 읽어옵니다.
- 읽어온 파일의 각 행을 돌며, 한국어 문장과 영어 문장을 각각의 리스트에 저장해줍니다.

data_a = pd.read_csv('C:/Users/user/Desktop/vs/한국어_영어번역(병렬)말뭉치/1_구어체(1).xlsx', encoding='utf-8')
data_b = pd.read_csv('C:/Users/user/Desktop/vs/한국어_영어번역(병렬)말뭉치/1_구어체(2).xlsx', encoding='utf-8')

data = pd.concat([data_b, data_a], ignore_index=True)
data.head()

이렇게 데이터를 찍어보면 이런 결과가 뜹니다.

  SID 원문 번역문
0 2000001 0 설정을 입력하고 안정될 때까지 5분 동안 기다린 후 OK 버튼을 길게 누르십시오. Enter into 0 setting, and wait for 5 minutes t...
1 2000002 0은 그들에게 아무것도 아니었지만 무는 숫자일 수가 없습니다 The zero was nothing for them but nothing coul...
2 2000003 1,015버전에서 핫키 버그가 있습니다. There is a Hotkey bug in the 1,015 version.
- 두 코퍼스를 병합해 총 400,000개의 병렬 문장 쌍이 만들어진 것을 확인하였습니다!
- 이제 해당 데이터 프레임을 돌며, 한국어 문장과 영어 문장을 리스트에 각각 저장해줍니다.
kor_lines = []
eng_lines = []

for _, row in data.iterrows():
    _, kor, eng = row
    kor_lines.append(kor)
    eng_lines.append(eng)
    
for kor, eng in zip(kor_lines[:5], eng_lines[:5]):
    print(f'[KOR]: {kor}')
    print(f'[ENG]: {eng}\n')

[KOR]: 0 설정을 입력하고 안정될 때까지 5분 동안 기다린 후 OK 버튼을 길게 누르십시오.

[ENG]: Enter into 0 setting, and wait for 5 minutes to make it stable, then long-press OK button.

[KOR]: 0은 그들에게 아무것도 아니었지만 무는 숫자일 수가 없습니다.

[ENG]: The zero was nothing for them but nothing couldn't be a number.

[KOR]: 1,015 버전에서 핫키 버그가 있습니다.

[ENG]: There is a Hotkey bug in the 1,015 version.

[KOR]: 1,390점에서 1,440점을 득점한 사람은 재판을 위해 걸러집니다.

[ENG]: Individuals who got a score between 1,390 and 1,440 are selected for a judge.

[KOR]: 1,400년보다 오래 전의 유적지에 있는 최초의 성당에서 숭배자들은 그것을 인지했을 것입니다.

[ENG]: Indeed, worshippers at the very first cathedral on this site, over 1,400 years ago, would have still recognized it.

이런 결과가 나오게 됩니다.

 

# 한국어 토크나이저 훈련 데이터 제작

with open('train_koeran.txt', 'w', encoding='utf-8') as f:
    for line in kor_lines:
        print(line, file=f)


# 영어 토크나이저 훈련 데이터 제작
with open('trian_english.txt', 'w', encoding='utf-8') as f:
    for line in eng_lines:
        print(line, file=f)
### 1. BPE 토크나이저 학습
> Byte Pair Encoding (단어 분리 및 압축 알고리즘)
앞서 가공한 데이터들을 활용해 BPE 토크나이저를 학습시킵니다.

토크나이저를 학습시키기 앞서 프로젝트 전반에 사용될 변수 사전을 정의합니다.

params = {
    'batch_size' : 64,
    'num_epoch' : 15,
    'dropout' : 0.1,
    'min_frequency' : 3,

    'vocab_size' : 20000,
    'num_layers' : 6,
    'num_heads' : 8,
    'hidden_dim' : 512,
    'ffn_dim' : 2048,
}
'''
batch_size: 모델이 한 번에 처리하는 입력 데이터의 샘플 개수.
num_epoch: 전체 데이터셋을 몇 번 반복해서 학습할 것인지를 결정하는 에포크 수.
dropout: 모델의 학습 과정에서 일부 유닛(뉴런)을 무작위로 비활성화하는 비율.
min_frequency: 어휘 구축 시 고려하는 최소 빈도 수.
vocab_size: 모델 어휘 사전의 크기로, 모델이 학습할 수 있는 고유한 단어의 수.
num_layers: 트랜스포머 모델의 층(layer) 수. (즉, 인코더, 디코더 갯수)
num_heads: 멀티 헤드 어텐션에서 사용되는 어텐션 헤드(head)의 수.
hidden_dim: 모델 내부의 은닉 차원 크기.
ffn_dim: 피드 포워드 네트워크의 은닉 크기.
'''
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders

# 토크나이저 모델 설정
tokenizer_model = models.BPE()

# 토크나이저 초기화
kor_tokenizer = Tokenizer(tokenizer_model)

# 토크나이저 트레이너 초기화
trainer = trainers.BpeTrainer(
    vocab_size=params['vocab_size'],
    min_frequency=params['min_frequency'],
    special_tokens=['[PAD]', '[SOS]', '[EOS]', '[UNK]'],
    suffix=''
)

# 토크나이저 훈련
kor_tokenizer.train(files=['C:/Users/user/Desktop/vs/deeprunnning/train_koeran.txt'], trainer=trainer)

여기서 코드에 오류가 발견되었는데요.

원래는 `BPETokenizer`라는 코드를 사용하지만 패키지가 먹지 않아서 수정했습니다.

"Requirement already satisfied" 메시지는 이미 해당 패키지가 설치되어 있다는 의미입니다.

 따라서 `tokenizers` 패키지는 이미 설치되어 있는 것으로 보입니다.

이미 `tokenizers` 패키지가 설치되어 있다면 추가적으로 설치할 필요가 없습니다. 여기서 중요한 것은 설치된 패키지를 Python 코드에서 사용하는 방법입니다. 코드에서 `BPETokenizer`를 가져올 때 다음과 같이 사용할 수 있어야 합니다:

from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders


`BPETokenizer`를 직접 가져오는 대신 `tokenizers` 패키지의 하위 모듈을 사용하여 Tokenizer를 초기화하고 훈련시킬 수 있습니다. 이렇게 하면 패키지가 올바르게 활용될 것입니다.

from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders

# 토크나이저 모델 설정
tokenizer_model = models.BPE()

# 토크나이저 초기화
eng_tokenizer = Tokenizer(tokenizer_model)

# 토크나이저 트레이너 초기화
trainer = trainers.BpeTrainer(
    vocab_size=params['vocab_size'],
    min_frequency=params['min_frequency'],
    special_tokens=['[PAD]', '[SOS]', '[EOS]', '[UNK]'],
    suffix=''
)

# 토크나이저 훈련
eng_tokenizer.train(files=['C:/Users/user/Desktop/vs/deeprunnning/trian_english.txt'], trainer=trainer)
패딩 옵션과 후처리 작업 등에 사용될 스페셜 토큰들의 아이디를 저장해줍니다.
pad_idx = kor_tokenizer.token_to_id('[PAD]')
sos_idx = kor_tokenizer.token_to_id('[SOS]')
eos_idx = kor_tokenizer.token_to_id('[EOS]')

02. 훈련된 토크나이저로 토큰화 진행

Tokenizers의 encode_batch 함수를 활용해 각 데이터들에 대해 토큰화 작업을 수행해 줍니다.

 

kor_encoded_data = kor_tokenizer.encode_batch(kor_lines)
eng_encoded_data = eng_tokenizer.encode_batch(eng_lines)

# 한국어 데이터 토큰화 작업 결과 출력
for origin, processed in zip(kor_lines[:3], kor_encoded_data[:3]):
    print(f'[Orig]: {origin}')
    print(f'[Proc]: {processed.tokens}\n')
>>>
[Orig]: 0 설정을 입력하고 안정될 때까지 5분 동안 기다린 후 OK 버튼을 길게 누르십시오.
[Proc]: ['0', ' ', '설정을 ', '입력', '하고 ', '안정', '될 때까지 ', '5', '분 동안 ', '기다', '린 ', '후 ', 'O', 'K', ' 버튼을 ', '길게 ', '누르', '십', '시', '오', '.']

[Orig]: 0은 그들에게 아무것도 아니었지만 무는 숫자일 수가 없습니다.
[Proc]: ['0', '은 ', '그들에게 ', '아무것도 ', '아니', '었지만 ', '무', '는 ', '숫자', '일', ' 수가 없', '습니다', '.']

[Orig]: 1,015버전에서 핫키 버그가 있습니다.
[Proc]: ['1,', '0', '15', '버전', '에서 ', '핫', '키', ' 버', '그', '가 있', '습니다', '.']


# 영어 데이터 토큰화 작업 결과 출력
for origin, processed in zip(eng_lines[:3], eng_encoded_data[:3]):
    print(f'[Orig]: {origin}')
    print(f'[Proc]: {processed.tokens}\n')
>>>
[Orig]: Enter into 0 setting, and wait for 5 minutes to make it stable, then long-press OK button.
[Proc]: ['En', 'ter ', 'into ', '0 ', 'sett', 'ing, and ', 'wait for ', '5 ', 'minutes to ', 'make it ', 'st', 'able', ', then ', 'long-', 'press ', 'O', 'K ', 'button', '.']

[Orig]: The zero was nothing for them but nothing couldn't be a number.
[Proc]: ['The ', 'zer', 'o ', 'was ', 'nothing ', 'for', ' them ', 'but ', 'nothing ', "couldn't ", 'be a ', 'number', '.']

[Orig]: There is a Hotkey bug in the 1,015 version.
[Proc]: ['There is a ', 'Hot', 'key ', 'bug ', 'in the ', '1,', '0', '15 ', 'vers', 'ion', '.']
 

토큰화가 잘 진행되었습니다.


3. 토큰화 결과에 후처리 로직 적용

- 이제 토큰화 작업이 수행된 결과에 [PAD] 토큰을 붙여줄 차례입니다. 

- [PAD] 토큰은 모델이 입력으로 받는 최대 길이 보다 길이가 짧은 문장들에 한해 부여되는 토큰이므로,

- 최대 길이로 설정할 적정 길이를 찾기 위해 각 언어 쌍의 평균 길이와 최대 길이를 계산합니다.      

# 한국어 데이터 평균 및 최대 길이 계산
kor_len_max = max(len(line.tokens) for line in kor_encoded_data)
kor_len =0

for line in kor_encoded_data:
    kor_len += len(line.tokens)
kor_len_avg = kor_len / len(kor_encoded_data)

kor_len_avg, kor_len_max
>>> (11.6915675, 53)

# 영어 데이터 평균 및 최대 길이 계산
eng_len_max = max(len(line.tokens) for line in eng_encoded_data)
eng_len = 0

for line in eng_encoded_data:
    eng_len += len(line.tokens)
eng_len_avg = eng_len / len(eng_encoded_data)

eng_len_avg, eng_len_max
>>> (10.750185, 65)

# 데이터셋 내 문장들이 그렇게 긴 편이 아니므로 32로 입력 값의 최대 길이로 정해주기
params['max_len'] = 32

# 마지막으로 [PAD] 토큰을 붙여주는 pad_sentence 함수와
# 문장의 시작과 끝을 알리는 [SOS], [EOS] 토큰을 붙여주는 후처리 함수 postprocess를 정의

def pad_sentence(input_ids):
    '''최대 길이보다 짧은 문장들에 [PAD] 토큰 부여'''

    num_pad = params['max_len'] - len(input_ids)
    input_ids.extend([pad_idx] * num_pad)
    return input_ids
    
def postprocess(input_ids):
    '''입력 문장에 [SOS] 토큰과 [EOS] 토큰 부여'''
    input_ids = pad_sentence(input_ids)

    input_ids = [sos_idx] + input_ids

    input_ids = input_ids[:params['max_len']]

    if pad_idx in input_ids:
        pad_start = input_ids.index(pad_idx)
        input_ids[pad_start] = eos_idx

    else:
        input_ids[-1] = eos_idx
    
    return input_ids
    
# 앞서 정의한 두 함수를 이용하면 결과 값이 다음과 같이 바뀝니다.
# 기본 토큰화 작업 결과
sent = '우리 진짜 별나대 그냥 내가 너무 좋아해 넌 그걸 너무 잘 알고 날 쥐락펴락해 나도 마찬가지인걸'

proc_sent = kor_tokenizer.encode(sent)
print(f'토큰화 결과: {proc_sent.tokens}')
>>> 토큰화 결과: ['우리 ', '진짜 ', '별', '나', '대 ', '그냥 ', '내가 너무 ', '좋아', '해 ', '넌 ', '그걸 ', '너무 ', '잘 ', '알고 ', '날 ', '쥐', '락', '펴', '락', '해 ', '나도 ', '마찬가지', '인', '걸']

# 토큰화 + 후철리 작업 결과

post_proc_sent = postprocess(proc_sent.ids)

print(f'후처리 결과: {post_proc_sent}\n')
print(f'후처리 해석: {kor_tokenizer.decode(post_proc_sent)}')
>>> 후처리 결과: [1, 2154, 4101, 945, 408, 2475, 3012, 13820, 2127, 2056, 3575, 4315, 2182, 2139, 6525, 2470, 1483, 699, 1860, 699, 2056, 2466, 5346, 1400, 219, 2, 0, 0, 0, 0, 0, 0]
후처리 해석: 우리  진짜  별 나 대  그냥  내가 너무  좋아 해  넌  그걸  너무  잘  알고  날  쥐 락 펴 락 해  나도  마찬가지 인 걸

# 이제 후처리 함수를 활용해 모든 데이터셋들에 대해 후처리 작업을 진행해줍니다.
kor_processed_data = [postprocess(data.ids) for data in kor_encoded_data]
eng_processed_data = [postprocess(data.ids) for data in eng_encoded_data]

4. 모든 데이터셋을 텐서형 데이터로 변환  

- 전처리와 후처리를 모두 마친 테이블을 torch.Tensor로 변환해 줍니다.- 변환 후, DataLoader를 활용해 데이터들을 배치로 만들어줍니다.

import torch
from torch.utils.data import DataLoader

torch.backends.cudnn.deterministic = True
#  학습 중에 무작위성을 제거하여 항상 동일한 결과를 얻을 수 있도록 하는 PyTorch 설정

device= torch.device('cuda' if torch.cuda.is_available() else 'cpu')

kor_tensors = [torch.LongTensor(line).to(device) for line in kor_processed_data]
eng_tensors = [torch.LongTensor(line).to(device) for line in eng_processed_data]

src_iter = DataLoader(kor_tensors, batch_size=params['batch_size'])
tgt_iter = DataLoader(eng_tensors, batch_size=params['batch_size'])

-- 오류 수정 -- 

from torch.utils.data import DataLoader

# ...

src_iter = DataLoader(kor_tensors, batch_size=params['batch_size'])
tgt_iter = DataLoader(eng_tensors, batch_size=params['batch_size'])

이 오류는 `data.DataLoader`가 잘못된 모듈에서 가져와졌거나 불러오려는 대상 데이터 프레임 `kor_processed_data` 또는 `eng_processed_data`가 이전 코드에서 올바르게 정의되지 않았을 때 발생합니다.

`data.DataLoader`는 PyTorch의 `torch.utils.data` 모듈에 속해 있으므로, 다음과 같이 코드를 수정해야 합니다:
또한, `kor_processed_data` 및 `eng_processed_data`가 이전 코드에서 정의되었는지 확인하십시오. 이 데이터 프레임들이 정의되지 않았거나 잘못 정의된 경우 해당 부분을 수정하십시오.

모듈 및 데이터 프레임 정의가 올바로 되었는지 확인한 후 코드를 다시 실행하면 이 문제가 해결될 것입니다.

 


5. Transformer 모델링

먼저 모델 구현에 필요한 라이브러리들을 모두 임포트 합니다

실험을 함에 있어 항상 실험의 Reproducibility를 보장하기 위해 Seed 설정을 해줍니다.

 
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np
import matplotlib.pyplot as plt

# Reproducible
torch.manual_seed(32)  # CPU에서의 난수 생성 시드를 설정
torch.cuda.manual_seed(32)  #  GPU에서의 난수 생성 시드를 설정
torch.backends.cudnn.deterministic = True  
# PyTorch에서 CUDA 연산을 수행할 때, cuDNN 라이브러리의 동작을 결정적으로 만들어, 동일한 입력에 대해 항상 동일한 출력을 생성

5 - 1.(Masked) Multi-Head Attention 구현

class MultiHeadAttention(nn.Module):
    '''멀티 헤드 어텐션 레이어'''
    def __init__(self, params):
        super(MultiHeadAttention, self).__init__()
        assert params['hidden_dim'] % params['num_heads'] == 0, "hidden dimension must be divisible by the number of heads"
        self.num_heads = params['num_heads']
        self.attn_dim = params['hidden_dim'] // self.num_heads
        
        self.q_w = nn.Linear(params['hidden_dim'], self.num_heads * self.attn_dim)
        self.k_w = nn.Linear(params['hidden_dim'], self.num_heads * self.attn_dim)
        self.v_w = nn.Linear(params['hidden_dim'], self.num_heads * self.attn_dim)
        
        self.o_w = nn.Linear(self.num_heads * self.attn_dim, params['hidden_dim'])
        
    def forward(self, q, k, v, mask=None):
        " q, k, v = [배치 사이즈, 문장 길이, 은닉 차원] "
        
        batch_size = q.size(0)
        
        q = self.q_w(q).view(batch_size, -1, self.num_heads, self.attn_dim).transpose(1, 2)
        k = self.k_w(k).view(batch_size, -1, self.num_heads, self.attn_dim).transpose(1, 2)
        v = self.v_w(v).view(batch_size, -1, self.num_heads, self.attn_dim).transpose(1, 2)
        # q, k, v = [배치 사이즈, 헤드 갯수, 문장 길이, 어텐션 차원]
        
        attn = torch.matmul(q, k.transpose(-1, -2))
        # attn = [배치 사이즈, 헤드 갯수, 문장 길이, 문장 길이]
        
        if mask is not None:
            mask = mask.unsqueeze(1)
            attn.masked_fill(mask==0, -1e9)
        
        score = F.softmax(attn, dim=-1)
        # score = [배치 사이즈, 헤드 갯수, 문장 길이, 문장 길이]
        
        output = torch.matmul(score, v)
        # output = [배치 사이즈, 헤드 갯수, 문장 길이, 어텐션 차원]
        
        output = output.transpose(1, 2).contiguous()
        # output = [배치 사이즈, 문장 길이, 헤드 갯수, 어텐션 차원]
        # PyTorch 텐서를 연속 메모리로 배치하는 역할을 합니다. 
        # 연속 메모리 텐서는 일부 연산과 메모리 복사 작업에서 효율적으로 사용됩니다.
        
        output = output.view(batch_size, -1, self.num_heads * self.attn_dim)
        # output = [배치 사이즈, 문장 길이, 은닉 차원]
        
        output = self.o_w(output)
        # output = [배치 사이즈, 문장 길이, 은닉 차원]
        
        return output, score
        
def create_subsequent_mask(tgt):
    batch_size, tgt_len = tgt.size()
    
    subsequent_mask = torch.triu(torch.ones(tgt_len, tgt_len), diagonal=1).bool()
    # subsequent_mask = [타겟 문장 길이, 타겟 문장 길이]
    
    subsequent_mask = subsequent_mask.unsqueeze(0).repeat(batch_size, 1, 1).to(device)
    # subsquent_mask = [배치 사이즈, 타겟 문장 길이, 타겟 문장 길이]
    
    return subsequent_mask

## 테스트 코드
test_sent = '왜들 그리 다운돼있어? 뭐가 문제야 say something 분위기가 겁나 싸해'
test_tensor = kor_tokenizer.encode(test_sent)
test_tensor = torch.LongTensor(test_tensor.ids).to(device).unsqueeze(0)
# device에 올라간 것을 matplotlib으로 그릴 수 없기 때문에 cpu로 내려주기

plt.figure(figsize=(5,5))
plt.imshow(create_subsequent_mask(test_tensor).cpu()[0])


def create_src_mask(src):
    " source = [배치 사이즈, 소스 문장 길이] "

    src_len = src.size(1)
    
    src_mask = (src == pad_idx)
    # src_mask = [배치 사이즈, 소스 문장 길이]
    
    src_mask = src_mask.unsqueeze(1).repeat(1, src_len, 1)
    # src_mask = [배치 사이즈, 소스 문장 길이, 소스 문장 길이]

    return src_mask.to(device)


def create_tgt_mask(src, tgt):
    " src = [배치 사이즈, 소스 문장 길이] "
    " tgt = [배치 사이즈, 타겟 문장 길이] "
    
    batch_size, tgt_len = tgt.size()
    
    subsequent_mask = create_subsequent_mask(tgt)
    
    enc_dec_mask = (src == pad_idx)
    tgt_mask = (tgt == pad_idx)
    # src_mask = [배치 사이즈, 소스 문장 길이]
    # tgt_mask = [배치 사이즈, 타겟 문장 길이]
    
    enc_dec_mask = enc_dec_mask.unsqueeze(1).repeat(1, tgt_len, 1).to(device)
    tgt_mask = tgt_mask.unsqueeze(1).repeat(1, tgt_len, 1).to(device)
    # src_mask = [배치 사이즈, 타겟 문장 길이, 소스 문장 길이]
    # tgt_mask = [배치 사이즈, 타겟 문장 길이, 타겟 문장 길이]

    tgt_mask = tgt_mask | subsequent_mask
    
    return enc_dec_mask, tgt_mask
    
src = torch.tensor([[1, 2, 3, 4, 2, 11, 28, 7, 0, 0, 0]])
src_mask = create_src_mask(src)
plt.figure(figsize=(5,5))
plt.imshow(src_mask.cpu()[0])

tgt = torch.tensor([[1, 2, 3, 4, 2, 11, 28, 7, 99, 987, 1024, 0, 0]])
enc_dec_mask, tgt_mask = create_tgt_mask(src, tgt)

# 아래 그림은 타겟 문장이 소스 문장에 Attention을 취할 때 [PAD] 토큰이 마스킹 되는 예입니다.
plt.figure(figsize=(5,5))
plt.imshow(enc_dec_mask.cpu()[0])

# 아래 예는 타겟 문장에서 Self-Attention 연산이 취해질 때 
# 타임 스텝 상 뒤에 위치하는 토큰들과 [PAD] 토큰들이 마스킹 되는 예입니다.
plt.figure(figsize=(5,5))
plt.imshow(tgt_mask.cpu()[0])

5 - 2. Position-wise Feed-Forward 네트워크 구현

 
## 5-2. Position-wise Feed-Forward 네트워크 구현class PositionwiseFeedForward(nn.Module):
'''포지션 와이즈 피드 포워드 레이어'''
class PositionwiseFeedForward(nn.Module):
    def __init__(self, params):
        super(PositionwiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(params['hidden_dim'], params['ffn_dim'])
        self.fc2 = nn.Linear(params['ffn_dim'], params['hidden_dim'])
        self.dropout = nn.Dropout(params['dropout'])
    
    def forward(self, x):
        " x = [배치 사이즈, 문장 길이, 은닉 차원] "

        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

여기서 오류가 발생합니다.

오류가 발생하는 이유는 코드에서 `super(PositionwiseFeedForward, self).__init__()` 부분과 `forward` 메서드의 들여 쓰기가 올바르게 되지 않았기 때문입니다. 들여 쓰기는 파이썬에서 매우 중요하므로 정확하게 맞춰야 합니다.

아래는 수정된 코드입니다:

import torch.nn as nn
import torch.nn.functional as F

class PositionwiseFeedForward(nn.Module):
    def __init__(self, params):
        super(PositionwiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(params['hidden_dim'], params['ffn_dim'])
        self.fc2 = nn.Linear(params['ffn_dim'], params['hidden_dim'])
        self.dropout = nn.Dropout(params['dropout'])
    
    def forward(self, x):
        " x = [배치 사이즈, 문장 길이, 은닉 차원] "

        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)
        return x


위의 코드에서는 `super(PositionwiseFeedForward, self).__init__()`와 `forward` 메서드의 들여 쓰기를 올바르게 수정하여 오류를 해결했습니다. 이제 코드가 정상적으로 작동해야 합니다.

 


5 - 3. PositionalEncoding 네트워크 구현

class PositionalEncoding(nn.Module):
    def __init__(self, params):
        super(PositionalEncoding, self).__init__()
        sinusoid = np.array([pos / np.power(10000, 2 * i / params['hidden_dim'])
                            for pos in range(params['max_len']) for i in range(params['hidden_dim'])])
        # sinusoid = [문장 최대 길이 * 은닉 차원]

        sinusoid = sinusoid.reshape(params['max_len'], -1)
        # sinusoid = [문장 최대 길이, 은닉 차원]

        sinusoid[:, 0::2] = np.sin(sinusoid[:, 0::2])
        sinusoid[:, 1::2] = np.cos(sinusoid[:, 1::2])
        sinusoid = torch.FloatTensor(sinusoid).to(device)

        self.embedding = nn.Embedding.from_pretrained(sinusoid, freeze=True)
        
    def forward(self, x):
        " x = [배치 사이즈, 문장 길이] "
        
        pos = torch.arange(x.size(-1), dtype=torch.long).to(device)
        # pos = [배치 사이즈, 문장 길이]

        embed = self.embedding(pos)
        # embed = [배치 사이즈, 문장 길이, 은닉 차원]
        return embed

5 - 4. Transformer 인코더 부 구현

class EncoderLayer(nn.Module):
    '''인코더 레이어'''
    def __init__(self, params):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(params)
        self.layer_norm1 = nn.LayerNorm(params['hidden_dim'])
        self.feed_forward = PositionwiseFeedForward(params)
        self.layer_norm2 = nn.LayerNorm(params['hidden_dim'])
        self.dropout = nn.Dropout(params['dropout'])
        
    def forward(self, x, src_mask):
        " x = [배치 사이즈, 문장 길이, 은닉 차원] "
        
        residual = x
        x, _ = self.self_attn(x, x, x, src_mask)
        x = self.dropout(x)
        x = residual + x
        x = self.layer_norm1(x)
        
        residual = x
        x = self.feed_forward(x)
        x = self.dropout(x)
        x = residual + x
        x = self.layer_norm2(x)
        
        return x


class Encoder(nn.Module):
    '''트랜스포머 인코더'''
    def __init__(self, params):
        super(Encoder, self).__init__()
        self.tok_embedding = nn.Embedding(params['vocab_size'], params['hidden_dim'], padding_idx=pad_idx)
        self.pos_embedding = PositionalEncoding(params)
        self.layers = nn.ModuleList([EncoderLayer(params) for _ in range(params['num_layers'])])
        
    def forward(self, src):
        " src = [배치 사이즈, 소스 문장 길이] "

        src_mask = create_src_mask(src)
        src = self.tok_embedding(src) + self.pos_embedding(src)
        
        for layer in self.layers:
            src = layer(src, src_mask)
            
        # src = [배치 사이즈, 소스 문장 길이, 은닉 차원]
        return src

5 - 4. Transformer 디코더 부 구현

class DecoderLayer(nn.Module):
    '''디코더 레이어'''
    def __init__(self, params):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(params)
        self.layer_norm1 = nn.LayerNorm(params['hidden_dim'])

        self.enc_dec_attn = MultiHeadAttention(params)
        self.layer_norm2 = nn.LayerNorm(params['hidden_dim'])
        
        self.feed_forward = PositionwiseFeedForward(params)
        self.layer_norm3 = nn.LayerNorm(params['hidden_dim'])
        
        self.dropout = nn.Dropout(params['dropout'])
        
    def forward(self, x, tgt_mask, enc_output, src_mask):
        " x = [배치 사이즈, 문장 길이, 은닉 차원] "
        
        residual = x
        x, _ = self.self_attn(x, x, x, tgt_mask)
        x = self.dropout(x)
        x = residual + x
        x = self.layer_norm1(x)
        
        residual = x
        x, attn_map = self.enc_dec_attn(x, enc_output, enc_output, src_mask)
        x = self.dropout(x)
        x = residual + x
        x = self.layer_norm2(x)
        
        residual = x
        x = self.feed_forward(x)
        x = self.dropout(x)
        x = residual + x
        x = self.layer_norm3(x)
        
        return x, attn_map


class Decoder(nn.Module):
    '''트랜스포머 디코더'''
    def __init__(self, params):
        super(Decoder, self).__init__()
        self.tok_embedding = nn.Embedding(params['vocab_size'], params['hidden_dim'], padding_idx=pad_idx)
        self.pos_embedding = PositionalEncoding(params)
        self.layers = nn.ModuleList([DecoderLayer(params) for _ in range(params['num_layers'])])
        
    def forward(self, tgt, src, enc_out):
        " tgt = [배치 사이즈, 타겟 문장 길이] "

        src_mask, tgt_mask = create_tgt_mask(src, tgt)
        tgt = self.tok_embedding(tgt) + self.pos_embedding(tgt)
        
        for layer in self.layers:
            tgt, attn_map = layer(tgt, tgt_mask, enc_out, src_mask)
            
        tgt = torch.matmul(tgt, self.tok_embedding.weight.transpose(0, 1))
        # tgt = [배치 사이즈, 타겟 문장 길이, 은닉 차원]

        return tgt, attn_map

5 - 6. Transformer 구현

class Transformer(nn.Module):
    '''트랜스포머 네트워크'''
    def __init__(self, params):
        super(Transformer, self).__init__()
        self.encoder = Encoder(params)
        self.decoder = Decoder(params)
    
    def forward(self, src, tgt):
        " src = [배치 사이즈, 소스 문장 길이] "
        " tgt = [배치 사이즈, 타겟 문장 길이] "
        
        enc_out = self.encoder(src)
        dec_out, attn = self.decoder(tgt, src, enc_out)
        return dec_out, attn
    
    def count_params(self):
        return sum(p.numel() for p in self.parameters() if p.requires_grad)
        
## Transformer 에서는 Adam Optimizer에 일부 스케줄 옵션을 적용해 사용하고 있습니다.

5 - 7. Transformer Optimizer구현

class ScheduledOptim:
    '''스케줄 옵티마이저'''
    def __init__(self, optimizer, warmup_steps):
        self.init_lr = np.power(params['hidden_dim'], -0.5)
        self.optimizer = optimizer
        self.step_num = 0
        self.warmup_steps = warmup_steps
    
    def step(self):
        self.step_num += 1
        lr = self.init_lr * self.get_scale()
        
        for p in self.optimizer.param_groups:
            p['lr'] = lr
            
        self.optimizer.step()
    
    def zero_grad(self):
        self.optimizer.zero_grad()  # 학습을 시작하기 전에 0으로 세팅
    
    def get_scale(self):
        return np.min([
            np.power(self.step_num, -0.5),
            self.step_num * np.power(self.warmup_steps, -1.5)
        ])

6. 모델 학습

 

# 모델 정의

model = Transformer(params)
model.to(device)
print(f'The model has {model.count_params():,} trainable parameters')


# 로스 함수 정의

criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)
criterion.to(device)


# 옵티마이저 정의

optimizer = ScheduledOptim(
    optim.Adam(model.parameters(), betas=[0.9, 0.98], eps=1e-9),
    warmup_steps=4000
)
params['num_epoch'] = 20

from tqdm import tqdm

# 훈련 로직

start_tic = time.time()

for epoch in tqdm(range(params['num_epoch']), desc="epoch"):
    model.train()
    epoch_loss = 0

    tic = time.time()
    for src, tgt in tqdm(zip(src_iter, tgt_iter), total=len(src_iter), desc="training", leave=False, mininterval=10):
        " src = [배치 사이즈, 소스 문장 길이] "
        " tgt = [배치 사이즈, 타겟 문장 길이] "

        optimizer.zero_grad()

        logits, _ = model(src, tgt[:, :-1])
        # logits = [배치 사이즈, 타겟 문장 길이, 은닉 차원]

        logits = logits.contiguous().view(-1, logits.size(-1))
        # logits = [(배치 사이즈 * 타겟 문장 길이) - 1, 은닉 차원]
        golds = tgt[:, 1:].contiguous().view(-1)
        # golds = [(배치 사이즈 * 타겟 문장 길이) - 1]

        loss = criterion(logits, golds)
        epoch_loss += loss.item()

        loss.backward()
#         torch.nn.utils.clip_grad_norm_(model.parameters(), self.params.clip)
        optimizer.step()

    train_loss = epoch_loss / len(src_iter)
    toc = time.time()

    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f} | Time: {toc - tic}')

    tic = time.time()
    # torch.save(model.state_dict(), f'/content/drive/MyDrive/transformer/epoch_{epoch}.pth')
    torch.save(model.state_dict(), f'./epoch_{epoch}.pth')
    toc = time.time()
    print(f"저장한 파일 이름 : ./epoch_{epoch}.pth  저장하는 데 걸린 시간 : {toc-tic}")

end_tic = time.time()
print("총 걸린 시간 : ", end_tic - start_tic)

이 코드는 어마어마한 시간이 걸립니다.. 특히 저처럼 vscode로 돌렸을 때요 ㅎㅎ

'The model has 64,618,496 trainable parameters'라고 하니 어마어마한 숫자네요.

참고로 저는 1 에폭당 5~6시간 정도 소요됐습니다. (학원 컴이 그렇게 좋진 않은 것 같아요)

다음 실습에서는 코랩으로 많이 돌려보고 싶습니다.

# 모델의 state_dict 저장
torch.save(model.state_dict(), './final.pth')

# Transformer 모델 인스턴스 생성 후에, state_dict 불러오기
state_dict = torch.load('./my_model.pth')

# 모델에 state_dict 로드
model.load_state_dict(state_dict)

# 모델을 평가 모드로 설정
model.eval()

type(model)

7. 학습 결과 확인

이제 학습된 모델이 어느 정도 성능을 보이는지 확인해 볼 차례입니다.

device = "cpu"

model.to(device)

# 모델을 평가 모드로 설정
model.eval()

# 입력 문장
sent = '나'

# 문장 토큰화
proc_sent = kor_tokenizer.encode(sent)

# 후처리
post_proc_sent = postprocess(proc_sent.ids)

# Tensor로 변환 및 배치 차원 추가
input_tensor = torch.LongTensor(post_proc_sent).to(device)
input_tensor = input_tensor.unsqueeze(0)

# 출력 문장 초기화
output_tensor = torch.LongTensor([1]).to(device).unsqueeze(0)  # <sos> 토큰 + 배치 차원 추가

# 디코딩
with torch.no_grad():
    for _ in range(50):  # 최대 길이
        logits, _ = model(input_tensor, output_tensor)
        next_token = logits.argmax(-1)[:,-1]
        output_tensor = torch.cat([output_tensor, next_token.unsqueeze(-1)], dim=-1)
        if next_token.item() == 2:  # <eos> 토큰
            break

# 결과 디코딩
decoded_output = eng_tokenizer.decode(output_tensor.squeeze().tolist())
print(decoded_output)
# ...
# 디코딩
with torch.no_grad():
    for _ in range(50):  # 최대 길이
        logits, _ = model(input_tensor, output_tensor)

        print("Logits: ", logits)  # 로짓 값 출력
        print("Argmax: ", logits.argmax(-1))  # 로짓의 argmax 결과 출력

        next_token = logits.argmax(-1)[:,-1]
        output_tensor = torch.cat([output_tensor, next_token.unsqueeze(0)], dim=-1)

        print("Output Tensor: ", output_tensor)  # 현재까지의 출력 텐서 값 출력

        if next_token.item() == 2:  # <eos> 토큰
            break

# 결과 디코딩
print("Decoding Output Tensor: ", output_tensor.squeeze().tolist())
decoded_output = eng_tokenizer.decode(output_tensor.squeeze().tolist())
print(decoded_output)

이렇게 길고 긴... 트랜스포머 모델 직접 손으로 쳐보기 실습이 끝났습니다.

밑바닥부터 시작하는 딥러닝이라는 책도 예제를 올릴까 했는데 고민이 되네요..(너무 코드만 좔좔좔 있어서)

마지막으로 원본 github 주소를 남기겠습니다.

https://github.com/Huffon/nlp-various-tutorials/blob/master/transformer-aihub.ipynb