딥러닝/자연어처리

[자연어처리] 기초부터 시작하는 NLP: Sequence to Sequence 네트워크와 Attention을 이용한 번역

johyeongseob 2024. 12. 15. 21:49
Pytorch 기초부터 시작하는 NLP 시리즈

1. 문자-단위 RNN으로 이름 분류하기
2. 문자-단위 RNN으로 이름 생성하기
3. Sequence to Sequence 네트워크와 Attention을 이용한 번역

 

Pytorch 기초부터 시작하는 NLP: Sequence to Sequence 네트워크와 Attention을 이용한 번역

페이지: https://tutorials.pytorch.kr/intermediate/seq2seq_translation_tutorial.html

 

Author: Sean Robertson
번역: 황성수

 

 Pytorch 기초부터 시작하는 NLP 마지막 튜토리얼입니다. 튜토리얼의 내용은 '프랑스어를 영어로 번역하는 모델 학습하기' 입니다. 제가 재해석한 코드를 공유드립니다. 큰 틀에서 작동원리는 동일합니다.

 

Step 1: 데이터셋 구성 코드

import unicodedata
import re
import random


SOS_token = 0
EOS_token = 1

# 어휘 관리 클래스: 단어를 인덱스와 매핑하고, 단어 빈도를 추적
class Lang:
    def __init__(self, name):
        self.name = name
        self.word2index = {}  # 단어 -> 인덱스 매핑
        self.word2count = {}  # 단어 -> 등장 횟수
        self.index2word = {0: "SOS", 1: "EOS"}  # 인덱스 -> 단어 매핑
        self.n_words = 2  # vocab size

    # 문장을 단어로 분리하여 각 단어를 사전에 추가
    def addSentence(self, sentence):
        for word in sentence.split(' '):
            self.addWord(word)


    def addWord(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words  # 단어에 새 인덱스 부여
            self.index2word[self.n_words] = word  # 인덱스 -> 단어 매핑
            self.word2count[word] = 1  # 단어 등장 횟수 초기화
            self.n_words += 1  # 단어 수 증가
        else:
            self.word2count[word] += 1  # 단어가 이미 있다면 등장 횟수 증가


# 유니 코드 문자열을 일반 ASCII로 변환
# https://stackoverflow.com/a/518232/2809427
def unicodeToAscii(s):
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )

# 소문자, 다듬기, 그리고 문자가 아닌 문자 제거
# re: Regular Expression, sub: Substitute, r"": raw string, \1: matching word
# re.sub(pattern, repl, string): pattern: 정규식 패턴, repl: 대체할 문자열, string: 변환할 대상 문자열.
def normalizeString(s):
    s = unicodeToAscii(s.lower().strip())
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z!?]+", r" ", s)
    return s.strip()

def readLangs(lang1, lang2, reverse=False):

    # 파일을 읽고 줄로 분리
    file = open(f'../data/{lang1}-{lang2}.txt', encoding='utf-8')
    lines = file.read().strip().split('\n')

    # 모든 줄을 쌍으로 분리하고 정규화
    pairs = []
    for line in lines:  # 파일의 각 줄에 대해 반복
        pair = line.split('\t')  # 탭(\t)을 기준으로 줄을 나눠 문장 쌍을 생성
        normalized_pair = []
        for sentence in pair:  # 문장 쌍의 각 문장에 대해 반복
            normalized_pair.append(normalizeString(sentence))  # 문장을 정규화 후 추가
        pairs.append(normalized_pair)  # 정규화된 문장 쌍을 리스트에 추가

    # 쌍을 뒤집고, Lang 인스턴스 생성
    if reverse:
        pairs = [list(reversed(p)) for p in pairs]
        input_lang = Lang(lang2)
        output_lang = Lang(lang1)
    else:
        input_lang = Lang(lang1)
        output_lang = Lang(lang2)

    return input_lang, output_lang, pairs


# 문장 내, 단어 수가 10개를 넘지 않아야 함
MAX_LENGTH = 10

eng_prefixes = (
    "i am ", "i m ",
    "he is", "he s ",
    "she is", "she s ",
    "you are", "you re ",
    "we are", "we re ",
    "they are", "they re "
)

def filterPair(pair):
    return len(pair[0].split(' ')) < MAX_LENGTH and \
        len(pair[1].split(' ')) < MAX_LENGTH and \
        pair[1].startswith(eng_prefixes)


def filterPairs(pairs):
    filtered_pairs = []
    for pair in pairs:
        if filterPair(pair):
            filtered_pairs.append(pair)
    return filtered_pairs


def prepareData(lang1, lang2, reverse=False):
    input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse)
    pairs = filterPairs(pairs)
    for pair in pairs:
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])
    return input_lang, output_lang, pairs


if __name__ == '__main__':

    # Example 1
    lang = Lang("English")
    input_str = "Hello, world!"
    lang.addSentence(input_str)
    print(f"\nExample 1, Lang name: {lang.name}, addSentence: {input_str}, word2index: {lang.word2index}, "
          f"word2count:{lang.word2count}, index2word: {lang.index2word}, n_words: {lang.n_words}")

    # Example 2
    print(f"Example 2, normalizeString({input_str}): {normalizeString(input_str)}")

    # Example 3
    input_lang, output_lang, pairs = prepareData('eng', 'fra', True)
    print(f"Example 3, Random pair [france -> english]: {random.choice(pairs)}")

 

실행 결과

Example 1
Lang name: English, addSentence: Hello, world!, word2index: {'Hello,': 2, 'world!': 3}, 
word2count:{'Hello,': 1, 'world!': 1}, index2word: {0: 'SOS', 1: 'EOS', 2: 'Hello,', 3: 'world!'}, 
n_words: 4

Example 2
normalizeString(Hello, world!): hello world !

Example 3
Random pair [france -> english]: 
['je suis desolee je ne vous ai pas reconnue', 'i m sorry i didn t recognize you']

 

 

Step 2: Encoder와 AttentionDecoder 정의

import torch
import torch.nn as nn


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

SOS_token = 0
EOS_token = 1
MAX_LENGTH = 10

class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size, dropout_p=0.1):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        # 입력 인덱스(input_indices)는 Embedding 가중치 행렬(input_size)의 특정 행(row)을 선택
        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=hidden_size)
        # hidden_size: 입력 크기와 출력 히든 상태의 크기 (같은 크기로 설정)
        self.gru = nn.GRU(input_size=hidden_size, hidden_size=hidden_size, batch_first=True)
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, input):
        embedded = self.dropout(self.embedding(input))  # [batch_size, seq_len, hidden_size]
        # output: 모든 타임 스텝의 히든 상태 [batch_size, seq_len, hidden_size]
        # hidden: 마지막 타임 스텝의 히든 상태 [num_layers, batch_size, hidden_size]
        output, hidden = self.gru(embedded)
        return output, hidden


class BahdanauAttention(nn.Module):
    def __init__(self, hidden_size):
        super(BahdanauAttention, self).__init__()
        self.Wa = nn.Linear(hidden_size, hidden_size)
        self.Ua = nn.Linear(hidden_size, hidden_size)
        self.Va = nn.Linear(hidden_size, 1)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, query, keys):
        scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys)))  # query: h_{dec,t}, keys: h_{enc,i}
        scores = scores.squeeze(2).unsqueeze(1)  # Softmax 적용 값은 마지막 차원: [batch_size, 4, 1] -> [batch_size, 1, 4]

        weights = self.softmax(scores)
        context = torch.bmm(weights, keys)  # [B,N,M] x [B,M,P] = [B,N,P]

        return context, weights


class AttnDecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size, dropout_p=0.1):
        super(AttnDecoderRNN, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=output_size, embedding_dim=hidden_size)
        self.attention = BahdanauAttention(hidden_size)
        self.gru = nn.GRU(input_size=2 * hidden_size, hidden_size=hidden_size, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout_p)
        self.LogSoftmax = nn.LogSoftmax(dim=-1)

    def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
        batch_size = encoder_outputs.size(0)
        # 시작 토큰(Start of Sentence Token, SOS_token), [B,1] = [[1],[1],...,[1]]
        decoder_input = torch.empty(batch_size, 1, dtype=torch.long, device=device).fill_(SOS_token)
        # hidden: 마지막 타임스텝의 히든 상태 [num_layers, batch_size, hidden_size]
        decoder_hidden = encoder_hidden
        decoder_outputs = []
        attentions = []

        for i in range(MAX_LENGTH):
            decoder_output, decoder_hidden, attn_weights = self.forward_step(
                decoder_input, decoder_hidden, encoder_outputs
            )
            decoder_outputs.append(decoder_output)
            attentions.append(attn_weights)

            if target_tensor is not None:
                # Teacher forcing 포함: 목표를 다음 입력으로 전달
                decoder_input = target_tensor[:, i].unsqueeze(1)  # Teacher forcing
            else:
                # Teacher forcing 미포함: 자신의 예측을 다음 입력으로 사용
                _, top_idx = decoder_output.topk(1)
                decoder_input = top_idx.squeeze(-1).detach()  # 입력으로 사용할 부분을 히스토리에서 분리

        # [Batch_size, seq_len, vocab_size] -> vocab_size 에 대한 Softmax를 사용하여 추정 단어를 선택
        decoder_outputs = torch.cat(decoder_outputs, dim=1)
        decoder_outputs = self.LogSoftmax(decoder_outputs)
        attentions = torch.cat(attentions, dim=1)

        return decoder_outputs, decoder_hidden, attentions


    def forward_step(self, input, hidden, encoder_outputs):
        # decoder_input: [batch_size, 1, hidden_size]
        embedded = self.dropout(self.embedding(input))

        # 디코더의 히든 상태: [batch_size, 1, hidden_size]
        query = hidden.permute(1, 0, 2)
        context, attn_weights = self.attention(query, encoder_outputs)
        input_gru = torch.cat((embedded, context), dim=2)

        output, hidden = self.gru(input_gru, hidden)
        # *목표 언어의 모든 단어에 대한 점수(logit)
        output = self.out(output)

        return output, hidden, attn_weights

if __name__ == '__main__':
    input_size = 10  # 입력 어휘 사전 크기 (예: 50개의 고유 단어)
    hidden_size = 128  # GRU 히든 상태 크기
    output_size = 10  # 출력 어휘 사전 크기 (예: 50개의 고유 단어)
    batch_size = 2  # 배치 크기
    seq_len = 10  # 입력 시퀀스 길이

    encoder = EncoderRNN(input_size=input_size, hidden_size=hidden_size).to(device)
    attention = BahdanauAttention(hidden_size)
    decoder = AttnDecoderRNN(hidden_size=hidden_size, output_size=output_size).to(device)

    input_tensor = torch.randint(0, input_size, (batch_size, seq_len), dtype=torch.long, device=device)
    target_tensor = torch.randint(0, output_size, (batch_size, seq_len), dtype=torch.long, device=device)
    keys = torch.randn(batch_size, seq_len, hidden_size)
    query = torch.randn(batch_size, 1, hidden_size)
    context, weights = attention(query, keys)

    encoder_outputs, encoder_hidden = encoder(input_tensor)
    decoder_outputs, decoder_hidden, _ = decoder(encoder_outputs, encoder_hidden, target_tensor=target_tensor)

    # Example 1
    print(f"Example 1: Input Tensor Shape: {input_tensor.shape}, Encoder Outputs Shape: {encoder_outputs.shape}, "
          f"Encoder Hidden Shape: {encoder_hidden.shape}\n")  # (1, 배치 크기, 히든 크기)

    # Example 2
    print(f"Example 2: Keys shape: {keys.shape}, Query shape: {query.shape}, Scores shape: {weights.shape}, "
          f"Context shape: {context.shape}\n")

    # Example 3
    print(f"Example 3: Decoder Outputs Shape: {decoder_outputs.shape}, Decoder Hidden Shape: {decoder_hidden.shape}")

 

실행 결과

Example 1
Input Tensor Shape: torch.Size([2, 10]), Encoder Outputs Shape: torch.Size([2, 10, 128]), 
Encoder Hidden Shape: torch.Size([1, 2, 128])

Example 2
Keys shape: torch.Size([2, 10, 128]), Query shape: torch.Size([2, 1, 128]), 
Scores shape: torch.Size([2, 1, 10]), Context shape: torch.Size([2, 1, 128])

Example 3
Decoder Outputs Shape: torch.Size([2, 10, 10]), Decoder Hidden Shape: torch.Size([1, 2, 128])

 

 

Step 3: 훈련데이터 준비

"""
학습 데이터 준비
"""

import numpy as np
from torch.utils.data import TensorDataset, DataLoader, RandomSampler
from Tutorial3_step1 import *
import torch


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

SOS_token = 0
EOS_token = 1
MAX_LENGTH = 10

def indexesFromSentence(lang, sentence):
    indexes = []
    for word in sentence.split(' '):
        indexes.append(lang.word2index[word])
    return indexes


def tensorFromSentence(lang, sentence):
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(1, -1)  # size: [1, n], n: num of words


def get_dataloader(batch_size):
    input_lang, output_lang, pairs = prepareData('eng', 'fra', True)

    n = len(pairs)
    input_ids = np.zeros((n, MAX_LENGTH), dtype=np.int32)  # (11445, 10)
    target_ids = np.zeros((n, MAX_LENGTH), dtype=np.int32)  # (11445, 10)

    for idx, (inp, tgt) in enumerate(pairs):
        inp_ids = indexesFromSentence(input_lang, inp)
        tgt_ids = indexesFromSentence(output_lang, tgt)
        inp_ids.append(EOS_token)
        tgt_ids.append(EOS_token)
        input_ids[idx, :len(inp_ids)] = inp_ids   # e.g. [ 116  116 3272  961    1    0    0    0    0    0]
        target_ids[idx, :len(tgt_ids)] = tgt_ids  # e.g. [  2    3   146   32    2    3   65    1    0    0]

    train_data = TensorDataset(torch.LongTensor(input_ids).to(device),
                               torch.LongTensor(target_ids).to(device))

    train_sampler = RandomSampler(train_data)
    train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
    return input_lang, output_lang, train_dataloader

if __name__ == '__main__':
    input_lang, output_lang, train_dataloader = get_dataloader(batch_size=1)

    for i, (input_ids, target_ids) in enumerate(train_dataloader):
        print(f"Input Ids (영어): {input_ids}")
        print(f"Target Ids (프랑스어): {target_ids}")
        break

 

실행 결과

Input Ids     (영어): tensor([[ 143,   21,  242,   58, 1100,   99, 3327,    1,    0,    0]]
Target Ids (프랑스어): tensor([[  13,   14,  144,   40,  981, 2020,    1,    0,    0,    0]]

 

 

Step 4: 모델 훈련 및 테스트 준비

"""
모델 훈련 준비
"""

import torch
import torch.nn as nn
from torch import optim
from Tutorial3_step1 import prepareData
from Tutorial3_step3 import tensorFromSentence
import time
import math
import random


SOS_token = 0
EOS_token = 1
MAX_LENGTH = 10

def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)


def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))


def train_epoch(dataloader, encoder, decoder):

    total_loss = 0
    criterion = nn.NLLLoss()
    encoder_optimizer = optim.Adam(encoder.parameters(), lr=1e-3)
    decoder_optimizer = optim.Adam(decoder.parameters(), lr=1e-3)

    for data in dataloader:
        input_tensor, target_tensor = data

        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, _, _ = decoder(encoder_outputs, encoder_hidden, target_tensor)

        loss = criterion(
            decoder_outputs.view(-1, decoder_outputs.size(-1)),
            target_tensor.view(-1)
        )
        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()

        total_loss += loss.item()

    return total_loss / len(dataloader)


def train(train_dataloader, encoder, decoder, n_epochs):
    start = time.time()
    loss_total = 0  # Reset every print_every

    for epoch in range(1, n_epochs + 1):
        loss = train_epoch(train_dataloader, encoder, decoder)
        loss_total += loss

        if epoch % 1 == 0:
            loss_avg = loss_total / 1
            loss_total = 0
            print('%s (%d %d%%) %.4f' % (timeSince(start, epoch / n_epochs), epoch, epoch / n_epochs * 100, loss_avg))


def evaluate(encoder, decoder, sentence, input_lang, output_lang):
    with torch.no_grad():
        input_tensor = tensorFromSentence(input_lang, sentence)

        encoder_outputs, encoder_hidden = encoder(input_tensor)
        decoder_outputs, decoder_hidden, decoder_attn = decoder(encoder_outputs, encoder_hidden)

        _, topi = decoder_outputs.topk(1)
        decoded_ids = topi.squeeze()

        decoded_words = []
        for idx in decoded_ids:
            if idx.item() == EOS_token:
                decoded_words.append('<EOS>')
                break
            decoded_words.append(output_lang.index2word[idx.item()])
    return decoded_words, decoder_attn


def evaluateRandomly(encoder, decoder, n=3):
    input_lang, output_lang, pairs = prepareData('eng', 'fra', True)
    for i in range(n):
        pair = random.choice(pairs)
        print(f'Input sentence: {pair[0]}')
        print(f'Target sentence: {pair[1]}')
        output_words, _ = evaluate(encoder, decoder, pair[0], input_lang, output_lang)
        output_sentence = ' '.join(output_words)
        print(f'Output sentence: {output_sentence}\n')

 

 

Step 5: 모델 훈련 및 테스트 준비

import os
import torch
from Tutorial3_step2 import EncoderRNN, AttnDecoderRNN
from Tutorial3_step3 import get_dataloader
from Tutorial3_step4 import train, evaluateRandomly


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

hidden_size = 128
batch_size = 32
n_epochs = 80

# 가중치 파일 경로
encoder_path = "../weight/Tutorial3/encoder_weights.pth"
decoder_path = "../weight/Tutorial3/decoder_weights.pth"

input_lang, output_lang, train_dataloader = get_dataloader(batch_size)

encoder = EncoderRNN(input_lang.n_words, hidden_size).to(device)
decoder = AttnDecoderRNN(hidden_size, output_lang.n_words).to(device)

# 가중치 파일이 존재하는지 확인
if os.path.exists(encoder_path) and os.path.exists(decoder_path):
    # 모델 가중치 불러오기
    encoder.load_state_dict(torch.load(encoder_path))
    decoder.load_state_dict(torch.load(decoder_path))

else:
    # 모델 훈련 모드 설정
    encoder.train()
    decoder.train()

    # 모델 훈련
    train(train_dataloader, encoder, decoder, n_epochs)

    # 모델 가중치 저장
    torch.save(encoder.state_dict(), encoder_path)
    torch.save(decoder.state_dict(), decoder_path)

# 모델 테스트 모드 설정
encoder.eval()
decoder.eval()

# 평가 진행
evaluateRandomly(encoder, decoder)

 

실행 결과

0m 9s (- 12m 19s) (1 1%) 2.4665
0m 18s (- 11m 52s) (2 2%) 1.6445
0m 26s (- 11m 31s) (3 3%) 1.3686
......
11m 41s (- 0m 0s) (80 100%) 0.0292


Input sentence: je vous laisse partir
Target sentence: i m letting you go
Output sentence: i m letting you go <EOS>

Input sentence: je suis satisfait de ses progres
Target sentence: i m satisfied with his progress
Output sentence: i m satisfied with his progress <EOS>

Input sentence: nous sommes toutes contentes d aider
Target sentence: we are all happy to help
Output sentence: we are all happy to help <EOS>