BiribiriBird's Gallery.

动手学深度学习 · Part4 现代序列模型与机器翻译实践

字数统计: 5k阅读时长: 24 min
2024/08/20
loading

现代序列模型与机器翻译实践

动手学深度学习v2 - https://zh-v2.d2l.ai/

个人评价是需要有一点基础

  • Pytorch 小土堆 先把Pytorch基础看一下
  • 李宏毅2022春机器学习
    • 理论部分更推荐李宏毅或者吴恩达,会更好理解
    • 我的策略是理论在李宏毅这里补,作业不做,在李沐这里实操一下代码

本文不会放太多理论的东西

记录一下代码实操即可

理论请移步李宏毅课程的相关笔记

门控循环单元GRU

并不是每个细节都值得关注

随着喂入序列的变长,序列开头的影响占比会变小

但很有可能序列开头存在重要的关键词

因此我们希望我们的网络能够对序列的不同部分,有所侧重、关注、选择

  • 存储:早期重要信息
  • 跳过:无用信息(网页文章的html代码)
  • 重置:书的章节之间的逻辑中断

image-20240815015651031

Rt=σ(XtWxr+Ht1Whr+br)Zt=σ(XtWxz+Ht1Whz+bz)R_t = \sigma( X_tW_{xr} +H_{t-1}W_{hr} + b_r ) \\ Z_t = \sigma( X_tW_{xz} +H_{t-1}W_{hz} + b_z )

  • 利用sigmoid函数,全连接层通过输入、隐状态,预测出门的值(0-1之间)

候选隐状态 <- 重置门

正常情况下,隐状态的计算:

Ht=ϕ(XtWxh+Ht1Whh+bh)H_t =\phi(X_tW_{xh} + H_{t-1}W_{hh} + b_h)

我们希望引入RtR_t,对状态进行重置

  • 设定激活函数为tanh\tanh,确保候选隐状态值在(-1,1)内
  • 使用Hadamard积(矩阵元素对应相乘):RtHt1R_t \odot H_{t-1}
    • Rt0R_t \to 0,此时HtH_t只由当前输入决定。相当于重置了隐状态为一开始的默认值,从头开始
    • Rt1R_t \to 1,正常的循环神经网络,隐状态照常保存

综上,定义候选隐状态为:

Ht~=tanh(XtWxh+(RtHt1)Whh+bh)\tilde{H_t} =\tanh(X_tW_{xh} + (R_t \odot H_{t-1})W_{hh} + b_h)

image-20240815021155828

隐状态 <- 更新门

上文中,我们计算得到的是候选隐状态

但是如果当前的文本并不让我们感到有意义,我们需要跳过这部分

也就是基本不会修改,直接沿用之前的隐状态

反之,我们希望将当前值更新为最新的隐状态

引入更新门:

Ht=ZtHt1+(1Zt)Ht~H_t = Z_t\odot H_{t- 1} + (1-Z_t)\odot \tilde{H_t}

  • Zt0Z_t \to 0,相当于完全使用当前新的隐状态
  • Zt1Z_t \to 1,直接沿用之前的隐状态

image-20240815021600369

总结:

  • 重置门:有助于捕获序列的短期依赖关系
  • 更新门:有助于捕获序列的长期依赖关系

代码

RNN换成GRU即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class RNNModel(nn.Module):
def __init__(self, vocab, **kwargs):

super(RNNModel, self).__init__(**kwargs)

self.vocab_size = len(vocab)
self.num_hiddens = 256

self.rnn = nn.GRU( ##### 唯一区别 ############
input_size = self.vocab_size, # 输入数据的维度(28种字符)
hidden_size = self.num_hiddens, # 隐藏层维度
num_layers = 1 # 隐藏层层数
)

# 输出层 由rnn的隐藏层 预测-> 每一个vocab的概率
self.linear = nn.Linear(self.num_hiddens, self.vocab_size)

image-20240815023150171

  • 相比RNN,困惑度整体会变更低

长短期记忆网络LSTM

设计上比GRU更加复杂,但是早了20年

image-20240815200752066

  • 输入门:

It=σ(XtWxi+Ht1Whi+bi)I_t = \sigma(X_tW_{xi} + H_{t_1}W_{hi} + b_i)

  • 遗忘门:

Ft=σ(XtWxf+Ht1Whf+bf)F_t = \sigma(X_tW_{xf} + H_{t_1}W_{hf} + b_f)

  • 输出门:

Ot=σ(XtWxo+Ht1Who+bo)O_t = \sigma(X_tW_{xo} + H_{t_1}W_{ho} + b_o)

通过sigmoid激活函数,三个门的值都在(0,1)内

候选记忆元

使用tanh\tanh做激活函数,取值[-1,1]

image-20240815201321767

Ct~=tanh(XtWxc+Ht1Whc+bc)\tilde{C_t} = \tanh(X_tW_{xc}+H_{t-1}W_{hc}+b_c)

记忆元

记忆元主要来自两个部分:

  • 过去的记忆:即Ct1C_{t-1},由遗忘门FtF_t控制保留多少过去的记忆
  • 新的记忆:即当前输入带来的候选记忆元Ct~\tilde{C_t},由输入门控制引入多少

则:

Ct=FtCt1+ItCt~C_t = F_t\odot C_{t-1}+I_t\odot \tilde{C_t}

image-20240815202050099

这种机制有助于模型记录下非常久远以前的记忆

某种程度上缓解了梯度消失,捕获长距离依赖关系

隐状态

Ht=Ottanh(Ct)H_t = O_t\odot\tanh(C_t)

  • 确保隐状态仍然在[1,1][-1,1]
  • 输出门接近1:完整保留记忆作为隐状态
  • 输出门接近0:隐状态被重置,只保留了记忆信息

image-20240815202726087

深度RNN

前一篇讲过了(

其实就是隐状态由单个全连接层,变成多个隐藏层

image-20240815203100752

双向RNN

普通的RNN只能考虑到上文,无法考虑到下文:

1
2
3
4
5
6
7
'''
我___。

我___饿了。

我___饿了,我可以吃半头猪。
'''

image-20240815205000491

将隐状态分为正向隐状态、反向隐状态

对于正向隐状态:

image-20240815205044213

Ht=ϕ(XtWxhfront+Ht1Whhfront+bhfront)\overrightarrow{H_t}= \phi( X_tW_{xh}^{\text{front}} + \overrightarrow{H}_{t-1}W_{hh}^{\text{front}}+b_h^{\text{front}})

  • 由输入、上一个前向隐状态得到

对于反向隐状态:

image-20240815205315871

Ht=ϕ(XtWxhback+Ht+1Whhback+bhback)\overleftarrow{H_t}= \phi( X_tW_{xh}^{\text{back}} + \overleftarrow{H}_{t+1}W_{hh}^{\text{back}}+b_h^{\text{back}})

  • 由输入、后一个反向隐状态得到

对于输出,我们把Ht,Ht\overrightarrow{H_t},\overleftarrow{H_t}合并成HtH_t(矩阵连起来)

Ot=HTWhq+bqO_t = H_TW_{hq} + b_q

image-20240815205543222

代价

  • 计算速度非常慢,计算链条很长

  • 需要存储的内存非常大

  • 用处有限

    • 填充缺失单词、词元、注释
    • 机器翻译

机器翻译

数据集下载

数据读入与处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
'''
Go. Va !

Hi. Salut !

Run! Cours !

Run! Courez !

Who? Qui ?

Wow! Ça alors !

Fire! Au feu !
'''
  • 每行两个字符串,前者是英文,后者是法语,使用\t隔开
  • 文本中含有一些不间断空格、不可见空格(\u202f\xa0),我们需要替换成普通空格
  • 将大写字母转化成小写字母,简化数据
1
"go away.	fous le camp !"

我们希望最后切分成多个词元列表,因此需要处理字符串:

  • 单词、标点符号之间需要有空格
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def filter(s):
out = []
s = s.strip()
for i,char in enumerate(s):
if i > 0 and char != ' ':
if (not s[i-1].isalpha()) and char.isalpha():
out.append(' ' + char)
elif char in set('.,?!') : out.append(' ' + char)
else: out.append(char)
elif char != ' ': out.append(char)
return ''.join(out)



def read_data_fra(num_samples=None):

with open('data/fra.txt') as f:
# 替换字符
source_txt = f.read().replace('\u202f', ' ').replace('\xa0', ' ').lower()
lines = source_txt.split('\n') # 切分单个数据

# 每个词元前后添加空格
lines = [filter(line) for line in lines]
source, target = [], []

for i, line in enumerate(lines):

# 自定义数量
if num_samples and i > num_samples:
break

part = line.split('\t')
if len(part) == 2:
# 词元化
source.append(part[0].strip().split(' '))
target.append(part[1].strip().split(' '))

return source, target


source, target = read_data_fra()
print( source[:5])
print( target[:5])

'''
[['go', '.'], ['hi', '.'], ['run', '!'], ['run', '!'], ['who', '?']]
[['va', '!'], ['salut', '!'], ['cours', '!'], ['courez', '!'], ['qui', '?']]
'''

我们可以绘制图表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import matplotlib.pyplot as plt
from collections import Counter

source_counts = Counter([len(sentence) for sentence in source])
target_counts = Counter([len(sentence) for sentence in target])

# 计算每个柱子的宽度
width = 0.35

# 绘制直方图
plt.bar(source_counts.keys(), source_counts.values(), width=width, color='blue', label='Source')
plt.bar([x + width for x in target_counts.keys()], target_counts.values(), width=width, color='red', label='Target')

# 添加图例和标签
plt.legend()
plt.xlabel('Words')
plt.ylabel('Counts')

# 显示图形
plt.show()

image-20240816165130121

  • 大部分句子的词元数量都不超过20,主要集中在10

接下来我们需要构建词表

沿用的是之前RNN的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import collections

def count_corpus(tokens): #@save
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将2D词元列表展平成一个1D列表
tokens = [token for line in tokens for token in line]
# 使用collections.Counter统计词元的频率
return collections.Counter(tokens)

class Vocab: #@save
"""文本词表"""
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
# 如果没有提供词元列表,则使用空列表
if tokens is None:
tokens = []
# 如果没有提供保留词元列表,则使用空列表
if reserved_tokens is None:
reserved_tokens = []
# 统计词元的频率并按频率降序排序
counter = count_corpus(tokens)
self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
reverse=True)
# 初始化词表,未知词元的索引为0
self.idx_to_token = ['<unk>'] + reserved_tokens
# 创建从词元到索引的映射字典
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
# 遍历按频率排序后的词元和对应频率
for token, freq in self._token_freqs:
# 如果词元的频率小于最小频率,则停止添加
if freq < min_freq:
break
# 如果词元不在词表中,则将其添加到词表并更新映射字典
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1

def __len__(self):
# 返回词表的长度
return len(self.idx_to_token)

def __getitem__(self, tokens):
# 如果输入的是单个词元,返回其对应的索引
if not isinstance(tokens, (list, tuple)): #不是列表或元组
return self.token_to_idx.get(tokens, self.unk) # 字典中不存则自动返回unk
# 如果输入的是一个词元列表,递归获取每个词元的索引
return [self.__getitem__(token) for token in tokens]

def to_tokens(self, indices):
# 如果输入的是单个索引,返回其对应的词元
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
# 如果输入的是一个索引列表,递归获取每个索引对应的词元
return [self.idx_to_token[index] for index in indices]

@property
def unk(self): # 未知词元的索引为0
# 返回未知词元的索引
return 0

@property
def token_freqs(self):
# 返回词元及其对应频率的列表
return self._token_freqs

# 将出现次数少于2的词元过滤为unk
# 引入填充词元<pad> 开始词元 结束词元
src_vocab = Vocab(source, min_freq=2, reserved_tokens=['<pad>', '<bos>', '<eos>'])
len(src_vocab)

在序列模型中,我们会使用定长序列进行训练

但一般数据是不定长的:

  • 超长度,截断
  • 长度不足,填充<pad>
1
2
3
4
5
6
7
8
9
10
11
def truncate_pad(src, padding_token, num_steps):
# 超过长度则截断
if (len(src) > num_steps): return src[:num_steps]

# 长度不足 则补充<pad>
return src + [padding_token] * (num_steps - len(src))

truncate_pad(src_vocab[source[0]], src_vocab['<pad>'], 10)
'''
[51,4] -> [51, 4, 1, 1, 1, 1, 1, 1, 1, 1]
'''

接下来我们构建小批量训练数据

  • 每个序列最后需要添加一个<eos>,表示句子的结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def build_array(tokens, vocab, num_steps):
# 将词元列表转换为索引列表
tokens = [vocab[token] for token in tokens]
# 每个序列最后添加一个eos
tokens = [token + [vocab['<eos>']] for token in tokens]

# 将处理后的定长序列构造为张量
array = torch.tensor([
truncate_pad(token, vocab['<pad>'], num_steps)
for token in tokens
], dtype=torch.long)# 后续需要进入embedding 只接受整数

# 统计非pad的词元数量
valid_len = (array != vocab['<pad>'] ).type(torch.int32).sum(1)
return array, valid_len

最后封装一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class fraDataset(Dataset):
def __init__(self, src_array, src_valid_len, tgt_array, tgt_valid_len):
self.src_array = src_array
self.src_valid_len = src_valid_len
self.tgt_array = tgt_array
self.tgt_valid_len = tgt_valid_len

def __len__(self):
return len(self.src_array)

def __getitem__(self, idx):
X = self.src_array[idx]
X_valid_len = self.src_valid_len[idx]
Y = self.tgt_array[idx]
Y_valid_len = self.tgt_valid_len[idx]

return X, X_valid_len, Y, Y_valid_len

def load_data(batch_size, num_steps, num_examples=600):
source, target = read_data_fra(num_examples)

# 构造词表
src_vocab = Vocab(source, min_freq=2, reserved_tokens=['<pad>', '<bos>', '<eos>'])
tgt_vocab = Vocab(target, min_freq=2, reserved_tokens=['<pad>', '<bos>', '<eos>'])

# 构造训练数据
src_array, src_valid_len = build_array(source, src_vocab, num_steps)
tgt_array, tgt_valid_len = build_array(target, tgt_vocab, num_steps)

print( src_array.shape , tgt_array.shape )
print( src_valid_len.shape , tgt_valid_len.shape )
'''
torch.Size([167130, 8]) torch.Size([167130, 8])
torch.Size([167130]) torch.Size([167130])
'''


# 构造数据迭代器
data_arrays = (src_array, src_valid_len, tgt_array, tgt_valid_len)
dataset = fraDataset(*data_arrays)
return DataLoader(dataset, batch_size, shuffle=True, num_workers=2), src_vocab, tgt_vocab

train_iter, src_vocab, tgt_vocab = load_data(batch_size=2, num_steps=8)


for X, X_valid_len, Y, Y_valid_len in train_iter:
print('X:', X.type(torch.int32))
print('X的有效长度:', X_valid_len)
print('Y:', Y.type(torch.int32))
print('Y的有效长度:', Y_valid_len)
break # 只打印第一个批次的内容
'''
X: tensor([[ 66, 12, 77, 2545, 4, 3, 1, 1],
[ 454, 74, 57, 491, 23, 109, 7, 265]], dtype=torch.int32)
X的有效长度: tensor([6, 8])
Y: tensor([[ 12, 22, 43, 20, 321, 4770, 4, 3],
[3050, 9, 6, 44, 2765, 92, 80, 13]], dtype=torch.int32)
Y的有效长度: tensor([8, 8])
'''

编码器 + 解码器

  • 编码器可以使用双向RNN

  • 解码器需要进行预测,无法看到未来,只能单向

  • encoder没有输出,encoder最后一个时间步的隐状态,将作为decoder的初始隐状态

Encoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Seq2SeqEncoder(nn.Module):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs):
super(Seq2SeqEncoder, self).__init__(**kwargs)
# 嵌入层
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.GRU(embed_size, num_hiddens, num_layers, dropout=dropout)

def forward(self, X, *args):

# X形状:(batch_size,num_steps)
X = self.embedding(X)

# X形状:(batch_size,num_steps,embed_size)
# 变换(num_steps,batch_size,embed_size)
X = X.permute(1, 0, 2)

# 获取隐状态
output, state = self.rnn(X)
# output = (num_steps, batch_size, num_hiddens)
# state = (num_layers, batch_size, num_hiddens)
return output, state

验证:

1
2
3
4
5
6
7
8
encoder = Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hiddens=16, num_layers=2)
encoder.eval()
X = torch.zeros((4,7), dtype=torch.long) # embed接受int和long
output,state = encoder(X)
print(output.shape, state.shape
# torch.Size([7, 4, 16]) torch.Size([2, 4, 16])


decoder

  • 我们需要取出最后一层的state

image-20240820014458014

  • 和X进行拼接

image-20240820014726439

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Seq2SeqDecoder(nn.Module):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs):
super(Seq2SeqDecoder, self).__init__(**kwargs)

self.embedding = nn.Embedding(vocab_size, embed_size)
# 需要同时接受 X 和 state 进行拼接 见forward
self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers, dropout=dropout)

# 输出层 全连接
self.dense = nn.Linear(num_hiddens, vocab_size)

def forward(self, X, state):

X = self.embedding(X).permute(1, 0, 2)

# state = (num_layers, batch_size, num_hiddens)
# 取state中 由最后一层的所有节点产生的输出
# 第一维重复num_steps次 第二、三维重复1次
# context = (num_steps, batch_size, num_hiddens)
context = state[-1].repeat(X.shape[0], 1, 1)

# 组合X和context 在最后一个维度进行连接
# -> (batch_size,num_steps,embed_size + num_hiddens)
X_and_context = torch.cat((X, context), 2)

output, state = self.rnn(X_and_context, state)

# 预测输出 调整为batch_size在前的模式
output = self.dense(output).permute(1, 0, 2)

return output, state

测试:

1
2
3
4
5
6
decoder = Seq2SeqDecoder(vocab_size=10, embed_size=8, num_hiddens=16, num_layers=2)
decoder.eval()
output, state = decoder(X, state) # 喂入encoder的state
output.shape, state.shape
# (batch_size, num_steps, vocab_size)
# (torch.Size([4, 7, 10]), torch.Size([2, 4, 16]))

封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class EncoderDecoder(nn.Module):
def __init__(self, encoder, decoder, **kwargs):
super(EncoderDecoder, self).__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder

def forward(self, enc_X, dec_X):
output,state = self.encoder(enc_X)
return self.decoder(dec_X, state)


encoder = Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hiddens=16, num_layers=2)
decoder = Seq2SeqDecoder(vocab_size=10, embed_size=8, num_hiddens=16, num_layers=2)
seq2seq = EncoderDecoder(encoder,decoder)
X = torch.zeros((4,7), dtype=torch.long)
output, state = seq2seq(X,X)
output.shape, state.shape
# (torch.Size([4, 7, 10]), torch.Size([2, 4, 16]))

Vaild Length

在前文中我们处理得到了一个vaild_len

1
2
3
4
5
'''
X: tensor([[ 66, 12, 77, 2545, 4, 3, 1, 1],
[ 454, 74, 57, 491, 23, 109, 7, 1]], dtype=torch.int32)
X的有效长度: tensor([6, 7])
'''

我们希望可以将不相干的、超出有效长度的进行屏蔽

1
2
3
4
5
6
7
[  66,   12,   77, 2545,    4,    3,    1,    1],
[ 454, 74, 57, 491, 23, 109, 7, 1]

->

[ 66, 12, 77, 2545, 4, 3, 0, 0],
[ 454, 74, 57, 491, 23, 109, 7, 0]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def sequence_mask(X, valid_len, value=0):

maxlen = X.size(1) # 取出定长序列的长度 size(0)是batch_size

# 生成一个从 0 到 maxlen-1 的一维张量 [0,1,2,3...,maxlen-1]

# torch.arrange()[None, :] -> 将[0,1,2,3...,maxlen-1]变成[[0,1,2,3...,maxlen-1]]
# 即:(maxlen)-> (1,maxlen) 准备后续的广播机制

# valid_len[:, None] -> 将(batch_size)变成(batch_size,1)

# 比较[0, 1, 2, 3, 4] 和 [3,4]
# 通过广播机制:
'''
[[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]]


[[3,3,3,3,3]
[4,4,4,4,4]] 进行比较

故得到:
[True, True, True, False, False],
[True, True, True, True, False]]
'''
mask = torch.arange(maxlen, dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None]

X[~mask] = value # False对应位置进行修改
return X

X = torch.tensor([[1, 2, 3], [4, 5, 6]])
sequence_mask(X, torch.tensor([1, 2]))
# tensor([[1, 0, 0], [4, 5, 0]])

损失函数

超出有效长度的部分,不能计算交叉熵,需要置为0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
def forward(self, pred, label, valid_len):
# label: (batch_size, seq_len)
# pred: (batch_size, seq_len, num_classes)
weights = torch.ones_like(label) # 默认保留所有值
weights = sequence_mask(weights, valid_len) # 超出合法长度的权值为0
self.reduction = 'none' # 确保得到与输入相同形状的损失张量

# pred转换为 (batch_size, num_classes, seq_len) 符合pytorch接口
# unweighted_loss为(batch_size, seq_len)
unweighted_loss = super().forward(pred.permute(0, 2, 1), label)

# 乘以权值 算均值
weighted_loss = (unweighted_loss * weights).mean(dim=1)
return weighted_loss

loss = MaskedSoftmaxCELoss()
loss(torch.ones(3, 4, 10), torch.ones((3, 4), dtype=torch.long),
torch.tensor([4, 2, 0]))
# tensor([2.3026, 1.1513, 0.0000])

训练

  • 训练时需要给decoder喂入真正的数据,即都是同一套X作为输入
  • 但是需要处理一下boseos

image-20240820170241942

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""训练序列到序列模型。"""

net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = MaskedSoftmaxCELoss()

net.train()
st = time.time()
epoch_losses = [] # 用于存储每轮的损失

for epoch in range(num_epochs):

sum_loss, sum_tokens = 0.0, 0.0

for X, X_valid_len, Y, Y_valid_len in data_iter:
X, X_valid_len, Y, Y_valid_len = X.to(device), X_valid_len.to(device), Y.to(device), Y_valid_len.to(device)

# 取出-> [ tgt_vocab['<bos>'] ]
# 在列表中重复batch_size次 -> [ bos, bos, bos, ... ]
# reshape -> [ [bos], [bos], [bos], ... ] # 每个bos占一个维度
bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0], device=device).reshape(-1, 1)

# decoder_X的输入要去掉最后一个eos 每个输入前加一个bos
dec_input = torch.cat([bos, Y[:, :-1]], 1)

Y_hat, _ = net(X, dec_input)
l = loss(Y_hat, Y, Y_valid_len)
l.sum().backward()
grad_clipping(net, 1) # 梯度裁剪

num_tokens = Y_valid_len.sum()
optimizer.step()
with torch.no_grad():
sum_loss += l.sum()
sum_tokens += num_tokens

avg_loss = (sum_loss / sum_tokens).item()
epoch_losses.append(avg_loss) # 记录每轮的平均损失

if (epoch + 1) % 10 == 0:
print(f'epoch{epoch + 1}/{num_epochs}, loss = {avg_loss}')

plt.plot(range(1, num_epochs + 1), epoch_losses, label='Train Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss over Time')
plt.legend()
plt.show()

print(f'loss {sum_loss.item() / sum_tokens.item():.3f}, {sum_tokens.item() / (time.time()-st):.1f} '
f'tokens/sec on {str(device)}')
  • 准备工作
1
2
3
4
5
6
7
8
9
10
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 300, torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iter, src_vocab, tgt_vocab = load_data(batch_size, num_steps)
encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers, dropout)
decoder = Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
net = EncoderDecoder(encoder, decoder)

train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
  • 结果

image-20240820175148162

预测

我们需要一个个喂入decoder,以decoder的输出作为下一次输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps, device):

net.eval()

# 对测试数据需要进行相同的预处理

# 对源句子进行分词,并将其转换为对应的词汇表索引
# 在句子末尾添加结束标记 `<eos>`
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [src_vocab['<eos>']]

# 有效长度
enc_valid_len = torch.tensor([len(src_tokens)], device=device)

src_tokens = truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])

# 添加一个批次维度
enc_X = torch.unsqueeze(
torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)

# encoder
_,dec_state = net.encoder(enc_X)

# decoder初始输入 设置为 `<bos>` 标记
dec_X = torch.unsqueeze(
torch.tensor([tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0
)

# 输出序列
output_seq = []

# 执行num_steps次
for _ in range(num_steps):
Y, dec_state = net.decoder(dec_X, dec_state)
# 最大值的索引作为预测结果
dec_X = Y.argmax(dim=2)
pred = dec_X.squeeze(dim=0).type(torch.int32).item()

# 如果预测到结束标记 `<eos>`,停止解码
if pred == tgt_vocab['<eos>']: break

# 将当前预测的词汇索引加入输出序列
output_seq.append(pred)

# 返回真实文本
return ' '.join(tgt_vocab.to_tokens(output_seq))

测试:

1
2
3
4
5
6
7
8
9
10
11
12
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
translation = predict_seq2seq(net, eng, src_vocab, tgt_vocab, num_steps, device)
print(f'{eng} => {translation}')

'''
go . => vas- !
i lost . => suis- je
he's calm . => !
i'm home . => !
'''

不太行

CATALOG
  1. 1. 现代序列模型与机器翻译实践
    1. 1.1. 门控循环单元GRU
      1. 1.1.1.
      2. 1.1.2. 候选隐状态 <- 重置门
      3. 1.1.3. 隐状态 <- 更新门
      4. 1.1.4. 代码
    2. 1.2. 长短期记忆网络LSTM
      1. 1.2.1.
      2. 1.2.2. 候选记忆元
      3. 1.2.3. 记忆元
      4. 1.2.4. 隐状态
    3. 1.3. 深度RNN
    4. 1.4. 双向RNN
      1. 1.4.0.1. 代价
  2. 1.5. 机器翻译
    1. 1.5.1. 数据读入与处理
    2. 1.5.2. 编码器 + 解码器
      1. 1.5.2.1. Encoder
      2. 1.5.2.2. decoder
      3. 1.5.2.3. 封装
    3. 1.5.3. Vaild Length
    4. 1.5.4. 损失函数
    5. 1.5.5. 训练
    6. 1.5.6. 预测