Pytorch 기초부터 시작하는 NLP 시리즈
1. 문자-단위 RNN으로 이름 분류하기
2. 문자-단위 RNN으로 이름 생성하기
3. Sequence to Sequence 네트워크와 Attention을 이용한 번역
Pytorch 기초부터 시작하는 NLP: Sequence to Sequence 네트워크와 Attention을 이용한 번역
페이지: https://tutorials.pytorch.kr/intermediate/seq2seq_translation_tutorial.html
Author: Sean Robertson
번역: 황성수
Pytorch 기초부터 시작하는 NLP 마지막 튜토리얼입니다. 튜토리얼의 내용은 '프랑스어를 영어로 번역하는 모델 학습하기' 입니다. 제가 재해석한 코드를 공유드립니다. 큰 틀에서 작동원리는 동일합니다.
Step 1: 데이터셋 구성 코드
import unicodedata
import re
import random
SOS_token = 0
EOS_token = 1
# 어휘 관리 클래스: 단어를 인덱스와 매핑하고, 단어 빈도를 추적
class Lang:
def __init__(self, name):
self.name = name
self.word2index = {} # 단어 -> 인덱스 매핑
self.word2count = {} # 단어 -> 등장 횟수
self.index2word = {0: "SOS", 1: "EOS"} # 인덱스 -> 단어 매핑
self.n_words = 2 # vocab size
# 문장을 단어로 분리하여 각 단어를 사전에 추가
def addSentence(self, sentence):
for word in sentence.split(' '):
self.addWord(word)
def addWord(self, word):
if word not in self.word2index:
self.word2index[word] = self.n_words # 단어에 새 인덱스 부여
self.index2word[self.n_words] = word # 인덱스 -> 단어 매핑
self.word2count[word] = 1 # 단어 등장 횟수 초기화
self.n_words += 1 # 단어 수 증가
else:
self.word2count[word] += 1 # 단어가 이미 있다면 등장 횟수 증가
# 유니 코드 문자열을 일반 ASCII로 변환
# https://stackoverflow.com/a/518232/2809427
def unicodeToAscii(s):
return ''.join(
c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn'
)
# 소문자, 다듬기, 그리고 문자가 아닌 문자 제거
# re: Regular Expression, sub: Substitute, r"": raw string, \1: matching word
# re.sub(pattern, repl, string): pattern: 정규식 패턴, repl: 대체할 문자열, string: 변환할 대상 문자열.
def normalizeString(s):
s = unicodeToAscii(s.lower().strip())
s = re.sub(r"([.!?])", r" \1", s)
s = re.sub(r"[^a-zA-Z!?]+", r" ", s)
return s.strip()
def readLangs(lang1, lang2, reverse=False):
# 파일을 읽고 줄로 분리
file = open(f'../data/{lang1}-{lang2}.txt', encoding='utf-8')
lines = file.read().strip().split('\n')
# 모든 줄을 쌍으로 분리하고 정규화
pairs = []
for line in lines: # 파일의 각 줄에 대해 반복
pair = line.split('\t') # 탭(\t)을 기준으로 줄을 나눠 문장 쌍을 생성
normalized_pair = []
for sentence in pair: # 문장 쌍의 각 문장에 대해 반복
normalized_pair.append(normalizeString(sentence)) # 문장을 정규화 후 추가
pairs.append(normalized_pair) # 정규화된 문장 쌍을 리스트에 추가
# 쌍을 뒤집고, Lang 인스턴스 생성
if reverse:
pairs = [list(reversed(p)) for p in pairs]
input_lang = Lang(lang2)
output_lang = Lang(lang1)
else:
input_lang = Lang(lang1)
output_lang = Lang(lang2)
return input_lang, output_lang, pairs
# 문장 내, 단어 수가 10개를 넘지 않아야 함
MAX_LENGTH = 10
eng_prefixes = (
"i am ", "i m ",
"he is", "he s ",
"she is", "she s ",
"you are", "you re ",
"we are", "we re ",
"they are", "they re "
)
def filterPair(pair):
return len(pair[0].split(' ')) < MAX_LENGTH and \
len(pair[1].split(' ')) < MAX_LENGTH and \
pair[1].startswith(eng_prefixes)
def filterPairs(pairs):
filtered_pairs = []
for pair in pairs:
if filterPair(pair):
filtered_pairs.append(pair)
return filtered_pairs
def prepareData(lang1, lang2, reverse=False):
input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse)
pairs = filterPairs(pairs)
for pair in pairs:
input_lang.addSentence(pair[0])
output_lang.addSentence(pair[1])
return input_lang, output_lang, pairs
if __name__ == '__main__':
# Example 1
lang = Lang("English")
input_str = "Hello, world!"
lang.addSentence(input_str)
print(f"\nExample 1, Lang name: {lang.name}, addSentence: {input_str}, word2index: {lang.word2index}, "
f"word2count:{lang.word2count}, index2word: {lang.index2word}, n_words: {lang.n_words}")
# Example 2
print(f"Example 2, normalizeString({input_str}): {normalizeString(input_str)}")
# Example 3
input_lang, output_lang, pairs = prepareData('eng', 'fra', True)
print(f"Example 3, Random pair [france -> english]: {random.choice(pairs)}")
실행 결과
Example 1
Lang name: English, addSentence: Hello, world!, word2index: {'Hello,': 2, 'world!': 3},
word2count:{'Hello,': 1, 'world!': 1}, index2word: {0: 'SOS', 1: 'EOS', 2: 'Hello,', 3: 'world!'},
n_words: 4
Example 2
normalizeString(Hello, world!): hello world !
Example 3
Random pair [france -> english]:
['je suis desolee je ne vous ai pas reconnue', 'i m sorry i didn t recognize you']
Step 2: Encoder와 AttentionDecoder 정의
import torch
import torch.nn as nn
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SOS_token = 0
EOS_token = 1
MAX_LENGTH = 10
class EncoderRNN(nn.Module):
def __init__(self, input_size, hidden_size, dropout_p=0.1):
super(EncoderRNN, self).__init__()
self.hidden_size = hidden_size
# 입력 인덱스(input_indices)는 Embedding 가중치 행렬(input_size)의 특정 행(row)을 선택
self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=hidden_size)
# hidden_size: 입력 크기와 출력 히든 상태의 크기 (같은 크기로 설정)
self.gru = nn.GRU(input_size=hidden_size, hidden_size=hidden_size, batch_first=True)
self.dropout = nn.Dropout(dropout_p)
def forward(self, input):
embedded = self.dropout(self.embedding(input)) # [batch_size, seq_len, hidden_size]
# output: 모든 타임 스텝의 히든 상태 [batch_size, seq_len, hidden_size]
# hidden: 마지막 타임 스텝의 히든 상태 [num_layers, batch_size, hidden_size]
output, hidden = self.gru(embedded)
return output, hidden
class BahdanauAttention(nn.Module):
def __init__(self, hidden_size):
super(BahdanauAttention, self).__init__()
self.Wa = nn.Linear(hidden_size, hidden_size)
self.Ua = nn.Linear(hidden_size, hidden_size)
self.Va = nn.Linear(hidden_size, 1)
self.softmax = nn.Softmax(dim=-1)
def forward(self, query, keys):
scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys))) # query: h_{dec,t}, keys: h_{enc,i}
scores = scores.squeeze(2).unsqueeze(1) # Softmax 적용 값은 마지막 차원: [batch_size, 4, 1] -> [batch_size, 1, 4]
weights = self.softmax(scores)
context = torch.bmm(weights, keys) # [B,N,M] x [B,M,P] = [B,N,P]
return context, weights
class AttnDecoderRNN(nn.Module):
def __init__(self, hidden_size, output_size, dropout_p=0.1):
super(AttnDecoderRNN, self).__init__()
self.embedding = nn.Embedding(num_embeddings=output_size, embedding_dim=hidden_size)
self.attention = BahdanauAttention(hidden_size)
self.gru = nn.GRU(input_size=2 * hidden_size, hidden_size=hidden_size, batch_first=True)
self.out = nn.Linear(hidden_size, output_size)
self.dropout = nn.Dropout(dropout_p)
self.LogSoftmax = nn.LogSoftmax(dim=-1)
def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
batch_size = encoder_outputs.size(0)
# 시작 토큰(Start of Sentence Token, SOS_token), [B,1] = [[1],[1],...,[1]]
decoder_input = torch.empty(batch_size, 1, dtype=torch.long, device=device).fill_(SOS_token)
# hidden: 마지막 타임스텝의 히든 상태 [num_layers, batch_size, hidden_size]
decoder_hidden = encoder_hidden
decoder_outputs = []
attentions = []
for i in range(MAX_LENGTH):
decoder_output, decoder_hidden, attn_weights = self.forward_step(
decoder_input, decoder_hidden, encoder_outputs
)
decoder_outputs.append(decoder_output)
attentions.append(attn_weights)
if target_tensor is not None:
# Teacher forcing 포함: 목표를 다음 입력으로 전달
decoder_input = target_tensor[:, i].unsqueeze(1) # Teacher forcing
else:
# Teacher forcing 미포함: 자신의 예측을 다음 입력으로 사용
_, top_idx = decoder_output.topk(1)
decoder_input = top_idx.squeeze(-1).detach() # 입력으로 사용할 부분을 히스토리에서 분리
# [Batch_size, seq_len, vocab_size] -> vocab_size 에 대한 Softmax를 사용하여 추정 단어를 선택
decoder_outputs = torch.cat(decoder_outputs, dim=1)
decoder_outputs = self.LogSoftmax(decoder_outputs)
attentions = torch.cat(attentions, dim=1)
return decoder_outputs, decoder_hidden, attentions
def forward_step(self, input, hidden, encoder_outputs):
# decoder_input: [batch_size, 1, hidden_size]
embedded = self.dropout(self.embedding(input))
# 디코더의 히든 상태: [batch_size, 1, hidden_size]
query = hidden.permute(1, 0, 2)
context, attn_weights = self.attention(query, encoder_outputs)
input_gru = torch.cat((embedded, context), dim=2)
output, hidden = self.gru(input_gru, hidden)
# *목표 언어의 모든 단어에 대한 점수(logit)
output = self.out(output)
return output, hidden, attn_weights
if __name__ == '__main__':
input_size = 10 # 입력 어휘 사전 크기 (예: 50개의 고유 단어)
hidden_size = 128 # GRU 히든 상태 크기
output_size = 10 # 출력 어휘 사전 크기 (예: 50개의 고유 단어)
batch_size = 2 # 배치 크기
seq_len = 10 # 입력 시퀀스 길이
encoder = EncoderRNN(input_size=input_size, hidden_size=hidden_size).to(device)
attention = BahdanauAttention(hidden_size)
decoder = AttnDecoderRNN(hidden_size=hidden_size, output_size=output_size).to(device)
input_tensor = torch.randint(0, input_size, (batch_size, seq_len), dtype=torch.long, device=device)
target_tensor = torch.randint(0, output_size, (batch_size, seq_len), dtype=torch.long, device=device)
keys = torch.randn(batch_size, seq_len, hidden_size)
query = torch.randn(batch_size, 1, hidden_size)
context, weights = attention(query, keys)
encoder_outputs, encoder_hidden = encoder(input_tensor)
decoder_outputs, decoder_hidden, _ = decoder(encoder_outputs, encoder_hidden, target_tensor=target_tensor)
# Example 1
print(f"Example 1: Input Tensor Shape: {input_tensor.shape}, Encoder Outputs Shape: {encoder_outputs.shape}, "
f"Encoder Hidden Shape: {encoder_hidden.shape}\n") # (1, 배치 크기, 히든 크기)
# Example 2
print(f"Example 2: Keys shape: {keys.shape}, Query shape: {query.shape}, Scores shape: {weights.shape}, "
f"Context shape: {context.shape}\n")
# Example 3
print(f"Example 3: Decoder Outputs Shape: {decoder_outputs.shape}, Decoder Hidden Shape: {decoder_hidden.shape}")
실행 결과
Example 1
Input Tensor Shape: torch.Size([2, 10]), Encoder Outputs Shape: torch.Size([2, 10, 128]),
Encoder Hidden Shape: torch.Size([1, 2, 128])
Example 2
Keys shape: torch.Size([2, 10, 128]), Query shape: torch.Size([2, 1, 128]),
Scores shape: torch.Size([2, 1, 10]), Context shape: torch.Size([2, 1, 128])
Example 3
Decoder Outputs Shape: torch.Size([2, 10, 10]), Decoder Hidden Shape: torch.Size([1, 2, 128])
Step 3: 훈련데이터 준비
"""
학습 데이터 준비
"""
import numpy as np
from torch.utils.data import TensorDataset, DataLoader, RandomSampler
from Tutorial3_step1 import *
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SOS_token = 0
EOS_token = 1
MAX_LENGTH = 10
def indexesFromSentence(lang, sentence):
indexes = []
for word in sentence.split(' '):
indexes.append(lang.word2index[word])
return indexes
def tensorFromSentence(lang, sentence):
indexes = indexesFromSentence(lang, sentence)
indexes.append(EOS_token)
return torch.tensor(indexes, dtype=torch.long, device=device).view(1, -1) # size: [1, n], n: num of words
def get_dataloader(batch_size):
input_lang, output_lang, pairs = prepareData('eng', 'fra', True)
n = len(pairs)
input_ids = np.zeros((n, MAX_LENGTH), dtype=np.int32) # (11445, 10)
target_ids = np.zeros((n, MAX_LENGTH), dtype=np.int32) # (11445, 10)
for idx, (inp, tgt) in enumerate(pairs):
inp_ids = indexesFromSentence(input_lang, inp)
tgt_ids = indexesFromSentence(output_lang, tgt)
inp_ids.append(EOS_token)
tgt_ids.append(EOS_token)
input_ids[idx, :len(inp_ids)] = inp_ids # e.g. [ 116 116 3272 961 1 0 0 0 0 0]
target_ids[idx, :len(tgt_ids)] = tgt_ids # e.g. [ 2 3 146 32 2 3 65 1 0 0]
train_data = TensorDataset(torch.LongTensor(input_ids).to(device),
torch.LongTensor(target_ids).to(device))
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
return input_lang, output_lang, train_dataloader
if __name__ == '__main__':
input_lang, output_lang, train_dataloader = get_dataloader(batch_size=1)
for i, (input_ids, target_ids) in enumerate(train_dataloader):
print(f"Input Ids (영어): {input_ids}")
print(f"Target Ids (프랑스어): {target_ids}")
break
실행 결과
Input Ids (영어): tensor([[ 143, 21, 242, 58, 1100, 99, 3327, 1, 0, 0]]
Target Ids (프랑스어): tensor([[ 13, 14, 144, 40, 981, 2020, 1, 0, 0, 0]]
Step 4: 모델 훈련 및 테스트 준비
"""
모델 훈련 준비
"""
import torch
import torch.nn as nn
from torch import optim
from Tutorial3_step1 import prepareData
from Tutorial3_step3 import tensorFromSentence
import time
import math
import random
SOS_token = 0
EOS_token = 1
MAX_LENGTH = 10
def asMinutes(s):
m = math.floor(s / 60)
s -= m * 60
return '%dm %ds' % (m, s)
def timeSince(since, percent):
now = time.time()
s = now - since
es = s / (percent)
rs = es - s
return '%s (- %s)' % (asMinutes(s), asMinutes(rs))
def train_epoch(dataloader, encoder, decoder):
total_loss = 0
criterion = nn.NLLLoss()
encoder_optimizer = optim.Adam(encoder.parameters(), lr=1e-3)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=1e-3)
for data in dataloader:
input_tensor, target_tensor = data
encoder_optimizer.zero_grad()
decoder_optimizer.zero_grad()
encoder_outputs, encoder_hidden = encoder(input_tensor)
decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, target_tensor)
loss = criterion(
decoder_outputs.view(-1, decoder_outputs.size(-1)),
target_tensor.view(-1)
)
loss.backward()
encoder_optimizer.step()
decoder_optimizer.step()
total_loss += loss.item()
return total_loss / len(dataloader)
def train(train_dataloader, encoder, decoder, n_epochs):
start = time.time()
loss_total = 0 # Reset every print_every
for epoch in range(1, n_epochs + 1):
loss = train_epoch(train_dataloader, encoder, decoder)
loss_total += loss
if epoch % 1 == 0:
loss_avg = loss_total / 1
loss_total = 0
print('%s (%d %d%%) %.4f' % (timeSince(start, epoch / n_epochs), epoch, epoch / n_epochs * 100, loss_avg))
def evaluate(encoder, decoder, sentence, input_lang, output_lang):
with torch.no_grad():
input_tensor = tensorFromSentence(input_lang, sentence)
encoder_outputs, encoder_hidden = encoder(input_tensor)
decoder_outputs, decoder_hidden, decoder_attn = decoder(encoder_outputs, encoder_hidden)
_, topi = decoder_outputs.topk(1)
decoded_ids = topi.squeeze()
decoded_words = []
for idx in decoded_ids:
if idx.item() == EOS_token:
decoded_words.append('<EOS>')
break
decoded_words.append(output_lang.index2word[idx.item()])
return decoded_words, decoder_attn
def evaluateRandomly(encoder, decoder, n=3):
input_lang, output_lang, pairs = prepareData('eng', 'fra', True)
for i in range(n):
pair = random.choice(pairs)
print(f'Input sentence: {pair[0]}')
print(f'Target sentence: {pair[1]}')
output_words, _ = evaluate(encoder, decoder, pair[0], input_lang, output_lang)
output_sentence = ' '.join(output_words)
print(f'Output sentence: {output_sentence}\n')
Step 5: 모델 훈련 및 테스트 준비
import os
import torch
from Tutorial3_step2 import EncoderRNN, AttnDecoderRNN
from Tutorial3_step3 import get_dataloader
from Tutorial3_step4 import train, evaluateRandomly
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
hidden_size = 128
batch_size = 32
n_epochs = 80
# 가중치 파일 경로
encoder_path = "../weight/Tutorial3/encoder_weights.pth"
decoder_path = "../weight/Tutorial3/decoder_weights.pth"
input_lang, output_lang, train_dataloader = get_dataloader(batch_size)
encoder = EncoderRNN(input_lang.n_words, hidden_size).to(device)
decoder = AttnDecoderRNN(hidden_size, output_lang.n_words).to(device)
# 가중치 파일이 존재하는지 확인
if os.path.exists(encoder_path) and os.path.exists(decoder_path):
# 모델 가중치 불러오기
encoder.load_state_dict(torch.load(encoder_path))
decoder.load_state_dict(torch.load(decoder_path))
else:
# 모델 훈련 모드 설정
encoder.train()
decoder.train()
# 모델 훈련
train(train_dataloader, encoder, decoder, n_epochs)
# 모델 가중치 저장
torch.save(encoder.state_dict(), encoder_path)
torch.save(decoder.state_dict(), decoder_path)
# 모델 테스트 모드 설정
encoder.eval()
decoder.eval()
# 평가 진행
evaluateRandomly(encoder, decoder)
실행 결과
0m 9s (- 12m 19s) (1 1%) 2.4665
0m 18s (- 11m 52s) (2 2%) 1.6445
0m 26s (- 11m 31s) (3 3%) 1.3686
......
11m 41s (- 0m 0s) (80 100%) 0.0292
Input sentence: je vous laisse partir
Target sentence: i m letting you go
Output sentence: i m letting you go <EOS>
Input sentence: je suis satisfait de ses progres
Target sentence: i m satisfied with his progress
Output sentence: i m satisfied with his progress <EOS>
Input sentence: nous sommes toutes contentes d aider
Target sentence: we are all happy to help
Output sentence: we are all happy to help <EOS>
'딥러닝 > 자연어처리' 카테고리의 다른 글
[자연어처리] 기초부터 시작하는 NLP: 문자-단위 RNN으로 이름 생성하기 (0) | 2024.12.12 |
---|---|
[자연어처리] 기초부터 시작하는 NLP: 문자-단위 RNN으로 이름 분류하기 (0) | 2024.12.11 |