Transformer架构
from torch.autograd import Variable
import torch
import torch.nn as nn
import math
import copy
import config
DEVICE = config.DEVICE
"""
在下面代码的张量形状注释中, batch_size, seq_len 不明确指定数量,
d_model默认指定为512, 注释中的数字计算都是基于d_model = 512来计算的。
"""
# -------------------------Transformer model-------------------------
def clones(module, N):
"""克隆模型块,克隆的模型块参数不共享"""
return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
class TransformerEmbedding(nn.Module):
"""
将输入的离散词索引转换为连续的向量表示
例如,将词汇表中的第5个词映射为一个512维的向量
"""
def __init__(self, d_model, vocab_size):
super().__init__()
self.d_model = d_model
self.embedding = nn.Embedding(vocab_size, d_model) # torch.Size([151643, 512])
def forward(self, x):
# x.shape: torch.Size([10, 87])
embedding_x = self.embedding(x) # torch.Size([10, 87, 512])
# 关键:乘以 sqrt(d_model)
return embedding_x * math.sqrt(self.d_model) # torch.Size([10, 87, 512])
class PositionalEncoding(nn.Module):
"""
为每个输入位置添加一个唯一的位置编码
这个编码会被添加到词向量中,使模型能够理解位置信息
"""
def __init__(self, d_model, dropout=0.1, max_len=200):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
# 创建一个形状为 (max_len, d_model) 的位置编码矩阵
pe = torch.zeros(max_len, d_model, device=DEVICE)
# 创建一个位置索引向量 (max_len, 1)
position = torch.arange(
0,
max_len,
dtype=torch.float,
device=DEVICE,
).unsqueeze(1)
# 计算分母项:10000^(2i/d_model)
# 这里的 i 是维度索引
div_term = torch.exp(
torch.arange(0, d_model, 2, device=DEVICE).float()
* (-math.log(10000.0) / d_model)
)
# 填充偶数位置(sin)
pe[:, 0::2] = torch.sin(position * div_term)
# 填充奇数位置(cos)
pe[:, 1::2] = torch.cos(position * div_term)
# 添加 batch 维度,方便广播
pe = pe.unsqueeze(0) # shape: torch.Size([1, max_len, d_model])
# 注册为 buffer,表示它不是模型参数,但会随模型一起保存和移动
self.register_buffer("pe", pe)
def forward(self, x):
"""
x: (batch_size, seq_len, d_model) 通常是词嵌入后的输出
"""
# 取出当前位置编码(前 seq_len 个)并加到输入上
x = x + Variable(self.pe[:, : x.size(1)], requires_grad=False)
return self.dropout(x)
def attention(query, key, value, mask=None, dropout=None):
"""
这是一个纯注意力计算单元,可被多个地方复用。
这个函数确实不包含 W_Q/W_K 的乘法,它只负责注意力计算的核心部分
"""
# 将query矩阵的最后一个维度值作为d_k
d_k = query.size(-1) # 64
# 将key的最后两个维度互换(转置),才能与query矩阵相乘,乘完了还要除以d_k开根号
# 矩阵乘法:query × key^T
# query: [batch_size, 8, seq_len, 64]
# key^T: [batch_size, 8, 64, seq_len] (transpose最后两维)
# scores: [batch_size, 8, seq_len, seq_len]
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
# 如果存在要进行mask的内容,则将那些为0的部分替换成一个很大的负数
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
# 将mask后的attention矩阵按照最后一个维度进行softmax
# [2, 8, seq_len, seq_len]
p_attn = nn.functional.softmax(scores, dim=-1)
# 如果dropout参数设置为非空,则进行dropout操作
if dropout is not None:
p_attn = dropout(p_attn)
# 最后返回注意力矩阵跟value的乘积,以及注意力矩阵
# 乘以 value
# p_attn: [batch_size, 8, seq_len0, seq_len]
# value: [batch_size, 8, seq_len, 64]
# 结果: [batch_size, 8, seq_len, 64]
return torch.matmul(p_attn, value), p_attn
class MultiHeadedAttention(nn.Module):
"""
多头注意力机制
h: 切分头的数量,需要确保d_model能被h整除.
"""
def __init__(self, h, d_model, dropout=0.1):
super(MultiHeadedAttention, self).__init__()
# 保证可以整除
assert d_model % h == 0
# 得到一个head的attention表示维度
self.d_k = d_model // h
# head数量
self.h = h
# 定义4个全连接函数,供后续作为WQ,WK,WV矩阵和最后h个多头注意力矩阵concat之后进行变换的矩阵
# 前三个用于WQ,WK,WV,最后一个用于w_o
# 进行克隆,每个全连接函数有自己单独的WQ,WK,WV
self.linears = clones(nn.Linear(d_model, d_model), 4)
self.attn = None
self.dropout = nn.Dropout(p=dropout)
def forward(self, query, key, value, mask=None):
"""
:param query: torch.Size([batch, seq_len, d_model])
:param key: torch.Size([batch, seq_len, d_model])
:param value: torch.Size([batch, seq_len, d_model])
:param mask: torch.Size([batch, 1, seq_len])
return: torch.Size([batch, seq_len, d_model])
"""
if mask is not None:
mask = mask.unsqueeze(1)
# query的第一个维度值为batch size
batch_size = query.size(0)
# 将embedding层乘以WQ,WK,WV矩阵(均为全连接)
# 将 d_model=512 的特征向量切成 8 段,每段 64 维,这不是物理分割,是 reshape
# 原始: [batch_size, seq_len, 512]
# view切分:[batch_size, seq_len, 8, 64]
# transpose转换: [batch_size, 8, seq_len, 64]
query, key, value = [
l(x).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
for l, x in zip(self.linears, (query, key, value))
# 这里的(query, key, value)在第一次调用时是都是用户传入的词嵌入向量,后面多层堆叠之后就是
# 多头注意力的输出
]
# 调用上述定义的attention函数计算得到h个注意力矩阵跟value的乘积,以及注意力矩阵
# x: [batch_size, 8, seq_len, 64]
# self.attn: [batch_size, 8, seq_len, seq_len]
x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)
# 将h个多头注意力矩阵concat起来(注意要先把h变回到第三维的位置)
# x.transpose(1, 2): [batch_size, seq_len, 8, 64]
# view: [2, seq_len, 512]
x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k)
# 上面的x是concat之后的结果,按照transformer架构,我们需要乘以w_o来输出结果
# w_o存在于linear函数中,数学公式:output = x @ W^T + b
# 我们不需要自己去乘,调用这个线性函数即可,内部的w_o会在反向传播中自动更新
# 多头注意力的输出shape应该和输入保持一致,才能堆叠多层
return self.linears[-1](x)
class LayerNorm(nn.Module):
"""
pytorch 有现成的层归一化方法 nn.LayerNorm(features)
他在内部也是定义α为全1, β为全0,跟下面的实现是一样的
层归一化的输入是什么形状,输出就是什么形状
"""
def __init__(self, features, eps=1e-6):
super(LayerNorm, self).__init__()
# 初始化α为全1, 而β为全0
self.a_2 = nn.Parameter(torch.ones(features)) # [512]
self.b_2 = nn.Parameter(torch.zeros(features)) # [512]
# 平滑项
self.eps = eps
def forward(self, x):
"""
:param x: torch.Size([batch, seq_len, d_model])
return: torch.Size([batch, seq_len, d_model])
"""
# 这是一个前向传播函数,用于执行Layer Normalization操作
# 输入x是神经网络层的输出 [batch_size, seq_len, d_model]
# 按最后一个维度计算均值和方差
# keepdim=True确保输出的维度与输入相同
mean = x.mean(-1, keepdim=True) # 计算最后一个维度的均值
# PyTorch 的 .std() 函数为了符合统计学惯例,默认采用无偏估计(除以 n-1)
# LayerNorm 的目标是对一个样本的所有特征进行归一化,使其均值为 0、方差为 1。
# 在计算这个特定样本的方差时,除以 n 才是完全正确的做法
std = x.std(-1, keepdim=True, unbiased=False) # 计算最后一个维度的标准差
# 返回Layer Norm的结果
# Layer Norm公式: y = a * (x - mean) / sqrt(std^2 + eps) + b
# 其中a和b是可学习的参数,eps是为了防止除以0的小常数
# 输出和输入的形状是一样的
return self.a_2 * (x - mean) / torch.sqrt(std**2 + self.eps) + self.b_2
class PositionwiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff, dropout=0.1):
"""
前馈神经网络初始化函数
参数:
d_model: 模型的输入维度
d_ff: 前馈神经网络中间层的维度
dropout: dropout概率,默认为0.1
"""
super(PositionwiseFeedForward, self).__init__()
self.w_1 = nn.Linear(d_model, d_ff) # 第一个线性层,将维度从d_model扩展到d_ff
self.w_2 = nn.Linear(d_ff, d_model) # 第二个线性层,将维度从d_ff压缩回d_model
self.dropout = nn.Dropout(dropout) # dropout层,用于防止过拟合
def forward(self, x):
"""
:param x: torch.Size([batch, seq_len, d_model])
return: torch.Size([batch, seq_len, d_model])
"""
# 先通过第一个线性层,然后应用ReLU激活函数
return self.w_2(self.dropout(nn.functional.relu(self.w_1(x))))
class SublayerConnection(nn.Module):
"""
残差连接的作用就是把Multi-Head Attention和Feed Forward层
连在一起只不过每一层输出之后都要先做Layer Norm再残差连接
残差连接和层归一化都不会改变输入的形状
"""
def __init__(self, size, dropout):
super(SublayerConnection, self).__init__()
self.norm = LayerNorm(size)
self.dropout = nn.Dropout(dropout)
"""
sublayer参数是SublayerConnection类中的核心组件,
它代表了Transformer中的具体处理层,通过这种设计实现了代码的模块化和灵活性。
"""
# 将embedding层进行Multi head Attention
# x = self.sublayers[0](x, lambda x: self.self_attn(x, x, x, mask))
# # 注意到attn得到的结果x直接作为了下一层的输入
# return self.sublayers[1](x, self.feed_forward)
def forward(self, x, sublayer):
"""
:param x: torch.Size([batch, seq_len, d_model])
return: torch.Size([batch, seq_len, d_model])
"""
# 返回Layer Norm和残差连接后结果
# 注意这里和论文中的架构不一样,论文中是先计算多头注意力,再进行残差连接和归一化
# 代码是这样的:self.norm(x + self.dropout(self.sublayer(x)))
# 而现在普遍的做法是先归一化,再计算多头注意力,再进行残差连接
# 这两种顺序代表了 两种不同的 Transformer 架构设计哲学,而现在都使用下面的这种
# 原论文中的架构中,梯度需要经过 LayerNorm 的反向传播,在深层网络中,梯度可能消失或爆炸
# 而现在的架构中残差连接提供了"梯度高速公路",梯度可以直接流回输入,不受 LayerNorm 影响
return x + self.dropout(sublayer(self.norm(x)))
class EncoderLayer(nn.Module):
def __init__(self, size, self_attn, feed_forward, dropout):
super(EncoderLayer, self).__init__()
self.self_attn = self_attn # 多头注意力层
self.feed_forward = feed_forward # 前馈网络层
# SublayerConnection的作用就是把multi和ffn连在一起
# 只不过每一层输出之后都要先做Layer Norm再残差连接
self.sublayers = clones(SublayerConnection(size, dropout), 2)
# d_model
self.size = size
def forward(self, x, mask):
"""
:param x: torch.Size([batch, seq_len, d_model])
:param mask: torch.Size([batch, 1, seq_len])
return: torch.Size([batch, seq_len, d_model])
"""
# 将embedding层进行Multi head Attention
x = self.sublayers[0](x, lambda x: self.self_attn(x, x, x, mask))
# 注意到attn得到的结果x直接作为了下一层的输入
return self.sublayers[1](x, self.feed_forward)
class Encoder(nn.Module):
# layer = EncoderLayer
# N = 6
def __init__(self, layer, N):
super(Encoder, self).__init__()
# 复制N个encoder layer
self.layers = clones(layer, N)
# Layer Norm
self.norm = LayerNorm(layer.size)
def forward(self, x, mask):
"""
使用循环连续encode N次(这里为6次)
这里的Eecoderlayer会接收一个对于输入的attention mask处理
:param x: torch.Size([batch, seq_len, d_model])
:param mask: torch.Size([batch, 1, seq_len])
return: torch.Size([batch, seq_len, d_model])
"""
for layer in self.layers:
x = layer(x, mask)
return self.norm(x)
class DecoderLayer(nn.Module):
def __init__(self, d_model, self_attn, src_attn, feed_forward, dropout):
super(DecoderLayer, self).__init__()
self.size = d_model
# Self-Attention
self.self_attn = self_attn
# 与Encoder传入的Context进行Attention
self.src_attn = src_attn
self.feed_forward = feed_forward
self.sublayers = clones(SublayerConnection(d_model, dropout), 3)
def forward(self, x, memory, src_mask, tgt_mask):
# 用m来存放encoder的最终hidden表示结果
m = memory
# Self-Attention:注意self-attention的q,k和v均为decoder hidden
# 同时加上了因果掩码,然后做层归一化和残差连接
x = self.sublayers[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
# 这里就是encoder和decoder连接的部分,q来自decoder,k和v来自encoder
# Context-Attention:注意context-attention的q为decoder hidden,而k和v为encoder hidden
x = self.sublayers[1](x, lambda x: self.src_attn(x, m, m, src_mask))
return self.sublayers[2](x, self.feed_forward)
class Decoder(nn.Module):
def __init__(self, layer, N):
super(Decoder, self).__init__()
# 复制N个encoder layer
self.layers = clones(layer, N)
# Layer Norm
self.norm = LayerNorm(layer.size)
def forward(self, x, memory, src_mask, tgt_mask):
"""
使用循环连续decode N次(这里为6次)
这里的Decoderlayer会接收一个对于输入的attention mask处理
和一个对输出的attention mask + subsequent mask处理
这里的x是输出的向量:torch.Size([batch_size, seq_len, d_model])
memory是encoder的输出:torch.Size([batch_size, seq_len, d_model])
"""
for layer in self.layers:
x = layer(x, memory, src_mask, tgt_mask)
return self.norm(x)
class Generator(nn.Module):
# vocab: vocab_size
def __init__(self, d_model, vocab_size):
super(Generator, self).__init__()
# decode后的结果,先进入一个全连接层变为词典大小的向量
self.proj = nn.Linear(d_model, vocab_size)
def forward(self, x):
# 然后再进行log_softmax操作(在softmax结果上再做多一次log运算)
return nn.functional.log_softmax(self.proj(x), dim=-1)
class Transformer(nn.Module):
def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
super(Transformer, self).__init__()
self.encoder = encoder
self.decoder = decoder
self.src_embed = src_embed
self.tgt_embed = tgt_embed
self.generator = generator
def encode(self, src, src_mask):
return self.encoder(self.src_embed(src), src_mask)
def decode(self, memory, src_mask, tgt, tgt_mask):
return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
def forward(self, src, tgt, src_mask, tgt_mask):
"""
# Encoder输出,torch.Size([batch_size, seq_len, d_model])
# 通过self.encode(src, src_mask)计算得出
# 用途: Decoder 的 Cross-Attention 的 K 和 V
# src_mask,位置掩码,torch.Size([batch_size, 1, seq_len])
# 用途: 标记源序列哪些位置是有效token(不是padding)
# tgt,Target IDs,torch.Size([batch_size, tgt_seq_len])
# 含义: 10个目标句子,每个句子tgt_seq_len个token的ID
# tgt_mask,因果掩码 torch.Size([batch_size, tgt_seq_len, tgt_seq_len])
# 含义: 下三角掩码,防止看到未来信息
"""
# encoder的结果作为decoder的memory参数传入,进行decode
return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)
def make_model(vocab_size, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):
c = copy.deepcopy
# 实例化Attention对象
attn = MultiHeadedAttention(h, d_model).to(DEVICE)
# 实例化FeedForward对象
ff = PositionwiseFeedForward(d_model, d_ff, dropout).to(DEVICE)
# 实例化PositionalEncoding对象
position = PositionalEncoding(d_model, dropout).to(DEVICE)
# 实例化Transformer模型对象
model = Transformer(
Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout).to(DEVICE), N).to(
DEVICE
),
Decoder(
DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout).to(DEVICE), N
).to(DEVICE),
nn.Sequential(
TransformerEmbedding(d_model, vocab_size).to(DEVICE), c(position)
),
nn.Sequential(
TransformerEmbedding(d_model, vocab_size).to(DEVICE), c(position)
),
Generator(d_model, vocab_size),
).to(DEVICE)
# 初始化模型参数
# 遍历模型中的所有参数
for p in model.parameters():
# 判断参数是否为二维或更高维(例如权重矩阵,而不是偏置向量)
if p.dim() > 1:
# 这里初始化采用的是nn.init.xavier_uniform
nn.init.xavier_uniform_(p)
return model.to(DEVICE)
from DatasetLoader import MTDataset
from Transformer import make_model
from torch.utils.data import DataLoader
from pathlib import Path
import matplotlib.pyplot as plt
import torch
import config
def run():
# 初始化模型
model = make_model(
config.vocab_size,
config.n_layers,
config.d_model,
config.d_ff,
config.n_heads,
config.dropout,
)
train_dataset = MTDataset(
r"D:\文档\codes\AI\神经网络\my_transfomer\my_datasets\train.csv"
)
validate_dataset = MTDataset(
r"D:\文档\codes\AI\神经网络\my_transfomer\my_datasets\validate.csv"
)
train_dataloader = DataLoader(
train_dataset,
shuffle=True,
batch_size=50,
collate_fn=train_dataset.collate_fn,
)
validate_dataloader = DataLoader(
validate_dataset,
shuffle=True,
batch_size=50,
collate_fn=validate_dataset.collate_fn,
)
# 训练阶段,选择损失函数和优化器
# CrossEntropyLoss是常见的分类问题损失函数,ignore_index=0表示忽略填充部分
# reduction='sum'表示计算损失时会对所有token的损失求和
criterion = torch.nn.CrossEntropyLoss(ignore_index=0, reduction="sum")
# 调用get_std_opt函数获取标准的Noam优化器,这通常包括学习率调度器(如预热后衰减)
optimizer = torch.optim.Adam(model.parameters(), lr=config.lr)
best_model_path = Path.joinpath(
Path(__file__).parent.resolve(), r"saves/best_lstm_model.pth"
)
best_val_loss = float("inf")
train_losses = []
val_losses = []
for epoch in range(1, config.epoch_num + 1):
print(f"---------- 第{epoch}轮模型训练与验证 START ----------")
print("训练阶段")
model.train()
epoch_train_loss = 0
for batch in train_dataloader:
train_loss = loss_compute(model, batch, criterion)
print(f"batch_train_loss: {train_loss.item():.3f}")
train_losses.append(train_loss.item())
optimizer.zero_grad()
train_loss.backward()
optimizer.step()
epoch_train_loss += train_loss.item()
print(f"epoch_train_loss: {epoch_train_loss / len(train_dataloader):.3f}")
# 验证阶段
print("验证阶段")
model.eval()
epoch_val_loss = 0
for batch in validate_dataloader:
val_loss = loss_compute(model, batch, criterion)
val_losses.append(val_loss.item())
print(f"batch_val_loss: {val_loss.item():.3f}")
epoch_val_loss += val_loss.item()
print(f"epoch_val_loss: {epoch_val_loss / len(validate_dataloader):.3f}")
print(f"---------- 第{epoch}轮模型训练与验证 END ----------")
# 保存最佳模型
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), best_model_path)
# 绘制训练曲线
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label="Train Loss")
plt.plot(val_losses, label="Val Loss")
plt.xlabel("Batch")
plt.ylabel("Loss")
plt.title("Training History")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
def loss_compute(model, batch, criterion):
# batch.src:输入的源语言数据
# batch.trg:目标语言数据
# batch.src_mask:源语言mask
# batch.trg_mask:目标语言mask
out = model(batch.src, batch.tgt, batch.src_mask, batch.tgt_mask)
# 重塑预测和目标张量以便计算损失
# gen: [batch, seq_len, vocab_size] -> [batch*seq_len, vocab_size]
gen = model.generator(out)
y_pred = gen.contiguous().view(-1, gen.size(-1))
# # targets: [batch, seq_len] -> [batch*seq_len]
targets = batch.tgt_y
y_true = targets.contiguous().view(-1)
loss = criterion(y_pred, y_true)
loss = loss / batch.ntokens
return loss
if __name__ == "__main__":
run()
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 lyucan_1@163.com