A Transformer is built from four basic units: the word-embedding layer, positional encoding, the attention layer, and the feed-forward network.

- The word-embedding layer maps each input token index to a $d$-dimensional vector:
$$
X_e^{[n, d]}=F_e(X_i^{[n]})
$$
- The purpose of positional encoding is to add position information to the word-embedding vectors, so that the embeddings also capture word order and reflect the full meaning of the sentence more accurately.
- The main approach is to build a positional-encoding matrix with the same shape as the word embeddings and add it element-wise:
$$
X^{[n, d]} = PE^{[n, d]} + X_e^{[n, d]}
$$
- The original Transformer uses fixed sinusoidal positional encoding (rotary position embedding, RoPE, is a later scheme used in many modern LLMs, not the one defined below):
$$
PE(pos, k) = \begin{cases}
\sin\left(\frac{pos}{10000^{\frac{2i}{d_m}}}\right) & (k = 2i) \\
\cos\left(\frac{pos}{10000^{\frac{2i}{d_m}}}\right) & (k = 2i+1)
\end{cases}
$$
```python
## Dependencies
import math
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
```
```python
# Positional encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        # Precompute the positional-encoding table
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # position.size: (max_len, 1)
        '''
        pos / 10000^(2i/d_m)
        = pos * 10000^(-2i/d_m)
        = pos * e^(-2i / d_m * log(10000))
        = pos * e^(2i * -log(10000) / d_m)
        '''
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() *
            (-torch.log(torch.tensor(10000.0)) / d_model)
        )
        # div_term.size: (d_model/2)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Add the fixed positional encoding to the input embeddings
        x = x + self.pe[:, :x.size(1)]
        return x
```
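As a quick sanity check, the positional encoding can be combined with an `nn.Embedding` layer as in the addition formula above. This is a minimal usage sketch with made-up sizes, not part of the original code:

```python
# Shape check for the positional encoding (all sizes here are illustrative)
d_model = 16
emb = nn.Embedding(100, d_model)           # toy vocabulary of 100 tokens
pos_enc = PositionalEncoding(d_model)

tokens = torch.randint(1, 100, (2, 10))    # (batch_size=2, seq_len=10)
x = pos_enc(emb(tokens))                   # PE is added element-wise to the embeddings
print(x.shape)                             # torch.Size([2, 10, 16])
```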
- Attention layer: the neural-network layer that captures what each token means in its current context.
Its basic idea is to compute the pairwise similarity between the input tokens (which already carry positional-encoding information) and use it to weight their representations.
In practice, multi-head attention (Multi-Head Attention) is used so that the model can capture several different aspects of the input semantics at once:
$$
Q^{[n, d]} = X_q^{[n, d]} * W_q^{[d, d]} \\
K^{[n, d]} = X_k^{[n, d]} * W_k^{[d, d]} \\
V^{[n, d]} = X_v^{[n, d]} * W_v^{[d, d]} \\
X = Atten(X_q, X_k, X_v) = softmax(\frac{Q*K^T}{\sqrt{D_m}}) * V
$$
```python
# Multi-head attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % num_heads == 0
        self.depth = d_model // num_heads
        # Linear projections for query, key and value
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        # Output linear projection
        self.output_linear = nn.Linear(d_model, d_model)

    def split_heads(self, x):
        # (batch, seq_len, d_model) -> (batch, num_heads, seq_len, depth)
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.depth).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        # Linear projections
        query = self.query_linear(query)
        key = self.key_linear(key)
        value = self.value_linear(value)
        # Split into heads
        split_query = self.split_heads(query)
        split_key = self.split_heads(key)
        split_value = self.split_heads(value)
        # Scaled dot-product attention
        scores = torch.matmul(split_query, split_key.transpose(-2, -1)) / math.sqrt(self.depth)
        # Apply the mask if one is provided
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        # Attention weights via softmax
        attention_weights = torch.softmax(scores, dim=-1)
        # Weighted sum of the values
        attention_output = torch.matmul(attention_weights, split_value)
        # Merge the heads back together
        batch_size, _, seq_length, _ = attention_output.size()
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        # Final linear projection
        attention_output = self.output_linear(attention_output)
        return attention_output
```
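A minimal self-attention shape check for the class above; the sizes are illustrative and not taken from the original post:

```python
# Self-attention shape check (illustrative sizes only)
mha = MultiHeadAttention(d_model=16, num_heads=4)
x = torch.randn(2, 10, 16)                 # (batch_size, seq_len, d_model)
print(mha(x, x, x).shape)                  # torch.Size([2, 10, 16])
```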
- Feed-forward network: a position-wise network that applies two linear transformations with a ReLU activation in between to every token independently:
$$
X_1 = ffn_1(X) = X^{[n, d]} * W_{ff1}^{[d, d_{ff}]} \\
X_2 = relu(X_1) \\
X_3 = ffn_2(X_2) = X_2^{[n, d_{ff}]} * W_{ff2}^{[d_{ff}, d]} \\
ffn(X) = ffn_2(relu(ffn_1(X)))
$$
```python
# Feed-forward network
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model),
        )

    def forward(self, x):
        return self.net(x)
```
Each encoder block wraps its attention and feed-forward sub-layers with a residual connection, dropout and layer normalization:
$$
X = LayerNorm_1(X + dropout(Atten(X))) \\
X = LayerNorm_2(X + dropout(ffn(X)))
$$
```python
# Encoder layer
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Self-attention sub-layer
        attention_output = self.self_attention(x, x, x, None)
        x = self.norm1(x + self.dropout(attention_output))
        # Feed-forward sub-layer
        feed_forward_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(feed_forward_output))
        return x
```
The complete encoder stacks $N$ such blocks; the output of the last block is the encoder output $X_{eo}$:
$$
X_{eo} = EncBlock_N(EncBlock_{N-1}(..EncBlock_1(X)))
$$
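A minimal sketch of this stacking with `nn.ModuleList` (the full Transformer class later in this post does the same); the sizes are illustrative only:

```python
# Stacking N encoder blocks as in the formula above (illustrative sizes)
encoder_layers = nn.ModuleList(
    [EncoderLayer(d_model=16, num_heads=4, d_ff=64, dropout=0.1) for _ in range(3)]
)
x = torch.randn(2, 10, 16)                 # already embedded + position-encoded input
for layer in encoder_layers:
    x = layer(x)                           # x is X_eo after the last block
print(x.shape)                             # torch.Size([2, 10, 16])
```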
Each decoder block has three sub-layers. The first is masked self-attention over the decoder input: a causal mask $M$ blocks attention to future positions, so every position can only attend to itself and earlier positions:
$$
Q^{[n, d]} = Y_q^{[n, d]} * W_q^{[d, d]} \\
K^{[n, d]} = Y_k^{[n, d]} * W_k^{[d, d]} \\
V^{[n, d]} = Y_v^{[n, d]} * W_v^{[d, d]} \\
Y = MaskedAtten(Y_q, Y_k, Y_v) = softmax(\frac{Q*K^T}{\sqrt{D_m}} + M) * V
$$
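For illustration, the causal (no-peek) mask can be built with `torch.triu`, exactly as the `generate_mask` method of the Transformer class below does; this is just a standalone sketch:

```python
# Causal (no-peek) mask: position i may only attend to positions <= i
seq_len = 5
nopeak_mask = (1 - torch.triu(torch.ones(1, seq_len, seq_len), diagonal=1)).bool()
print(nopeak_mask[0].int())
# tensor([[1, 0, 0, 0, 0],
#         [1, 1, 0, 0, 0],
#         [1, 1, 1, 0, 0],
#         [1, 1, 1, 1, 0],
#         [1, 1, 1, 1, 1]], dtype=torch.int32)
```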
The second sub-layer is encoder-decoder (cross) attention: the queries come from the decoder, while the keys and values come from the encoder output $X_{eo}$ (a padding mask $M_{src}$ hides padded source positions):
$$
Q^{[n, d]} = Y_q^{[n, d]} * W_q^{[d, d]} \\
K^{[n, d]} = X_{eo}^{[n, d]} * W_k^{[d, d]} \\
V^{[n, d]} = X_{eo}^{[n, d]} * W_v^{[d, d]} \\
Y = Atten(Y_q, X_{eo}, X_{eo}) = softmax(\frac{Q*K^T}{\sqrt{D_m}} + M_{src}) * V
$$
The third sub-layer is the same position-wise feed-forward network as in the encoder:
$$
Y_1 = ffn_1(Y) = Y^{[n, d]} * W_{ff1}^{[d, d_{ff}]} \\
Y_2 = relu(Y_1) \\
Y_3 = ffn_2(Y_2) = Y_2^{[n, d_{ff}]} * W_{ff2}^{[d_{ff}, d]} \\
ffn(Y) = ffn_2(relu(ffn_1(Y)))
$$
Each of the three sub-layers is again wrapped with a residual connection, dropout and layer normalization:
$$
Y = LayerNorm_1(Y + dropout(MaskedAtten(Y))) \\
Y = LayerNorm_2(Y + dropout(Atten(Y, X_{eo}, X_{eo}))) \\
Y = LayerNorm_3(Y + dropout(ffn(Y)))
$$
```python
# Decoder layer
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.masked_self_attention = MultiHeadAttention(d_model, num_heads)
        self.enc_dec_attention = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        # Masked self-attention sub-layer
        self_attention_output = self.masked_self_attention(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attention_output))
        # Encoder-decoder (cross) attention sub-layer
        enc_dec_attention_output = self.enc_dec_attention(x, encoder_output, encoder_output, src_mask)
        x = self.norm2(x + self.dropout(enc_dec_attention_output))
        # Feed-forward sub-layer
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x
```
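As a quick sanity check with made-up sizes and dummy masks (not part of the original code), one decoder block can be exercised against a fake encoder output; the mask shapes broadcast against the per-head attention scores the same way they do in the full Transformer below:

```python
# Shape check for one decoder block (illustrative sizes only)
dec = DecoderLayer(d_model=16, num_heads=4, d_ff=64, dropout=0.1)
y = torch.randn(2, 7, 16)                             # decoder input  (batch, tgt_len, d_model)
enc_out = torch.randn(2, 10, 16)                      # encoder output (batch, src_len, d_model)
src_mask = torch.ones(2, 1, 1, 10).bool()             # no source padding in this toy example
tgt_mask = torch.tril(torch.ones(1, 1, 7, 7)).bool()  # causal mask for the decoder input
print(dec(y, enc_out, src_mask, tgt_mask).shape)      # torch.Size([2, 7, 16])
```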
Putting the pieces above together gives the complete Transformer implementation:
```python
# The full Transformer
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff,
                 max_len, dropout):
        super(Transformer, self).__init__()
        # Word-embedding layers for the encoder and the decoder
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        # Positional encoding
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        # Stacks of encoder and decoder layers
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        # Output projection to the target vocabulary
        self.out_linear = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    # Build the padding and causal masks (token id 0 is treated as padding)
    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    # Forward pass
    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        # Word embedding plus positional encoding for the encoder input
        encoder_embedding = self.encoder_embedding(src)
        en_positional_encoding = self.positional_encoding(encoder_embedding)
        src_embedded = self.dropout(en_positional_encoding)
        # Word embedding plus positional encoding for the decoder input
        decoder_embedding = self.decoder_embedding(tgt)
        de_positional_encoding = self.positional_encoding(decoder_embedding)
        tgt_embedded = self.dropout(de_positional_encoding)
        # Run the encoder stack
        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output)
        # Run the decoder stack against the encoder output
        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)
        output = self.out_linear(dec_output)
        return output
```
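For the training code below to run, the model first has to be instantiated; the hyperparameters here are small illustrative toy values, not the settings of the original paper or post:

```python
# Toy hyperparameters (illustrative values only)
src_vocab_size = 1000
tgt_vocab_size = 1000
d_model = 128
num_heads = 8
num_layers = 2
d_ff = 512
max_len = 20
dropout = 0.1

transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads,
                          num_layers, d_ff, max_len, dropout)
```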
Training the Transformer follows these steps, which the code after this list walks through:
1. Define the loss function.
2. Set up the optimizer.
3. Prepare the training data.
4. Feed the training data in and run the forward pass through the network to obtain the current predictions.
5. Use the loss function to compute the loss between the predictions and the reference outputs in the training data.
6. Backpropagate the gradients and let the optimizer update the parameters of each layer, from the output back toward the input.
7. Repeat steps 4-6 until the loss reaches the target value or the preset number of iterations is exhausted.
```python
## Transformer training loop
### 1. Define the loss function
criterion = nn.CrossEntropyLoss(ignore_index=0)

### 2. Set up the optimizer
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

### 3. Prepare (random dummy) data
src_data = torch.randint(1, src_vocab_size, (5, max_len))  # (batch_size, seq_length)
tgt_data = torch.randint(1, tgt_vocab_size, (5, max_len))  # (batch_size, seq_length)

### 4. Training loop
transformer.train()
for epoch in range(100):
    optimizer.zero_grad()
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size),
                     tgt_data[:, 1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}: loss = {loss.item():.4f}")

### 5. Evaluation
transformer.eval()
with torch.no_grad():
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size),
                     tgt_data[:, 1:].contiguous().view(-1))
    print(f"\nEvaluation loss on the dummy data = {loss.item():.4f}")
```
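After training, the model is used autoregressively at inference time. The sketch below assumes token id 1 serves as a start (BOS) symbol, which the random toy data above does not actually define; it is only meant to show the greedy decoding loop, not a production decoder:

```python
# Greedy decoding sketch (assumes token id 1 acts as a BOS/start symbol)
def greedy_decode(model, src, max_new_tokens=10, bos_id=1):
    model.eval()
    tgt = torch.full((src.size(0), 1), bos_id, dtype=torch.long)
    with torch.no_grad():
        for _ in range(max_new_tokens):
            logits = model(src, tgt)                     # (batch, tgt_len, tgt_vocab_size)
            next_token = logits[:, -1].argmax(-1, keepdim=True)
            tgt = torch.cat([tgt, next_token], dim=1)    # append the most likely token
    return tgt

print(greedy_decode(transformer, src_data[:1]))
```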