Transformer架构解析

  1. Transformer架构

Transformer架构

from torch.autograd import Variable
import torch
import torch.nn as nn
import math
import copy
import config

DEVICE = config.DEVICE

"""
在下面代码的张量形状注释中, batch_size, seq_len 不明确指定数量,
d_model默认指定为512, 注释中的数字计算都是基于d_model = 512来计算的。
"""


# -------------------------Transformer model-------------------------
def clones(module, N):
    """克隆模型块,克隆的模型块参数不共享"""
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


class TransformerEmbedding(nn.Module):
    """
    将输入的离散词索引转换为连续的向量表示
    例如,将词汇表中的第5个词映射为一个512维的向量
    """

    def __init__(self, d_model, vocab_size):
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)  # torch.Size([151643, 512])

    def forward(self, x):
        # x.shape: torch.Size([10, 87])

        embedding_x = self.embedding(x)  # torch.Size([10, 87, 512])
        # 关键:乘以 sqrt(d_model)
        return embedding_x * math.sqrt(self.d_model)  # torch.Size([10, 87, 512])


class PositionalEncoding(nn.Module):
    """
    为每个输入位置添加一个唯一的位置编码
    这个编码会被添加到词向量中,使模型能够理解位置信息
    """

    def __init__(self, d_model, dropout=0.1, max_len=200):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # 创建一个形状为 (max_len, d_model) 的位置编码矩阵
        pe = torch.zeros(max_len, d_model, device=DEVICE)

        # 创建一个位置索引向量 (max_len, 1)
        position = torch.arange(
            0,
            max_len,
            dtype=torch.float,
            device=DEVICE,
        ).unsqueeze(1)

        # 计算分母项:10000^(2i/d_model)
        # 这里的 i 是维度索引
        div_term = torch.exp(
            torch.arange(0, d_model, 2, device=DEVICE).float()
            * (-math.log(10000.0) / d_model)
        )

        # 填充偶数位置(sin)
        pe[:, 0::2] = torch.sin(position * div_term)
        # 填充奇数位置(cos)
        pe[:, 1::2] = torch.cos(position * div_term)

        # 添加 batch 维度,方便广播
        pe = pe.unsqueeze(0)  # shape: torch.Size([1, max_len, d_model])

        # 注册为 buffer,表示它不是模型参数,但会随模型一起保存和移动
        self.register_buffer("pe", pe)

    def forward(self, x):
        """
        x: (batch_size, seq_len, d_model) 通常是词嵌入后的输出
        """
        # 取出当前位置编码(前 seq_len 个)并加到输入上
        x = x + Variable(self.pe[:, : x.size(1)], requires_grad=False)
        return self.dropout(x)


def attention(query, key, value, mask=None, dropout=None):
    """
    这是一个纯注意力计算单元,可被多个地方复用。
    这个函数确实不包含 W_Q/W_K 的乘法,它只负责注意力计算的核心部分
    """
    # 将query矩阵的最后一个维度值作为d_k
    d_k = query.size(-1)  # 64

    # 将key的最后两个维度互换(转置),才能与query矩阵相乘,乘完了还要除以d_k开根号
    # 矩阵乘法:query × key^T
    # query:    [batch_size, 8, seq_len, 64]
    # key^T:    [batch_size, 8, 64, seq_len]  (transpose最后两维)
    # scores:   [batch_size, 8, seq_len, seq_len]
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    # 如果存在要进行mask的内容,则将那些为0的部分替换成一个很大的负数
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)

    # 将mask后的attention矩阵按照最后一个维度进行softmax
    # [2, 8, seq_len, seq_len]
    p_attn = nn.functional.softmax(scores, dim=-1)

    # 如果dropout参数设置为非空,则进行dropout操作
    if dropout is not None:
        p_attn = dropout(p_attn)

    # 最后返回注意力矩阵跟value的乘积,以及注意力矩阵
    # 乘以 value
    # p_attn: [batch_size, 8, seq_len0, seq_len]
    # value:  [batch_size, 8, seq_len, 64]
    # 结果:   [batch_size, 8, seq_len, 64]
    return torch.matmul(p_attn, value), p_attn


class MultiHeadedAttention(nn.Module):
    """
    多头注意力机制
    h: 切分头的数量,需要确保d_model能被h整除.
    """

    def __init__(self, h, d_model, dropout=0.1):
        super(MultiHeadedAttention, self).__init__()
        # 保证可以整除
        assert d_model % h == 0
        # 得到一个head的attention表示维度
        self.d_k = d_model // h
        # head数量
        self.h = h

        # 定义4个全连接函数,供后续作为WQ,WK,WV矩阵和最后h个多头注意力矩阵concat之后进行变换的矩阵
        # 前三个用于WQ,WK,WV,最后一个用于w_o
        # 进行克隆,每个全连接函数有自己单独的WQ,WK,WV
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.attn = None
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        """
        :param query: torch.Size([batch, seq_len, d_model])
        :param key: torch.Size([batch, seq_len, d_model])
        :param value: torch.Size([batch, seq_len, d_model])
        :param mask: torch.Size([batch, 1, seq_len])
        return: torch.Size([batch, seq_len, d_model])
        """
        if mask is not None:
            mask = mask.unsqueeze(1)

        # query的第一个维度值为batch size
        batch_size = query.size(0)

        # 将embedding层乘以WQ,WK,WV矩阵(均为全连接)
        # 将 d_model=512 的特征向量切成 8 段,每段 64 维,这不是物理分割,是 reshape
        # 原始: [batch_size, seq_len, 512]
        # view切分:[batch_size, seq_len, 8, 64]
        # transpose转换: [batch_size, 8, seq_len, 64]
        query, key, value = [
            l(x).view(batch_size, -1, self.h, self.d_k).transpose(1, 2)
            for l, x in zip(self.linears, (query, key, value))
            # 这里的(query, key, value)在第一次调用时是都是用户传入的词嵌入向量,后面多层堆叠之后就是
            # 多头注意力的输出
        ]

        # 调用上述定义的attention函数计算得到h个注意力矩阵跟value的乘积,以及注意力矩阵
        # x: [batch_size, 8, seq_len, 64]
        # self.attn: [batch_size, 8, seq_len, seq_len]
        x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)

        # 将h个多头注意力矩阵concat起来(注意要先把h变回到第三维的位置)
        # x.transpose(1, 2): [batch_size, seq_len, 8, 64]
        # view: [2, seq_len, 512]
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.h * self.d_k)

        # 上面的x是concat之后的结果,按照transformer架构,我们需要乘以w_o来输出结果
        # w_o存在于linear函数中,数学公式:output = x @ W^T + b
        # 我们不需要自己去乘,调用这个线性函数即可,内部的w_o会在反向传播中自动更新
        # 多头注意力的输出shape应该和输入保持一致,才能堆叠多层
        return self.linears[-1](x)


class LayerNorm(nn.Module):
    """
    pytorch 有现成的层归一化方法 nn.LayerNorm(features)
    他在内部也是定义α为全1, β为全0,跟下面的实现是一样的
    层归一化的输入是什么形状,输出就是什么形状
    """

    def __init__(self, features, eps=1e-6):
        super(LayerNorm, self).__init__()
        # 初始化α为全1, 而β为全0
        self.a_2 = nn.Parameter(torch.ones(features))  # [512]
        self.b_2 = nn.Parameter(torch.zeros(features))  # [512]
        # 平滑项
        self.eps = eps

    def forward(self, x):
        """
        :param x: torch.Size([batch, seq_len, d_model])
        return: torch.Size([batch, seq_len, d_model])
        """
        # 这是一个前向传播函数,用于执行Layer Normalization操作
        # 输入x是神经网络层的输出 [batch_size, seq_len, d_model]
        # 按最后一个维度计算均值和方差
        # keepdim=True确保输出的维度与输入相同
        mean = x.mean(-1, keepdim=True)  # 计算最后一个维度的均值

        # PyTorch 的 .std() 函数为了符合统计学惯例,默认采用无偏估计(除以 n-1)
        # LayerNorm 的目标是对一个样本的所有特征进行归一化,使其均值为 0、方差为 1。
        # 在计算这个特定样本的方差时,除以 n 才是完全正确的做法
        std = x.std(-1, keepdim=True, unbiased=False)  # 计算最后一个维度的标准差

        # 返回Layer Norm的结果
        # Layer Norm公式: y = a * (x - mean) / sqrt(std^2 + eps) + b
        # 其中a和b是可学习的参数,eps是为了防止除以0的小常数
        # 输出和输入的形状是一样的
        return self.a_2 * (x - mean) / torch.sqrt(std**2 + self.eps) + self.b_2


class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        """
        前馈神经网络初始化函数
        参数:
            d_model: 模型的输入维度
            d_ff: 前馈神经网络中间层的维度
            dropout: dropout概率,默认为0.1
        """
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)  # 第一个线性层,将维度从d_model扩展到d_ff
        self.w_2 = nn.Linear(d_ff, d_model)  # 第二个线性层,将维度从d_ff压缩回d_model
        self.dropout = nn.Dropout(dropout)  # dropout层,用于防止过拟合

    def forward(self, x):
        """
        :param x: torch.Size([batch, seq_len, d_model])
        return: torch.Size([batch, seq_len, d_model])
        """
        # 先通过第一个线性层,然后应用ReLU激活函数
        return self.w_2(self.dropout(nn.functional.relu(self.w_1(x))))


class SublayerConnection(nn.Module):
    """
    残差连接的作用就是把Multi-Head Attention和Feed Forward层
    连在一起只不过每一层输出之后都要先做Layer Norm再残差连接
    残差连接和层归一化都不会改变输入的形状
    """

    def __init__(self, size, dropout):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)

    """
    sublayer参数是SublayerConnection类中的核心组件,
    它代表了Transformer中的具体处理层,通过这种设计实现了代码的模块化和灵活性。
    """

    # 将embedding层进行Multi head Attention
    # x = self.sublayers[0](x, lambda x: self.self_attn(x, x, x, mask))
    # # 注意到attn得到的结果x直接作为了下一层的输入
    # return self.sublayers[1](x, self.feed_forward)

    def forward(self, x, sublayer):
        """
        :param x: torch.Size([batch, seq_len, d_model])
        return: torch.Size([batch, seq_len, d_model])
        """
        # 返回Layer Norm和残差连接后结果
        # 注意这里和论文中的架构不一样,论文中是先计算多头注意力,再进行残差连接和归一化
        # 代码是这样的:self.norm(x + self.dropout(self.sublayer(x)))
        # 而现在普遍的做法是先归一化,再计算多头注意力,再进行残差连接
        # 这两种顺序代表了 两种不同的 Transformer 架构设计哲学,而现在都使用下面的这种
        # 原论文中的架构中,梯度需要经过 LayerNorm 的反向传播,在深层网络中,梯度可能消失或爆炸
        # 而现在的架构中残差连接提供了"梯度高速公路",梯度可以直接流回输入,不受 LayerNorm 影响
        return x + self.dropout(sublayer(self.norm(x)))


class EncoderLayer(nn.Module):
    def __init__(self, size, self_attn, feed_forward, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = self_attn  # 多头注意力层
        self.feed_forward = feed_forward  # 前馈网络层

        # SublayerConnection的作用就是把multi和ffn连在一起
        # 只不过每一层输出之后都要先做Layer Norm再残差连接
        self.sublayers = clones(SublayerConnection(size, dropout), 2)
        # d_model
        self.size = size

    def forward(self, x, mask):
        """
        :param x: torch.Size([batch, seq_len, d_model])
        :param mask: torch.Size([batch, 1, seq_len])
        return: torch.Size([batch, seq_len, d_model])
        """
        # 将embedding层进行Multi head Attention
        x = self.sublayers[0](x, lambda x: self.self_attn(x, x, x, mask))
        # 注意到attn得到的结果x直接作为了下一层的输入
        return self.sublayers[1](x, self.feed_forward)


class Encoder(nn.Module):
    # layer = EncoderLayer
    # N = 6
    def __init__(self, layer, N):
        super(Encoder, self).__init__()
        # 复制N个encoder layer
        self.layers = clones(layer, N)
        # Layer Norm
        self.norm = LayerNorm(layer.size)

    def forward(self, x, mask):
        """
        使用循环连续encode N次(这里为6次)
        这里的Eecoderlayer会接收一个对于输入的attention mask处理
        :param x: torch.Size([batch, seq_len, d_model])
        :param mask: torch.Size([batch, 1, seq_len])
        return: torch.Size([batch, seq_len, d_model])
        """
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)


class DecoderLayer(nn.Module):
    def __init__(self, d_model, self_attn, src_attn, feed_forward, dropout):
        super(DecoderLayer, self).__init__()
        self.size = d_model
        # Self-Attention
        self.self_attn = self_attn
        # 与Encoder传入的Context进行Attention
        self.src_attn = src_attn
        self.feed_forward = feed_forward
        self.sublayers = clones(SublayerConnection(d_model, dropout), 3)

    def forward(self, x, memory, src_mask, tgt_mask):
        # 用m来存放encoder的最终hidden表示结果
        m = memory

        # Self-Attention:注意self-attention的q,k和v均为decoder hidden
        # 同时加上了因果掩码,然后做层归一化和残差连接
        x = self.sublayers[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))

        # 这里就是encoder和decoder连接的部分,q来自decoder,k和v来自encoder
        # Context-Attention:注意context-attention的q为decoder hidden,而k和v为encoder hidden
        x = self.sublayers[1](x, lambda x: self.src_attn(x, m, m, src_mask))
        return self.sublayers[2](x, self.feed_forward)


class Decoder(nn.Module):
    def __init__(self, layer, N):
        super(Decoder, self).__init__()
        # 复制N个encoder layer
        self.layers = clones(layer, N)
        # Layer Norm
        self.norm = LayerNorm(layer.size)

    def forward(self, x, memory, src_mask, tgt_mask):
        """
        使用循环连续decode N次(这里为6次)
        这里的Decoderlayer会接收一个对于输入的attention mask处理
        和一个对输出的attention mask + subsequent mask处理
        这里的x是输出的向量:torch.Size([batch_size, seq_len, d_model])
        memory是encoder的输出:torch.Size([batch_size, seq_len, d_model])
        """
        for layer in self.layers:
            x = layer(x, memory, src_mask, tgt_mask)
        return self.norm(x)


class Generator(nn.Module):
    # vocab: vocab_size
    def __init__(self, d_model, vocab_size):
        super(Generator, self).__init__()
        # decode后的结果,先进入一个全连接层变为词典大小的向量
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        # 然后再进行log_softmax操作(在softmax结果上再做多一次log运算)
        return nn.functional.log_softmax(self.proj(x), dim=-1)


class Transformer(nn.Module):
    def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
        super(Transformer, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.generator = generator

    def encode(self, src, src_mask):
        return self.encoder(self.src_embed(src), src_mask)

    def decode(self, memory, src_mask, tgt, tgt_mask):
        return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)

    def forward(self, src, tgt, src_mask, tgt_mask):
        """
        # Encoder输出,torch.Size([batch_size, seq_len, d_model])
        # 通过self.encode(src, src_mask)计算得出
        # 用途: Decoder 的 Cross-Attention 的 K 和 V

        # src_mask,位置掩码,torch.Size([batch_size, 1, seq_len])
        # 用途: 标记源序列哪些位置是有效token(不是padding)

        # tgt,Target IDs,torch.Size([batch_size, tgt_seq_len])
        # 含义: 10个目标句子,每个句子tgt_seq_len个token的ID

        # tgt_mask,因果掩码 torch.Size([batch_size, tgt_seq_len, tgt_seq_len])
        # 含义: 下三角掩码,防止看到未来信息
        """
        # encoder的结果作为decoder的memory参数传入,进行decode
        return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)


def make_model(vocab_size, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1):
    c = copy.deepcopy
    # 实例化Attention对象
    attn = MultiHeadedAttention(h, d_model).to(DEVICE)
    # 实例化FeedForward对象
    ff = PositionwiseFeedForward(d_model, d_ff, dropout).to(DEVICE)
    # 实例化PositionalEncoding对象
    position = PositionalEncoding(d_model, dropout).to(DEVICE)
    # 实例化Transformer模型对象

    model = Transformer(
        Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout).to(DEVICE), N).to(
            DEVICE
        ),
        Decoder(
            DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout).to(DEVICE), N
        ).to(DEVICE),
        nn.Sequential(
            TransformerEmbedding(d_model, vocab_size).to(DEVICE), c(position)
        ),
        nn.Sequential(
            TransformerEmbedding(d_model, vocab_size).to(DEVICE), c(position)
        ),
        Generator(d_model, vocab_size),
    ).to(DEVICE)

    # 初始化模型参数
    # 遍历模型中的所有参数
    for p in model.parameters():
        # 判断参数是否为二维或更高维(例如权重矩阵,而不是偏置向量)
        if p.dim() > 1:
            # 这里初始化采用的是nn.init.xavier_uniform
            nn.init.xavier_uniform_(p)
    return model.to(DEVICE)
from DatasetLoader import MTDataset
from Transformer import make_model
from torch.utils.data import DataLoader
from pathlib import Path
import matplotlib.pyplot as plt
import torch
import config


def run():
    # 初始化模型
    model = make_model(
        config.vocab_size,
        config.n_layers,
        config.d_model,
        config.d_ff,
        config.n_heads,
        config.dropout,
    )

    train_dataset = MTDataset(
        r"D:\文档\codes\AI\神经网络\my_transfomer\my_datasets\train.csv"
    )
    validate_dataset = MTDataset(
        r"D:\文档\codes\AI\神经网络\my_transfomer\my_datasets\validate.csv"
    )
    train_dataloader = DataLoader(
        train_dataset,
        shuffle=True,
        batch_size=50,
        collate_fn=train_dataset.collate_fn,
    )
    validate_dataloader = DataLoader(
        validate_dataset,
        shuffle=True,
        batch_size=50,
        collate_fn=validate_dataset.collate_fn,
    )

    # 训练阶段,选择损失函数和优化器
    # CrossEntropyLoss是常见的分类问题损失函数,ignore_index=0表示忽略填充部分
    # reduction='sum'表示计算损失时会对所有token的损失求和
    criterion = torch.nn.CrossEntropyLoss(ignore_index=0, reduction="sum")

    # 调用get_std_opt函数获取标准的Noam优化器,这通常包括学习率调度器(如预热后衰减)
    optimizer = torch.optim.Adam(model.parameters(), lr=config.lr)

    best_model_path = Path.joinpath(
        Path(__file__).parent.resolve(), r"saves/best_lstm_model.pth"
    )
    best_val_loss = float("inf")
    train_losses = []
    val_losses = []
    for epoch in range(1, config.epoch_num + 1):
        print(f"---------- 第{epoch}轮模型训练与验证 START ----------")
        print("训练阶段")
        model.train()
        epoch_train_loss = 0
        for batch in train_dataloader:
            train_loss = loss_compute(model, batch, criterion)
            print(f"batch_train_loss: {train_loss.item():.3f}")
            train_losses.append(train_loss.item())
            optimizer.zero_grad()
            train_loss.backward()
            optimizer.step()
            epoch_train_loss += train_loss.item()
        print(f"epoch_train_loss: {epoch_train_loss / len(train_dataloader):.3f}")

        # 验证阶段
        print("验证阶段")
        model.eval()
        epoch_val_loss = 0
        for batch in validate_dataloader:
            val_loss = loss_compute(model, batch, criterion)
            val_losses.append(val_loss.item())
            print(f"batch_val_loss: {val_loss.item():.3f}")
            epoch_val_loss += val_loss.item()
        print(f"epoch_val_loss: {epoch_val_loss / len(validate_dataloader):.3f}")

        print(f"---------- 第{epoch}轮模型训练与验证 END ----------")

        # 保存最佳模型
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), best_model_path)

    # 绘制训练曲线
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label="Train Loss")
    plt.plot(val_losses, label="Val Loss")
    plt.xlabel("Batch")
    plt.ylabel("Loss")
    plt.title("Training History")
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()


def loss_compute(model, batch, criterion):
    # batch.src:输入的源语言数据
    # batch.trg:目标语言数据
    # batch.src_mask:源语言mask
    # batch.trg_mask:目标语言mask
    out = model(batch.src, batch.tgt, batch.src_mask, batch.tgt_mask)
    # 重塑预测和目标张量以便计算损失
    # gen: [batch, seq_len, vocab_size] -> [batch*seq_len, vocab_size]
    gen = model.generator(out)
    y_pred = gen.contiguous().view(-1, gen.size(-1))

    # # targets: [batch, seq_len] -> [batch*seq_len]
    targets = batch.tgt_y
    y_true = targets.contiguous().view(-1)

    loss = criterion(y_pred, y_true)
    loss = loss / batch.ntokens
    return loss


if __name__ == "__main__":
    run()

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 lyucan_1@163.com