딥러닝/자연어처리

[자연어처리] 기초부터 시작하는 Transformer (Pytorch 구현)

johyeongseob 2025. 1. 2. 00:08

Kaggle의 Transformer from scratch using pytorch (작성자: arunmohan_003)의 글을 한글로 번역하였습니다.

 

원글: https://www.kaggle.com/code/arunmohan003/transformer-from-scratch-using-pytorch

 

참고: 아래 코드 조각들은 하나의 프로젝트에서 클래스 별 파일을 따로 생성하였습니다.)

 

 

 

1. 서론

이 튜토리얼에서는 파이토치를 사용하여 “Attention is all you need”의 트랜스포머를 처음부터 구현하는 방법을 설명합니다. 기본적으로 트랜스포머는 인코더-디코더 아키텍처를 가지고 있습니다. 이는 언어 번역 모델에서 흔히 볼 수 있는 구조입니다.

 

 

위 이미지는 프랑스어에서 영어로 언어 번역 모델을 보여줍니다. 실제로는 아래와 같이 인코더 스택 마지막 (Top) 과 디코더 스택을 사용할 수 있습니다:

 

 

진행하기 앞서 Attention (Transformer) 모델의 전체 이미지를 보겠습니다.

 

 

2. 라이브러리 설치하기

역자 주: Pytorch 2.0.0 과 torchtext 0.15.0 버전 사용

pip install torch==2.0.0 torchvision==0.15.0 torchtext==0.15.0 --index-url https://download.pytorch.org/whl/cu118

 

 트랜스포머는 언어 번역을 위한 인코더 디코더 아키텍처입니다. 인코더 또는 디코더에 대해 살펴보기 전에, 몇 가지 공통 구성 요소를 살펴봅시다.

 

3. 기본 요소

단어 임베딩 만들기


 먼저 입력 시퀀스의 각 단어를 임베딩 벡터로 변환해야 합니다. 임베딩 벡터는 각 단어를 보다 의미적으로 표현합니다.

 

 각 임베딩 벡터의 차원이 512이고 어휘 크기 (vocabulary size) 가 100이라고 가정하면 임베딩 행렬의 크기는 100x512가 됩니다. 이러한 행렬은 학습 중에 학습되며 추론 중에 각 단어는 해당 512d 벡터에 매핑됩니다. 배치 크기가 32이고 시퀀스 길이가 10 단어라고 가정합니다. 출력은 32x10x512가 됩니다.

import torch.nn as nn


class Embedding(nn.Module):
    def __init__(self, vocab_size, embed_dim):
        """
        Args:
            vocab_size: size of vocabulary
            embed_dim: dimension of embeddings
        """
        super(Embedding, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim)
    def forward(self, x):
        """
        Args:
            x: input vector
        Returns:
            out: embedding vector
        """
        out = self.embed(x)
        return out

 

위치 인코딩


 다음 단계는 위치 인코딩을 생성하는 것입니다. 모델이 문장을 이해하려면 각 단어에 대해 두 가지를 알아야 합니다.

  • 단어의 의미는 무엇인가?
  • 문장에서 해당 단어의 위치가 어디인가?

 “Attention is all you need" 에서 저자는 위치 인코딩을 생성하기 위해 다음 함수를 사용했습니다. 홀수 시간 단계에서는 cosine 함수를 사용하고 짝수 시간 단계에서는 sine 함수를 사용합니다.

\begin{align*}
    & PE_{(pos,2i)}=sin(pos/10000^{2i/d_{model}})
\end{align*}

\begin{align*}
    & PE_{(pos,2i+1)}=cos(pos/10000^{2i/d_{model}})
\end{align*}

 여기서 pos는 문장의 순서, i는 임베딩 벡터 차원에 따른 위치를 나타냅니다.

 

 위치 임베딩은 임베딩 행렬과 유사한 행렬을 생성합니다. 이는 (차원 시퀀스 길이 x 임베딩 차원) 의 행렬을 생성합니다. 각 토큰 (단어) 에 대해 순서대로 1 x 512 차원인 임베딩 벡터를 찾고, 여기에 1 x 512 차원인 해당 위치 벡터를 더하여 각 단어 (토큰) 에 대해 1 x 512 차원을 얻습니다.

 예를 들어 배치 크기가 32이고 시퀀스 길이가 10이고 임베딩 차원이 512인 경우입니다. 그러면 32 x 10 x 512 크기의 임베딩 벡터를 갖게 됩니다. 마찬가지로 차원 32 x 10 x 512의 위치 인코딩 벡터를 갖게 됩니다. 그런 다음 둘을 더합니다. (element-wise sum)

 

 

# register buffer in Pytorch ->
# If you have parameters in your model, which should be saved and restored in the state_dict,
# but not trained by the optimizer, you should register them as buffers.

import torch
import torch.nn as nn
import math


class PositionalEmbedding(nn.Module):
    def __init__(self, max_seq_len, embed_model_dim):
        """
        Args:
            seq_len: length of input sequence
            embed_model_dim: demension of embedding
        """
        super(PositionalEmbedding, self).__init__()
        self.embed_dim = embed_model_dim

        pe = torch.zeros(max_seq_len, self.embed_dim)
        for pos in range(max_seq_len):
            for i in range(0, self.embed_dim, 2):
                pe[pos, i] = math.sin(pos / (10000 ** ((2 * i) / self.embed_dim)))
                pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * (i + 1)) / self.embed_dim)))
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Args:
            x: input vector
        Returns:
            x: output
        """

        # make embeddings relatively larger
        x = x * math.sqrt(self.embed_dim)
        # add constant to embedding
        seq_len = x.size(1)
        x = x + torch.autograd.Variable(self.pe[:, :seq_len], requires_grad=False)
        return x

if __name__ == '__main__':
    max_seq_len = 50  # 최대 시퀀스 길이
    embed_model_dim = 64  # 임베딩 차원

    pos_embedding = PositionalEmbedding(max_seq_len=max_seq_len, embed_model_dim=embed_model_dim)

    # 입력 데이터 (배치 크기 2, 시퀀스 길이 10, 임베딩 차원 64)
    x = torch.randn(2, 10, embed_model_dim)  # (batch_size, seq_len, embed_dim)

    output = pos_embedding(x)

    # 출력 크기 확인
    print("Input shape: ", x.shape)  # Input shape: (2, 10, 64)
    print("Output shape: ", output.shape)  # Output shape: (2, 10, 64)

    # Positional Encoding 값만 확인
    pos_encoding = pos_embedding.pe[0, :5, :5]
    print("Positional Encoding (first 5 positions, first 5 dimensions):")
    print(pos_encoding)

 

 

자기 주의 (Self attention)


자기 주의와 멀티헤드 주의 (Multihead attention) 에 대해 간략히 설명해 드리겠습니다.

자기 주의란 무엇인가요?

 “Dog is crossing the street because it saw the kitchen"라는 문장이 있다고 가정해 봅시다. 여기서 'it'은 무엇을 의미할까요? 사람이라면 'Dog'라는 것을 쉽게 이해할 수 있습니다. 하지만 컴퓨터는 그렇지 않습니다.

 모델은 각 단어 (it) 를 처리할 때 자기 주의를 기울여 입력 시퀀스의 다른 위치에서 단서 (Dog) 를 찾을 수 있습니다. 그리고 각 단어와 다른 단어의 의존성 (attention) 을 기반으로 벡터를 생성합니다.

자기 주의에 대한 단계별 설명을 살펴보겠습니다.

  • 1단계: 자기 주의를 계산하는 첫 번째 단계는 인코더의 각 입력 벡터 (이 경우 각 단어의 임베딩) 에서 세 개의 벡터를 생성하는 것입니다. 따라서 각 단어에 대해 쿼리 (query) 벡터, 키 (key) 벡터, 값 (value) 벡터를 생성합니다. 각 벡터의 크기는 1x64입니다.

앞으로 멀티 헤드 어텐션이 8개의 자체 어텐션 헤드 (512/64=8) 를 가지게 되며 이를 염두에 두고 코드를 설명하겠습니다.

 

쿼리, 키, 값 벡터는 어떻게 생성하나요?

쿼리, 키, 값 벡터를 생성하기 위해 쿼리 행렬, 키 행렬, 값 행렬이 있습니다. 이러한 행렬은 훈련 중에 학습됩니다.

코드 힌트:
배치 크기=32,시퀀스 길이=10, 임베딩 차원=512라고 가정합니다. 따라서 임베딩 및 위치 인코딩 후 출력의 크기는 32x10x512가 됩니다.
크기를 32x10x8x64로 조정합니다. (멀티헤드 주의에서 헤드 수는 8개 입니다. 코드를 살펴보면 알 수 있으니 걱정하지 마세요.).

 

  • 2단계: 두 번째 단계는 점수를 계산하는 것입니다. 즉, 쿼리 행렬에 키 행렬을 곱합니다. [Q x K.t]
코드 힌트:
키, 쿼리 및 값 차원이 32x10x8x64라고 가정합니다. 더 진행하기 전에 곱셈의 편의를 위해 각각을 전치(32x8x10x64)하겠습니다. 
이제 쿼리 행렬에 키 행렬을 전치하여 곱합니다. 즉, (32x8x10x64) x (32x8x64x10) -> (32x8x10x10).

 

  • 3단계: 이제 출력 행렬을 키 행렬 치수의 제곱근으로 나눈 다음 그 위에 Softmax를 적용합니다.
코드 힌트: 
32x8x10x10 벡터를 8, 즉 64의 제곱근(키 행렬의 차원)으로 나눕니다.

 

역자 주: Softmax (dim=-1) 는 행 별로 적용됩니다. 즉, 하나의 query값에 대한 모든 key값의 유사도에 대해 합이 1이 되도록 확률을 구하게 됩니다.

 

  • 4단계: 그런 다음 값 행렬을 곱합니다.
코드 힌트:
3단계가 끝나면 출력은 32x8x10x10 차원이 됩니다. 이제 값 행렬(32x8x10x64)을 곱하여 차원(32x8x10x64)의 출력을 얻습니다. 
여기서 8은 주의 헤드의 수이고 10은 시퀀스 길이이므로 각 단어마다 64개의 차원 벡터가 있습니다.

 

  • 5단계: 이 결과를 얻으면 이를 선형 레이어에 통과시킵니다. 이렇게 하면 멀티헤드 주의의 출력이 형성됩니다.
코드 힌트:
(32x8x10x64) 벡터는 (32x10x8x64)로 전치된 후 (32x10x512)로 재형성된 다음 선형 레이어를 통과하여 (32x10x512)의 출력을 얻습니다.

 

 이제 멀티헤드 주의가 어떻게 작동하는지에 대한 아이디어를 얻으셨습니다. code 구현 부분을 살펴보면 더 명확해질 것입니다.

 

import torch
import torch.nn as nn
import torch.nn.functional as F
import math


class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim=512, n_heads=8):
        """
        Args:
            embed_dim: dimension of embeding vector output
            n_heads: number of self attention heads
        """
        super(MultiHeadAttention, self).__init__()

        self.embed_dim = embed_dim  # 512 dim
        self.n_heads = n_heads  # 8
        self.single_head_dim = int(self.embed_dim / self.n_heads)  # 512/8 = 64  . each key,query, value will be of 64d

        # key,query and value matrixes    #64 x 64
        self.query_matrix = nn.Linear(self.single_head_dim, self.single_head_dim, bias=False)
        self.key_matrix = nn.Linear(self.single_head_dim, self.single_head_dim, bias=False)
        self.value_matrix = nn.Linear(self.single_head_dim, self.single_head_dim, bias=False)
        self.out = nn.Linear(self.n_heads * self.single_head_dim, self.embed_dim)

    def forward(self, key, query, value, mask=None):  # batch_size x sequence_length x embedding_dim    # 32 x 10 x 512

        """
        Args:
           key : key vector
           query : query vector
           value : value vector
           mask: mask for decoder

        Returns:
           output vector from multihead attention
        """
        batch_size = key.size(0)
        seq_length = key.size(1)

        # query dimension can change in decoder during inference.
        # so we cant take general seq_length
        seq_length_query = query.size(1)

        key = key.view(batch_size, seq_length, self.n_heads, self.single_head_dim)  # (32x10x8x64)
        query = query.view(batch_size, seq_length_query, self.n_heads, self.single_head_dim)  # (32x10x8x64)
        value = value.view(batch_size, seq_length, self.n_heads, self.single_head_dim)  # (32x10x8x64)

        k = self.key_matrix(key)  # (32x10x8x64)
        q = self.query_matrix(query)
        v = self.value_matrix(value)

        q = q.transpose(1, 2)  # (batch_size, n_heads, seq_len, single_head_dim)    # (32 x 8 x 10 x 64)
        k = k.transpose(1, 2)  # (batch_size, n_heads, seq_len, single_head_dim)
        v = v.transpose(1, 2)  # (batch_size, n_heads, seq_len, single_head_dim)

        # computes attention
        # adjust key for matrix multiplication
        k_adjusted = k.transpose(-1, -2)  # (batch_size, n_heads, single_head_dim, seq_ken)  #(32 x 8 x 64 x 10)
        product = torch.matmul(q, k_adjusted)  # (32 x 8 x 10 x 64) x (32 x 8 x 64 x 10) = #(32x8x10x10)

        # fill those positions of product matrix as (-1e20) where mask positions are 0
        if mask is not None:
            product = product.masked_fill(mask == 0, float("-1e20"))

        # divising by square root of key dimension
        product = product / math.sqrt(self.single_head_dim)  # / sqrt(64)

        # applying softmax
        scores = F.softmax(product, dim=-1)

        # mutiply with value matrix
        scores = torch.matmul(scores, v)  # (32 x 8 x 10 x 10) x (32 x 8 x 10 x 64) = (32 x 8 x 10 x 64)

        # concatenated output, (32x8x10x64) -> (32x10x8x64)  -> (32,10,512)
        concat = scores.transpose(1, 2).contiguous().view(batch_size, seq_length_query, self.single_head_dim * self.n_heads)

        output = self.out(concat)  # (32,10,512) -> (32,10,512)

        return output

 

이제 갑작스러운 질문이 떠오를 수 있습니다. 코드에 있는 마스크 (Mask)는 어디에 사용되나요? 이는 디코더에 대해 같이 설명할 테니 걱정하지 마세요.

 

 

4. 인코더

  • 1단계: 첫 번째 입력 (문장 내 단어 및 추가 패딩 토큰) 은 임베딩 레이어와 위치 인코딩 레이어를 통과합니다.
코드 힌트:
32x10(배치 크기=32, 시퀀스 길이=10)의 입력이 있다고 가정합니다. 임베딩 레이어를 통과하면 32x10x512가 됩니다. 
그런 다음 해당 위치 인코딩 벡터가 추가되어 32x10x512의 출력을 생성합니다. 이것은 멀티헤드 주의로 전달됩니다.

 

  • 2단계: 위에서 설명한 대로 멀티헤드 주의 레이어를 통과하여 유용한 표현 행렬을 출력으로 생성합니다.
코드 힌트:
멀티헤드 주의에 대한 입력은 위와 같이 키, 쿼리 및 값 벡터가 생성된 32x10x512이며, 최종적으로 32x10x512 출력을 생성합니다.

 

  • 3단계: 다음으로 정규화 및 잔여 연결 (residual connection) 이 있습니다. 멀티헤드 주의의 출력에 입력값을 더한 다음 정규화합니다.
코드 힌트:
32x10x512인 멀티헤드 주의의 출력에 32x10x512 이전 입력(임베딩 벡터로 생성된 출력)을 더한 다음 레이어를 정규화합니다.

 

  • 4단계: 다음으로 피드 포워드 레이어와 이전 입력(피드 포워드 레이어의 입력)의 잔여 연결이 있는 정규화 레이어가 있는데, 정규화 후 출력을 통과시켜 최종적으로 인코더의 출력을 얻습니다.
코드 힌트:
정규화된 출력의 크기는 32x10x512입니다. 이는 2개의 선형 레이어를 통과합니다: 32x10x512 -> 32x10x2048 -> 32x10x512. 
마지막으로 출력과 함께 추가되는 잔여 연결이 있고 레이어가 정규화됩니다. 따라서 32x10x512 차원 벡터가 인코더의 출력으로 생성됩니다.

 

import torch.nn as nn
from Embedding import Embedding
from PositionalEmbedding import PositionalEmbedding
from MultiHeadAttention import MultiHeadAttention


class TransformerBlock(nn.Module):
    def __init__(self, embed_dim, expansion_factor=4, n_heads=8):
        super(TransformerBlock, self).__init__()

        """
        Args:
           embed_dim: dimension of the embedding
           expansion_factor: fator ehich determines output dimension of linear layer
           n_heads: number of attention heads

        """
        self.attention = MultiHeadAttention(embed_dim, n_heads)

        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

        self.feed_forward = nn.Sequential(
            nn.Linear(embed_dim, expansion_factor * embed_dim),
            nn.ReLU(),
            nn.Linear(expansion_factor * embed_dim, embed_dim)
        )

        self.dropout1 = nn.Dropout(0.2)
        self.dropout2 = nn.Dropout(0.2)

    def forward(self, key, query, value):
        """
        Args:
           key: key vector
           query: query vector
           value: value vector
           norm2_out: output of transformer block

        """

        attention_out = self.attention(key, query, value)  # 32x10x512
        attention_residual_out = attention_out + value  # 32x10x512
        norm1_out = self.dropout1(self.norm1(attention_residual_out))  # 32x10x512

        feed_fwd_out = self.feed_forward(norm1_out)  # 32x10x512 -> #32x10x2048 -> 32x10x512
        feed_fwd_residual_out = feed_fwd_out + norm1_out  # 32x10x512
        norm2_out = self.dropout2(self.norm2(feed_fwd_residual_out))  # 32x10x512

        return norm2_out


class TransformerEncoder(nn.Module):
    """
    Args:
        seq_len : length of input sequence
        embed_dim: dimension of embedding
        num_layers: number of encoder layers
        expansion_factor: factor which determines number of linear layers in feed forward layer
        n_heads: number of heads in multihead attention

    Returns:
        out: output of the encoder
    """

    def __init__(self, seq_len, vocab_size, embed_dim, num_layers=2, expansion_factor=4, n_heads=8):
        super(TransformerEncoder, self).__init__()

        self.embedding_layer = Embedding(vocab_size, embed_dim)
        self.positional_encoder = PositionalEmbedding(seq_len, embed_dim)

        self.layers = nn.ModuleList([TransformerBlock(embed_dim, expansion_factor, n_heads) for i in range(num_layers)])

    def forward(self, x):
        embed_out = self.embedding_layer(x)
        out = self.positional_encoder(embed_out)
        for layer in self.layers:
            out = layer(out, out, out)

        return out  # 32x10x512

 

 

5. 디코더

 

 지금까지 우리는 인코더를 살펴 보았습니다. 이제 디코더의 구성 요소에 대해 살펴 보겠습니다. 우리는 인코더의 출력을 사용하여 디코더의 키 및 값 벡터를 생성할 것입니다. 디코더에는 두 가지 종류의 다중 헤드 주의가 있으며 하나는 디코더 주의이고 다른 하나는 인코더-디코더 주의입니다. 걱정하지 마세요. 단계별로 진행하겠습니다.

 

아래 Training 단계를 설명드리겠습니다.

  • 1단계: 먼저 Output이 임베딩 및 위치 인코딩을 거쳐 대상 시퀀스의 각 단어에 해당하는 차원 1x512의 임베딩 벡터를 생성합니다.
코드 힌트:
시퀀스 길이가 10이고 배치 크기가 32, 임베딩 벡터 차원이 512라고 가정합니다. 
임베딩 행렬에 32x10 크기의 입력이 있고, 같은 차원의 위치 인코딩이 더해져 32x10x512 차원의 출력이 생성됩니다.

 

  • 2단계: 임베딩 출력은 이전과 마찬가지로 멀티 헤드 주의 레이어(대상 입력에서 키, 쿼리 및 값 행렬 생성)를 통과하여 출력 벡터를 생성합니다. 이번에는 멀티헤드 주의가 있는 마스크를 사용한다는 점이 가장 큰 차이점입니다.

Why mask?

 마스크는 타겟 단어의 주의력을 생성할 때 의존성을 확인하기 위해 미래 단어를 필요하지 않기 때문에 사용됩니다. 즉, 각 단어와 다른 단어의 기여도를 알아야 하기 때문에 주의력을 생성하는 이유를 이미 배웠습니다. 목표 시퀀스의 단어에 대한 주의를 생성하기 때문에 미래를 예측할 수 있는 특정 단어가 필요하지 않습니다. 예를 들어, “ I am a strudent”라는 단어에서 “ student”라는 단어를 보기 위해 “a”라는 단어가 필요하지 않습니다.

코드 힌트:
주의를 끌기 위해 1과 0이 있는 traingular matrix을 만들었습니다. 예를 들어, 시퀀스 길이 5에 대한 삼각형 행렬은 다음과 같습니다:

1 0 0 0 0
1 1 0 0 0
1 1 1 0 0
1 1 1 1 0
1 1 1 1 1

코드에서는 나누기 오류를 피하기 위해 0 위치에 매우 작은 숫자 (negative infinity e.g. -1e20)로 채웁니다.

 

  • 3단계: 이전과 마찬가지로 추가 (Add layer) 및 정규화 레이어를 추가하여 주의 출력에 임베딩의 출력을 추가하고 정규화합니다.

 

  • 4단계: 다음으로 또 다른 멀티헤드 어텐션과 추가 및 정규화 레이어가 있습니다. 이 멀티헤드 어텐션을 인코더-디코더 멀티헤드 어텐션이라고 합니다. 이 멀티헤드 어텐션의 경우 인코더 출력에서 키와 값 벡터를 생성합니다. 쿼리는 이전 디코더 계층의 출력에서 생성합니다.
코드 힌트:
따라서 인코더 출력에서 32x10x512가 출력됩니다. 모든 단어에 대한 키와 값 행렬이 이로부터 생성됩니다. 
이전 디코더 계층의 출력(32x10x512)으로부터 유사한 쿼리 행렬이 생성됩니다.

 

 따라서 다중 헤드 어텐션(헤드 수 = 8)을 통해 추가 및 정규화 계층을 통과합니다. 여기서 이전 인코더 레이어 (즉, 이전 추가 및 정규화 레이어) 의 출력은 인코더-디코더 주의 출력과 함께 추가되고 정규화됩니다.

 

  • 5단계: 다음으로 인코더에 있는 것과 유사한 피드 포워드 레이어(선형 레이어)를 추가 및 정규화합니다.

  • 6단계: 마지막으로 전체 목표 말뭉치 (corpus) 의 단어 수와 같은 길이의 선형 레이어를 만들고 각 단어의 확률을 얻기 위해 소프트맥스 함수를 사용합니다.
import torch.nn as nn
import torch.nn.functional as F
from PositionalEmbedding import PositionalEmbedding
from MultiHeadAttention import MultiHeadAttention
from TransformerEncoder import TransformerBlock


class DecoderBlock(nn.Module):
    def __init__(self, embed_dim, expansion_factor=4, n_heads=8):
        super(DecoderBlock, self).__init__()

        """
        Args:
           embed_dim: dimension of the embedding
           expansion_factor: fator ehich determines output dimension of linear layer
           n_heads: number of attention heads

        """
        self.attention = MultiHeadAttention(embed_dim, n_heads=8)
        self.norm = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(0.2)
        self.transformer_block = TransformerBlock(embed_dim, expansion_factor, n_heads)

    def forward(self, key, query, x, mask):
        """
        Args:
           key: key vector
           query: query vector
           value: value vector
           mask: mask to be given for multi head attention 
        Returns:
           out: output of transformer block

        """

        # we need to pass mask mask only to fst attention
        attention = self.attention(x, x, x, mask=mask)  # 32x10x512
        value = self.dropout(self.norm(attention + x))

        out = self.transformer_block(key, query, value)

        return out


class TransformerDecoder(nn.Module):
    def __init__(self, target_vocab_size, embed_dim, seq_len, num_layers=2, expansion_factor=4, n_heads=8):
        super(TransformerDecoder, self).__init__()
        """  
        Args:
           target_vocab_size: vocabulary size of taget
           embed_dim: dimension of embedding
           seq_len : length of input sequence
           num_layers: number of encoder layers
           expansion_factor: factor which determines number of linear layers in feed forward layer
           n_heads: number of heads in multihead attention

        """
        self.word_embedding = nn.Embedding(target_vocab_size, embed_dim)
        self.position_embedding = PositionalEmbedding(seq_len, embed_dim)

        self.layers = nn.ModuleList(
            [
                DecoderBlock(embed_dim, expansion_factor=4, n_heads=8)
                for _ in range(num_layers)
            ]

        )
        self.fc_out = nn.Linear(embed_dim, target_vocab_size)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x, enc_out, mask):
        """
        Args:
            x: input vector from target
            enc_out : output from encoder layer
            trg_mask: mask for decoder self attention
        Returns:
            out: output vector
        """

        x = self.word_embedding(x)  # 32x10x512
        x = self.position_embedding(x)  # 32x10x512
        x = self.dropout(x)

        for layer in self.layers:
            x = layer(enc_out, x, enc_out, mask)

        out = F.softmax(self.fc_out(x))

        return out

 

7. 테스트

 마지막으로 모든 서브모듈을 정렬하고 전체 트랜스포머 아키텍처를 생성합니다. 그리고 간단한 예제와 함께 테스트를 진행하겠습니다.

import torch
import torch.nn as nn
from TransformerEncoder import TransformerEncoder
from TransformerDecoder import TransformerDecoder


class Transformer(nn.Module):
    def __init__(self, embed_dim, src_vocab_size, target_vocab_size, seq_length, num_layers=2, expansion_factor=4,
                 n_heads=8):
        super(Transformer, self).__init__()

        """  
        Args:
           embed_dim:  dimension of embedding 
           src_vocab_size: vocabulary size of source
           target_vocab_size: vocabulary size of target
           seq_length : length of input sequence
           num_layers: number of encoder layers
           expansion_factor: factor which determines number of linear layers in feed forward layer
           n_heads: number of heads in multihead attention

        """

        self.target_vocab_size = target_vocab_size

        self.encoder = TransformerEncoder(seq_length, src_vocab_size, embed_dim, num_layers=num_layers,
                                          expansion_factor=expansion_factor, n_heads=n_heads)
        self.decoder = TransformerDecoder(target_vocab_size, embed_dim, seq_length, num_layers=num_layers,
                                          expansion_factor=expansion_factor, n_heads=n_heads)

    def make_trg_mask(self, trg):
        """
        Args:
            trg: target sequence
        Returns:
            trg_mask: target mask
        """
        batch_size, trg_len = trg.shape
        # returns the lower triangular part of matrix filled with ones
        trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(
            batch_size, 1, trg_len, trg_len
        )
        return trg_mask

    def decode(self, src, trg):
        """
        for inference
        Args:
            src: input to encoder
            trg: input to decoder
        out:
            out_labels : returns final prediction of sequence
        """
        trg_mask = self.make_trg_mask(trg)
        enc_out = self.encoder(src)
        out_labels = []
        batch_size, seq_len = src.shape[0], src.shape[1]
        # outputs = torch.zeros(seq_len, batch_size, self.target_vocab_size)
        out = trg
        for i in range(seq_len):  # 10
            out = self.decoder(out, enc_out, trg_mask)  # bs x seq_len x vocab_dim
            # taking the last token
            out = out[:, -1, :]

            out = out.argmax(-1)
            out_labels.append(out.item())
            out = torch.unsqueeze(out, axis=0)

        return out_labels

    def forward(self, src, trg):
        """
        Args:
            src: input to encoder
            trg: input to decoder
        out:
            out: final vector which returns probabilities of each target word
        """
        trg_mask = self.make_trg_mask(trg)
        enc_out = self.encoder(src)

        outputs = self.decoder(trg, enc_out, trg_mask)
        return outputs

if __name__ == '__main__':
    src_vocab_size = 11
    target_vocab_size = 11
    num_layers = 6
    seq_length = 12

    # let 0 be sos token and 1 be eos token
    src = torch.tensor([[0, 2, 5, 6, 4, 3, 9, 5, 2, 9, 10, 1],
                        [0, 2, 8, 7, 3, 4, 5, 6, 7, 2, 10, 1]])
    target = torch.tensor([[0, 1, 7, 4, 3, 5, 9, 2, 8, 10, 9, 1],
                           [0, 1, 5, 6, 2, 4, 7, 6, 2, 8, 10, 1]])

    print(src.shape, target.shape)
    model = Transformer(embed_dim=512, src_vocab_size=src_vocab_size,
                        target_vocab_size=target_vocab_size, seq_length=seq_length,
                        num_layers=num_layers, expansion_factor=4, n_heads=8)
    print(model)

 

실행 결과

torch.Size([2, 12]) torch.Size([2, 12])

Transformer(
  (encoder): TransformerEncoder(
    (embedding_layer): Embedding(
      (embed): Embedding(11, 512)
    )
    (positional_encoder): PositionalEmbedding()
    (layers): ModuleList(
      (0-5): 6 x TransformerBlock(
        (attention): MultiHeadAttention(
          (query_matrix): Linear(in_features=64, out_features=64, bias=False)
          (key_matrix): Linear(in_features=64, out_features=64, bias=False)
          (value_matrix): Linear(in_features=64, out_features=64, bias=False)
          (out): Linear(in_features=512, out_features=512, bias=True)
        )
        (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (feed_forward): Sequential(
          (0): Linear(in_features=512, out_features=2048, bias=True)
          (1): ReLU()
          (2): Linear(in_features=2048, out_features=512, bias=True)
        )
        (dropout1): Dropout(p=0.2, inplace=False)
        (dropout2): Dropout(p=0.2, inplace=False)
      )
    )
  )
  (decoder): TransformerDecoder(
    (word_embedding): Embedding(11, 512)
    (position_embedding): PositionalEmbedding()
    (layers): ModuleList(
      (0-5): 6 x DecoderBlock(
        (attention): MultiHeadAttention(
          (query_matrix): Linear(in_features=64, out_features=64, bias=False)
          (key_matrix): Linear(in_features=64, out_features=64, bias=False)
          (value_matrix): Linear(in_features=64, out_features=64, bias=False)
          (out): Linear(in_features=512, out_features=512, bias=True)
        )
        (norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (dropout): Dropout(p=0.2, inplace=False)
        (transformer_block): TransformerBlock(
          (attention): MultiHeadAttention(
            (query_matrix): Linear(in_features=64, out_features=64, bias=False)
            (key_matrix): Linear(in_features=64, out_features=64, bias=False)
            (value_matrix): Linear(in_features=64, out_features=64, bias=False)
            (out): Linear(in_features=512, out_features=512, bias=True)
          )
          (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (feed_forward): Sequential(
            (0): Linear(in_features=512, out_features=2048, bias=True)
            (1): ReLU()
            (2): Linear(in_features=2048, out_features=512, bias=True)
          )
          (dropout1): Dropout(p=0.2, inplace=False)
          (dropout2): Dropout(p=0.2, inplace=False)
        )
      )
    )
    (fc_out): Linear(in_features=512, out_features=11, bias=True)
    (dropout): Dropout(p=0.2, inplace=False)
  )
)

 

이것으로 Transformer (Attention is all you need) 설명을 마치겠습니다.