Transformer

1. Introduction

  • The Transformer is a neural network architecture proposed by Google researchers in 2017 (in the paper "Attention Is All You Need"); it has become the foundation of today's large models.

  • The Transformer can be seen as an improvement over RNN-based sequence models: it drops recurrence and processes all tokens of a sequence in parallel.

  • The core idea of the Transformer is to use the attention mechanism to give the model an understanding of each token in the context of the whole sentence.

2. Architecture

The Transformer consists of four basic components:

  • Word embedding

  • Positional encoding

  • Encoder blocks

  • Decoder blocks

3. Components

3.1 Word Embedding

  • The purpose of word embedding is to map each input token into a multi-dimensional vector space, so that semantic relationships between tokens can be learned during subsequent training;

  • It turns the input text into word embedding vectors;

  • In practice, the embedding is a lookup into a learned embedding matrix, which is equivalent to multiplying the one-hot encoding of each token by that matrix (as sketched below):

$$ X_e^{[n, d]}=F_e(X_i^{[n]}) $$
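
As a brief illustration (a minimal sketch with arbitrary toy sizes), the embedding lookup used in practice and the one-hot matrix product give the same result:

import torch
import torch.nn as nn
import torch.nn.functional as F

vocab_size, d_model = 10, 4                    # toy vocabulary size and embedding dimension
embedding = nn.Embedding(vocab_size, d_model)  # the learnable embedding matrix F_e

token_ids = torch.tensor([[3, 1, 7]])          # (batch=1, n=3) token indices
x_e = embedding(token_ids)                     # (1, 3, d_model) embedding lookup

# Equivalent view: one-hot vectors multiplied by the embedding matrix
one_hot = F.one_hot(token_ids, vocab_size).float()
x_e2 = one_hot @ embedding.weight
print(torch.allclose(x_e, x_e2))               # True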

3.2 Positional Encoding

  • Positional encoding adds position information to the word embedding vectors, so that they reflect token order and thus the complete meaning of the sentence;
  • The main method is to add a positional encoding matrix of the same dimensions as the embedding vectors directly to them:

$$ X^{[n, d]} = PE^{[n, d]} + X_e^{[n, d]} $$

The positional encoding matrix

  • The positional encoding used in the original Transformer is the fixed sinusoidal encoding (rotary position embedding, RoPE, is a later variant used in many modern models):

$$ PE(pos, k) = \begin{cases} \sin\left(\frac{pos}{10000^{\frac{2i}{d_m}}}\right) & (k = 2i) \\
\cos\left(\frac{pos}{10000^{\frac{2i}{d_m}}}\right) & (k = 2i+1) \end{cases} $$

## Dependencies
import math
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
# Positional encoding implementation
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()

        # Precompute the positional encoding table
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)     # position.size: (max_len, 1)
        '''
          pos / 10000^(2i/d_m)
        = pos * 10000^(-2i/d_m)
        = pos * e^( -2i / d_m * log(10000))
        = pos * e^(2i * -log(10000) / d_m)
        '''

        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() *
            (-torch.log(torch.tensor(10000.0)) / d_model)
        )
        # div_term.size: (d_model/2)

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return x
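
A quick shape check of the module above (the sizes are arbitrary):

pos_enc = PositionalEncoding(d_model=512)
x = torch.zeros(2, 10, 512)       # (batch, seq_len, d_model) dummy embedding vectors
print(pos_enc(x).shape)           # torch.Size([2, 10, 512]) -- same shape, positions added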

3.3 Encoder

  • The encoder is built by connecting multiple identical encoder blocks in series;

  • The input of the first encoder block is the output of the positional encoding layer (later blocks take the previous block's output); the output is a contextualized representation of the same shape;

  • A single encoder block contains two parts:

    - an attention layer

    - a feed-forward network

Attention layer

  • The attention layer is the network layer that lets the model understand the meaning of each token in its current context;

  • Its basic idea is to compute the similarity between the input tokens of the sentence (with positional information already included);

  • In practice, multi-head attention (Multi-Head Attention) is used so that the model can capture several different semantic aspects of the input in parallel;

$$ Q^{[n, d]} = X_q^{[n, d]} * W_q^{[d, d]} \\
K^{[n, d]} = X_k^{[n, d]} * W_k^{[d, d]} \\
V^{[n, d]} = X_v^{[n, d]} * W_v^{[d, d]} \\
X = Atten(X_q, X_k, X_v) = softmax\left(\frac{Q*K^T}{\sqrt{d}}\right) * V
$$
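
Before the multi-head version, here is a direct single-head rendering of the formula above, as a minimal sketch with arbitrary toy shapes:

import math
import torch

def scaled_dot_product_attention(q, k, v):
    # q, k, v: (batch, n, d)
    d = q.size(-1)
    scores = q @ k.transpose(-2, -1) / math.sqrt(d)   # (batch, n, n) token-to-token similarities
    weights = torch.softmax(scores, dim=-1)           # attention weights per token
    return weights @ v                                # weighted sum of the values

x = torch.randn(1, 5, 16)                             # toy input: (batch, n, d)
out = scaled_dot_product_attention(x, x, x)           # self-attention
print(out.shape)                                      # torch.Size([1, 5, 16])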

# Multi-head attention implementation
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % num_heads == 0
        self.depth = d_model // num_heads

        # Linear projections for query, key, and value
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)

        # Output linear projection
        self.output_linear = nn.Linear(d_model, d_model)

    def split_heads(self, x):
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.depth).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        # Linear projections
        query = self.query_linear(query)
        key = self.key_linear(key)
        value = self.value_linear(value)

        # Split into heads
        split_query = self.split_heads(query)
        split_key = self.split_heads(key)
        split_value = self.split_heads(value)

        # Scaled dot-product attention scores
        scores = torch.matmul(split_query, split_key.transpose(-2, -1)) / math.sqrt(self.depth)

        # Apply the mask if provided: masked positions get a large negative score
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        # Softmax over the last dimension to get the attention weights
        attention_weights = torch.softmax(scores, dim=-1)

        # Apply the attention weights to the values
        attention_output = torch.matmul(attention_weights, split_value)

        # Merge the heads back together
        batch_size, _, seq_length, _ = attention_output.size()
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

        # Final output projection
        attention_output = self.output_linear(attention_output)

        return attention_output
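
A quick sanity check of the module above (arbitrary sizes; passing the same tensor as query, key, and value makes it self-attention):

mha = MultiHeadAttention(d_model=512, num_heads=8)
x = torch.randn(2, 10, 512)       # (batch, seq_len, d_model)
print(mha(x, x, x).shape)         # torch.Size([2, 10, 512])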

Feed-forward network layer

  • The feed-forward layer is a standard fully connected network applied to each position independently: two linear layers with a ReLU activation in between;

$$ X_1 = ffn_1(X) = X * W_{ff1}^{[d, d_{ff}]} \\
X_2 = relu(X_1) \\
X_3 = ffn_2(X_2) = X_2 * W_{ff2}^{[d_{ff}, d]} \\
ffn(X) = ffn_2(relu(ffn_1(X))) $$

# Feed-forward network implementation
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model),
        )

    def forward(self, x):
        return self.net(x)

Encoder block

  • An encoder block wraps the attention layer and the feed-forward network, each with a residual connection, dropout, and layer normalization:

$$ X = LayerNorm_1(X + dropout(Atten(X))) \\
X = LayerNorm_2(X + dropout(ffn(X))) $$

# Encoder block (encoder layer)
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):

        # Self-attention sub-layer with residual connection and layer norm
        attention_output = self.self_attention(x, x, x, None)
        x = self.norm1(x + self.dropout(attention_output))

        # Feed-forward sub-layer with residual connection and layer norm
        feed_forward_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(feed_forward_output))

        return x

Encoder stack

  • The full encoder is formed by connecting N encoder blocks in series (a sketch follows the formula):

$$ X_{eo} = EncBlock_N(EncBlock_{N-1}(..EncBlock_1(X))) $$
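
A minimal sketch of this stacking, reusing the EncoderLayer defined above (the wrapper class Encoder itself is illustrative; the full Transformer implementation in section 4 inlines the same loop):

class Encoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, dropout=0.1):
        super(Encoder, self).__init__()
        # N identical encoder blocks applied in sequence
        self.layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)    # the output of block i is the input of block i+1
        return x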

3.4 Decoder

  • The decoder has basically the same structure as the encoder: both contain attention layers and a feed-forward network;

  • The differences are:

    • The decoder's self-attention is a causal (masked) attention, reflecting the fact that output tokens are generated one by one;

    • After the causal self-attention layer, the decoder adds another attention layer that uses the encoder output as K and V, so that it can exploit the semantic information produced by the encoder;

Masked attention layer

  • Self-attention with a causal mask: each position may only attend to itself and earlier positions (see the mask sketch after the formula below);

$$ Q^{[n, d]} = Y_q^{[n, d]} * W_q^{[d, d]} \\
K^{[n, d]} = Y_k^{[n, d]} * W_k^{[d, d]} \\
V^{[n, d]} = Y_v^{[n, d]} * W_v^{[d, d]} \\
Y = Atten(Y_q, Y_k, Y_v) = softmax\left(\frac{Q*K^T}{\sqrt{d}} + M_{mask}\right) * V $$
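
As a small illustration, the no-peek mask can be built from a lower-triangular matrix, which is exactly what the generate_mask method of the full model does later (sequence length 5 is an arbitrary example):

seq_length = 5
# Lower-triangular boolean matrix: position i may attend to positions <= i
nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
print(nopeak_mask[0].int())
# tensor([[1, 0, 0, 0, 0],
#         [1, 1, 0, 0, 0],
#         [1, 1, 1, 0, 0],
#         [1, 1, 1, 1, 0],
#         [1, 1, 1, 1, 1]], dtype=torch.int32)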

Attention layer with encoder output (cross-attention)

  • The query comes from the decoder's previous sub-layer output Y, while the keys and values come from the encoder output X_eo;

$$ Q^{[n, d]} = Y_q^{[n, d]} * W_q^{[d, d]} \\
K^{[n, d]} = X_{eo}^{[n, d]} * W_k^{[d, d]} \\
V^{[n, d]} = X_{eo}^{[n, d]} * W_v^{[d, d]} \\
Y = Atten(Y_q, X_{eo}, X_{eo}) = softmax\left(\frac{Q*K^T}{\sqrt{d}}\right) * V $$

Decoder feed-forward network layer

  • The decoder's feed-forward layer is identical in form to the encoder's:

$$ Y_1 = ffn_1(Y) = Y * W_{ff1}^{[d, d_{ff}]} \\
Y_2 = relu(Y_1) \\
Y_3 = ffn_2(Y_2) = Y_2 * W_{ff2}^{[d_{ff}, d]} \\
ffn(Y) = ffn_2(relu(ffn_1(Y))) $$

Decoder block

  • A decoder block chains the three sub-layers, each wrapped with a residual connection, dropout, and layer normalization:

$$ Y = LayerNorm_1(Y + dropout(MaskedAtten(Y))) \\
Y = LayerNorm_2(Y + dropout(Atten(Y, X_{eo}, X_{eo}))) \\
Y = LayerNorm_3(Y + dropout(ffn(Y))) $$

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.masked_self_attention = MultiHeadAttention(d_model, num_heads)
        self.enc_dec_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        # Masked (causal) self-attention sub-layer
        self_attention_output = self.masked_self_attention(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attention_output))

        # Encoder-decoder (cross) attention sub-layer
        enc_dec_attention_output = self.enc_dec_attention(x, encoder_output, encoder_output, src_mask)
        x = self.norm2(x + self.dropout(enc_dec_attention_output))

        # Feed-forward sub-layer
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))

        return x

Decoder stack

  • Like the encoder, the full decoder is formed by connecting N decoder blocks in series; each block additionally takes the encoder output X_eo as input.

4. Implementation

Combining all of the components above gives the complete transformer implementation:

# Transformer implementation
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers,
                 d_ff, max_len, dropout):
        super(Transformer, self).__init__()

        # Word embedding layers for the encoder and the decoder
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)

        # Positional encoding layer
        self.positional_encoding = PositionalEncoding(d_model, max_len)

        # Stacks of encoder layers and decoder layers
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        # Output projection to the target vocabulary
        self.out_linear = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    # Generate the source padding mask and the target padding + causal mask
    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    # Forward pass
    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)

        # Word embedding and positional encoding of the encoder input
        encoder_embedding = self.encoder_embedding(src)
        en_positional_encoding = self.positional_encoding(encoder_embedding)
        src_embedded = self.dropout(en_positional_encoding)

        # Word embedding and positional encoding of the decoder input
        decoder_embedding = self.decoder_embedding(tgt)
        de_positional_encoding = self.positional_encoding(decoder_embedding)
        tgt_embedded = self.dropout(de_positional_encoding)

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.out_linear(dec_output)
        return output

4.2 Training

The transformer training procedure:

  1. Define the loss function

  2. Set up the optimizer

  3. Prepare the training data

  4. Feed the training data forward through the network to obtain the current predictions

  5. Use the loss function to compute the loss between the predictions and the reference outputs in the training data

  6. Back-propagate the gradients and let the optimizer update the parameters of each layer from back to front

  7. Repeat steps 4-6 until the loss reaches the target value or the preset number of iterations is reached

## Transformer training procedure

### 0. Hyperparameters and model instance (example values for a toy run)
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_len = 100
dropout = 0.1
transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads,
                          num_layers, d_ff, max_len, dropout)

### 1. Define the loss function (index 0 is treated as padding)
criterion = nn.CrossEntropyLoss(ignore_index=0)

### 2. Set up the optimizer
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

### 3. Prepare dummy training data
src_data = torch.randint(1, src_vocab_size, (5, max_len))  # (batch_size, seq_length)
tgt_data = torch.randint(1, tgt_vocab_size, (5, max_len))  # (batch_size, seq_length)

### 4. Training loop
transformer.train()
for epoch in range(100):
    optimizer.zero_grad()
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), 
                     tgt_data[:, 1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"第 {epoch+1} 轮:损失= {loss.item():.4f}")

### 5. Evaluation
transformer.eval()
with torch.no_grad():
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size),
                     tgt_data[:, 1:].contiguous().view(-1))
    print(f"\nEvaluation loss on the dummy data = {loss.item():.4f}")

4.3 Inference
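
At inference time the target sequence is not known in advance: the decoder starts from a start token and generates one token per step, feeding its own output back in as input. Below is a minimal greedy-decoding sketch built on the Transformer class above; the function name greedy_decode and the start/end token ids are illustrative assumptions, not part of the original code.

# Greedy autoregressive decoding (sketch; start_token_id / end_token_id are
# assumed placeholders that depend on the tokenizer actually used)
def greedy_decode(model, src, max_len, start_token_id=1, end_token_id=2):
    model.eval()
    with torch.no_grad():
        # Begin with a single start token for every sequence in the batch
        ys = torch.full((src.size(0), 1), start_token_id, dtype=torch.long)
        for _ in range(max_len - 1):
            logits = model(src, ys)                       # (batch, cur_len, tgt_vocab_size)
            next_token = logits[:, -1, :].argmax(dim=-1, keepdim=True)
            ys = torch.cat([ys, next_token], dim=1)       # append the predicted token
            if (next_token == end_token_id).all():        # stop once every sequence has ended
                break
    return ys

# Example call, reusing the transformer and src_data defined in 4.2
generated = greedy_decode(transformer, src_data[:1], max_len=20)
print(generated)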
