LOADING

加载过慢请开启缓存 浏览器默认开启

TextCNN

TextCNN

文本数据预处理

1. read_data函数

def read_data(file):
    with open(file, encoding="utf-8") as f:
        all_data = f.read().split("\n")
    texts, labels = [], []
    for data in all_data:
        if data:
            text, label = data.split("\t")
            texts.append(text)
            labels.append(label)
    return texts, labels
  • 功能:读取一个文件,并将文件中的每一行按制表符(\t)分割成文本和标签,分别存储在 textslabels 列表中。
  • 参数
    • file:文件路径。
  • 返回值
    • texts:包含所有文本的列表。
    • labels:包含所有标签的列表。

2. built_curpus 函数

def built_curpus(train_texts, embedding_num):
    word_2_index = {"<PAD>": 0, "<UNK>": 1}
    for text in train_texts:
        for word in text:
            word_2_index[word] = word_2_index.get(word, len(word_2_index))
    embedding = nn.Embedding(len(word_2_index), embedding_num)
    pkl.dump([word_2_index, embedding], open(parsers().data_pkl, "wb"))
    return word_2_index, embedding

代码定义了一个名为 built_curpus 的函数,该函数主要用于构建词汇表和词嵌入矩阵,并将它们保存到一个文件中。具体来说,这段代码完成了以下几项任务:

  1. 词汇表的初始化:

    • word_2_index = {"<PAD>": 0, "<UNK>": 1}:这个字典用来将单词映射到对应的索引。"<PAD>""<UNK>" 是两个特殊的标记,分别用于填充和未知单词,分别对应索引 01
  2. 构建词汇表:

    • for text in train_texts::遍历所有的训练文本,每个文本是一个包含多个单词的列表。
    • for word in text::进一步遍历每个文本中的单词。
    • word_2_index[word] = word_2_index.get(word, len(word_2_index)):将每个单词添加到 word_2_index 字典中。如果该单词已经存在于字典中,则保持其原有的索引;如果不存在,则将其添加到字典中,并分配一个新的索引(即当前字典的长度)。
  3. 词嵌入矩阵的创建:

    • embedding = nn.Embedding(len(word_2_index), embedding_num):这里使用 PyTorch 的 nn.Embedding 创建了一个词嵌入矩阵。len(word_2_index) 表示词汇表的大小,而 embedding_num 则表示每个单词的嵌入维度。nn.Embedding 会根据词汇表的大小和嵌入维度创建一个形状为 [len(word_2_index), embedding_num] 的矩阵,每个单词会对应一行向量。
  4. 保存词汇表和嵌入矩阵:

    • pkl.dump([word_2_index, embedding], open(parsers().data_pkl, "wb")):将词汇表和嵌入矩阵保存到一个 .pkl 文件中。这里使用了 pickle 模块的 dump 方法将数据序列化,并写入到指定的文件中。parsers().data_pkl 返回了文件路径(假设 parsers() 是一个定义了 data_pkl 属性的类或者函数)。
  5. 返回结果:

    • return word_2_index, embedding:函数最终返回了构建的词汇表 word_2_index 和词嵌入矩阵 embedding

3. TextDataset

class TextDataset(Dataset):
    def __init__(self, all_text, all_label, word_2_index, max_len):
        self.all_text = all_text
        self.all_label = all_label
        self.word_2_index = word_2_index
        self.max_len = max_len

    def __getitem__(self, index):
        text = self.all_text[index][:self.max_len]
        label = int(self.all_label[index])
        text_idx = [self.word_2_index.get(i, 1) for i in text]
        text_idx = text_idx + [0] * (self.max_len - len(text_idx))
        text_idx = torch.tensor(text_idx).unsqueeze(dim=0)
        return text_idx, label

    def __len__(self):
        return len(self.all_text)

代码定义了一个名为 TextDataset 的类,继承自 torch.utils.data.Dataset,主要用于处理文本数据以便在深度学习模型中使用。它实现了 PyTorch 的 Dataset 接口,包括 __init____getitem____len__ 三个核心方法。这个类的主要功能是将文本数据转换为适合模型输入的格式,包括将文本转化为索引序列,并将数据组织为数据集以供模型训练和评估。

1. __init__ 方法

def __init__(self, all_text, all_label, word_2_index, max_len):
    self.all_text = all_text
    self.all_label = all_label
    self.word_2_index = word_2_index
    self.max_len = max_len
  • 功能:初始化 TextDataset 对象的实例变量。
    • self.all_text:存储所有的文本数据,通常是一个包含文本序列的列表。
    • self.all_label:存储所有的标签数据,通常是一个包含对应标签的列表。
    • self.word_2_index:存储词汇表,即单词到索引的映射字典。
    • self.max_len:指定文本的最大长度。超出此长度的文本将被截断,短于此长度的文本将被填充。

2. __getitem__ 方法

def __getitem__(self, index):
    text = self.all_text[index][:self.max_len]
    label = int(self.all_label[index])
    text_idx = [self.word_2_index.get(i, 1) for i in text]
    text_idx = text_idx + [0] * (self.max_len - len(text_idx))
    text_idx = torch.tensor(text_idx).unsqueeze(dim=0)
    return text_idx, label
  • 功能:获取数据集中指定索引 index 处的文本和标签,并将文本转换为索引序列。
    • text = self.all_text[index][:self.max_len]:从 self.all_text 中提取指定索引的文本,并截取到 max_len 的长度。
    • label = int(self.all_label[index]):从 self.all_label 中提取指定索引的标签,并转换为整数类型。
    • text_idx = [self.word_2_index.get(i, 1) for i in text]:将文本中的每个单词转换为对应的索引。如果单词不在词汇表中,则返回索引 1(通常对应于 <UNK>)。
    • text_idx = text_idx + [0] * (self.max_len - len(text_idx)):将索引序列填充到 max_len 的长度,使用索引 0(通常对应于 <PAD>)。
    • text_idx = torch.tensor(text_idx).unsqueeze(dim=0):将索引序列转换为 PyTorch 的 tensor,并在第一个维度上增加一个维度,使其适合模型的输入形状。
    • return text_idx, label:返回处理后的文本索引序列和对应的标签。

3. __len__ 方法

def __len__(self):
    return len(self.all_text)
  • 功能:返回数据集的大小,即文本序列的数量。通常用于告诉 PyTorch 数据加载器(DataLoader)数据集的总长度。

4. 举例

TextDataset 类中,text_idx 的形状是一个 PyTorch 张量 (tensor),其维度是 (1, max_len),其中 max_len 是在初始化时指定的最大句子长度。

假设我们有一个句子 ["hello", "world"],词汇表 word_2_index{"<PAD>": 0, "<UNK>": 1, "hello": 2, "world": 3}max_len 为 5。以下是如何生成对应的 text_idx

  1. 原始句子: ["hello", "world"]
  2. 转换为索引: [2, 3]"hello" 对应索引 2,"world" 对应索引 3)
  3. 填充到 max_len: [2, 3, 0, 0, 0] (填充索引 0,表示 <PAD>

最后,text_idx 经过 torch.tensor(text_idx).unsqueeze(dim=0) 转换为 PyTorch 张量,并增加了一个维度:

text_idx = torch.tensor([2, 3, 0, 0, 0]).unsqueeze(dim=0)
  • 最终 text_idx 的形状: (1, 5)

具体的值如下:

tensor([[2, 3, 0, 0, 0]])

这个张量的形状 (1, 5) 表示它有一个批次(即第一个维度为 1),每个输入序列的长度是 max_len(这里为 5)。在实际使用中,这种形状适合直接输入到神经网络模型中进行处理。

前向传播网络

class Block(nn.Module):
    def __init__(self, kernel_s, embeddin_num, max_len, hidden_num):
        super().__init__()
        # shape [batch * in_channel * max_len * emb_num]
        self.cnn = nn.Conv2d(in_channels=1, out_channels=hidden_num, kernel_size=(kernel_s, embeddin_num))
        self.act = nn.ReLU()
        self.mxp = nn.MaxPool1d(kernel_size=(max_len - kernel_s + 1))

    def forward(self, batch_emb):
        c = self.cnn(batch_emb)
        a = self.act(c)
        a = a.squeeze(dim=-1)
        m = self.mxp(a)
        m = m.squeeze(dim=-1)
        return m

class TextCNNModel(nn.Module):
    def __init__(self, emb_matrix, max_len, class_num, hidden_num):
        super().__init__()
        self.emb_num = emb_matrix.weight.shape[1]
        self.block1 = Block(2, self.emb_num, max_len, hidden_num)
        self.block2 = Block(3, self.emb_num, max_len, hidden_num)
        self.block3 = Block(4, self.emb_num, max_len, hidden_num)
        self.emb_matrix = emb_matrix
        self.classifier = nn.Linear(hidden_num * 3, class_num)
        self.loss_fun = nn.CrossEntropyLoss()

    def forward(self, batch_idx):
        batch_emb = self.emb_matrix(batch_idx)
        b1_result = self.block1(batch_emb)
        b2_result = self.block2(batch_emb)
        b3_result = self.block3(batch_emb)
        feature = torch.cat([b1_result, b2_result, b3_result], dim=1)
        pre = self.classifier(feature)
        return pre

这个代码定义了一个用于文本分类任务的卷积神经网络模型 (TextCNNModel),它包含多个卷积块 (Block),每个卷积块使用不同的卷积核大小,以捕获文本中不同范围的特征。以下是对代码的详细解释:

1. Block

Block 类定义了一个卷积块,负责处理输入的嵌入向量并提取特征。

class Block(nn.Module):
    def __init__(self, kernel_s, embeddin_num, max_len, hidden_num):
        super().__init__()
        # shape [batch * in_channel * max_len * emb_num]
        self.cnn = nn.Conv2d(in_channels=1, out_channels=hidden_num, kernel_size=(kernel_s, embeddin_num))
        self.act = nn.ReLU()
        self.mxp = nn.MaxPool1d(kernel_size=(max_len - kernel_s + 1))
  • __init__ 方法:
    • kernel_s:卷积核的大小(高度)。
    • embeddin_num:嵌入向量的维度(宽度)。
    • max_len:句子的最大长度。
    • hidden_num:卷积层输出的通道数(即特征图数量)。
    • self.cnn:二维卷积层,处理输入形状为 [batch_size, 1, max_len, embeddin_num] 的张量。
    • self.act:激活函数 ReLU,应用于卷积层的输出。
    • self.mxp:一维最大池化层,缩小特征图的维度。
def forward(self, batch_emb):
    c = self.cnn(batch_emb)
    a = self.act(c)
    a = a.squeeze(dim=-1)
    m = self.mxp(a)
    m = m.squeeze(dim=-1)
    return m
  • forward 方法:
    • batch_emb:输入的嵌入矩阵,形状为 [batch_size, 1, max_len, embeddin_num]
    • c = self.cnn(batch_emb):对嵌入矩阵进行卷积操作,输出形状为 [batch_size, hidden_num, max_len - kernel_s + 1, 1]
    • a = self.act(c):应用激活函数 ReLU。
    • a = a.squeeze(dim=-1):去掉最后一个维度,形状变为 [batch_size, hidden_num, max_len - kernel_s + 1]
    • m = self.mxp(a):最大池化操作,输出形状为 [batch_size, hidden_num, 1]
    • m = m.squeeze(dim=-1):去掉最后一个维度,最终形状为 [batch_size, hidden_num]

2. TextCNNModel

TextCNNModel 是一个多通道的文本卷积神经网络模型,使用了多个不同大小的卷积核来提取文本特征。

class TextCNNModel(nn.Module):
    def __init__(self, emb_matrix, max_len, class_num, hidden_num):
        super().__init__()
        self.emb_num = emb_matrix.weight.shape[1]
        self.block1 = Block(2, self.emb_num, max_len, hidden_num)
        self.block2 = Block(3, self.emb_num, max_len, hidden_num)
        self.block3 = Block(4, self.emb_num, max_len, hidden_num)
        self.emb_matrix = emb_matrix
        self.classifier = nn.Linear(hidden_num * 3, class_num)
        self.loss_fun = nn.CrossEntropyLoss()
  • __init__ 方法:
    • emb_matrix:预训练的嵌入矩阵。
    • max_len:句子的最大长度。
    • class_num:分类任务的类别数量。
    • hidden_num:每个卷积块输出的特征图数量。
    • self.block1, self.block2, self.block3:三个卷积块,每个使用不同的卷积核大小(2、3、4),以捕获不同的 n-gram 特征。
    • self.classifier:全连接层,将拼接后的特征向量映射到类别标签。
    • self.loss_fun:交叉熵损失函数,用于计算分类任务的损失。
def forward(self, batch_idx):
    batch_emb = self.emb_matrix(batch_idx)
    b1_result = self.block1(batch_emb)
    b2_result = self.block2(batch_emb)
    b3_result = self.block3(batch_emb)
    feature = torch.cat([b1_result, b2_result, b3_result], dim=1)
    pre = self.classifier(feature)
    return pre
  • forward 方法:
    • batch_idx:输入的文本索引序列。
    • batch_emb = self.emb_matrix(batch_idx):将文本索引转换为嵌入矩阵。
    • b1_result = self.block1(batch_emb):通过第一个卷积块提取特征。
    • b2_result = self.block2(batch_emb):通过第二个卷积块提取特征。
    • b3_result = self.block3(batch_emb):通过第三个卷积块提取特征。
    • feature = torch.cat([b1_result, b2_result, b3_result], dim=1):将三个卷积块的输出特征拼接在一起,形成一个综合的特征向量。
    • pre = self.classifier(feature):通过全连接层进行分类预测。
    • return pre:返回分类结果。

为了更直观地理解这个 TextCNNModel 的工作流程,我们可以举一个简单的例子,演示从输入到输出的整个过程。

3. 例子概述

1. 输入处理

假设我们有以下输入和模型参数:

  • 输入句子"I love NLP"
  • 词汇表 word_2_index{"<PAD>": 0, "<UNK>": 1, "I": 2, "love": 3, "NLP": 4}
  • 句子最大长度 max_len:5
  • 嵌入维度 embedding_dim:3
  • 类别数量 class_num:2
  • 每个卷积块的输出通道数 hidden_num:2
  • 卷积核大小:2、3、4
1.1 索引化和填充
  • 原始句子索引化

    • 句子 "I love NLP" 对应的索引为 [2, 3, 4]
    • 由于 max_len 为 5,我们将句子填充到 [2, 3, 4, 0, 0]
  • 嵌入矩阵
    假设嵌入矩阵如下(形状为 [5, 3],即5个词汇,每个词汇有3维的嵌入向量):

    emb_matrix = torch.tensor([
        [0.0, 0.0, 0.0],  # <PAD>
        [0.1, 0.1, 0.1],  # <UNK>
        [0.2, 0.2, 0.2],  # I
        [0.3, 0.3, 0.3],  # love
        [0.4, 0.4, 0.4]   # NLP
    ])
    
  • 嵌入后的结果

    batch_idx = torch.tensor([[2, 3, 4, 0, 0]])
    batch_emb = emb_matrix[batch_idx]
    # shape: [1, 5, 3]
    

    batch_emb 的内容为:

    batch_emb = torch.tensor([
        [[0.2, 0.2, 0.2],  # I
         [0.3, 0.3, 0.3],  # love
         [0.4, 0.4, 0.4],  # NLP
         [0.0, 0.0, 0.0],  # <PAD>
         [0.0, 0.0, 0.0]]  # <PAD>
    ])
    

    转换为 torch 张量的形状为 [1, 1, 5, 3]

2. 通过卷积块提取特征

2.1 卷积块处理
  • 卷积核(示例):

    假设 Block 类中的卷积核和偏置如下:

    kernel = torch.tensor([
        [[0.1, 0.2, 0.1], [0.2, 0.1, 0.2]],  # 卷积核1
        [[0.3, 0.1, 0.3], [0.1, 0.3, 0.1]]   # 卷积核2
    ])
    
  • Block 类中的卷积操作(使用 kernel_s 为 2):

    c = self.cnn(batch_emb)
    # shape after cnn: [1, 2, 4, 1] -> [batch_size, hidden_num, max_len-kernel_size+1, 1]
    

    卷积操作的结果(示例)为:

    c = torch.tensor([
        [[[0.5], [0.6], [0.7], [0.8]],   # feature map 1
         [[0.4], [0.5], [0.6], [0.7]]]   # feature map 2
    ])
    

    对应于卷积核的特征图,形状为 [1, hidden_num, max_len-kernel_size+1, 1]

  • 激活函数和池化操作

    a = self.act(c)  # Apply ReLU activation
    m = self.mxp(a.squeeze(-1))  # Max pooling operation
    # shape after max pooling: [1, 2, 1] -> [batch_size, hidden_num, 1]
    

    am 的内容(示例)为:

    a = torch.tensor([
        [[0.5], [0.6], [0.7], [0.8]],  # ReLU activated feature map 1
        [[0.4], [0.5], [0.6], [0.7]]   # ReLU activated feature map 2
    ])
    
    m = torch.tensor([
        [[0.8]],  # Max pooled feature map 1
        [[0.7]]   # Max pooled feature map 2
    ])
    

    最终 m 的形状为 [1, hidden_num]

3. 合并特征并分类

3.1 多个卷积块的特征

假设 Block 类中的三个卷积块(kernel_s 为 2、3、4)的输出为:

b1_result = torch.tensor([[0.8, 0.7]])  # From Block with kernel size 2
b2_result = torch.tensor([[0.9, 0.8]])  # From Block with kernel size 3
b3_result = torch.tensor([[1.0, 0.9]])  # From Block with kernel size 4

3.2 拼接特征

feature = torch.cat([b1_result, b2_result, b3_result], dim=1)
# shape: [1, hidden_num * 3] -> [1, 6]

拼接后的特征向量为:

feature = torch.tensor([[0.8, 0.7, 0.9, 0.8, 1.0, 0.9]])

3.3 全连接层分类

假设 self.classifier 的权重和偏置如下:

classifier_weight = torch.tensor([
    [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],  # For class 1
    [0.6, 0.5, 0.4, 0.3, 0.2, 0.1]   # For class 2
])
classifier_bias = torch.tensor([0.1, -0.1])

计算分类预测结果:

pre = torch.matmul(feature, classifier_weight.t()) + classifier_bias
# shape: [1, class_num] -> [1, 2]

最终输出的预测结果(示例)为:

pre = torch.tensor([[1.5, 0.7]])  # Logits for class 1 and class 2

模型测试和参数解析

def test_data():
    args = parsers()
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    dataset = pkl.load(open(args.data_pkl, "rb"))
    word_2_index, words_embedding = dataset[0], dataset[1]
    test_text, test_label = read_data(args.test_file)
    test_dataset = TextDataset(test_text, test_label, word_2_index, args.max_len)
    test_dataloader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)
    model = TextCNNModel(words_embedding, args.max_len, args.class_num, args.num_filters).to(device)
    model.load_state_dict(torch.load(args.save_model_best))
    model.eval()
    all_pred, all_true = [], []
    with torch.no_grad():
        for batch_text, batch_label in test_dataloader:
            batch_text, batch_label = batch_text.to(device), batch_label.to(device)
            pred = model(batch_text)
            pred = torch.argmax(pred, dim=1)
            pred = pred.cpu().numpy().tolist()
            label = batch_label.cpu().numpy().tolist()
            all_pred.extend(pred)
            all_true.extend(label)
    accuracy = accuracy_score(all_true, all_pred)
    print(f"test dataset accuracy: {accuracy:.4f}")

def parsers():
    parser = argparse.ArgumentParser(description="TextCNN model of argparse")
    parser.add_argument("--train_file", type=str, default=os.path.join("data", "train.txt"))
    parser.add_argument("--dev_file", type=str, default=os.path.join("data", "dev.txt"))
    parser.add_argument("--test_file", type=str, default=os.path.join("data", "test.txt"))
    parser.add_argument("--classification", type=str, default=os.path.join("data", "class.txt"))
    parser.add_argument("--data_pkl", type=str, default=os.path.join("data", "dataset.pkl"))
    parser.add_argument("--class_num", type=int, default=10)
    parser.add_argument("--max_len", type=int, default=38)
    parser.add_argument("--embedding_num", type=int, default=100)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--epochs", type=int, default=30)
    parser.add_argument("--learn_rate", type=float, default=1e-3)
    parser.add_argument("--num_filters", type=int, default=2, help="卷积产生的通道数")
    parser.add_argument("--save_model_best", type=str, default=os.path.join("model", "best_model.pth"))
    parser.add_argument("--save_model_last", type=str, default=os.path.join("model", "last_model.pth"))
    args = parser.parse_args()

代码包含两个主要部分:test_data 函数和 parsers 函数。下面详细解释这两个部分的功能。

test_data 函数

test_data 函数用于测试训练好的文本分类模型,并计算测试数据集上的准确率。主要步骤如下:

  1. 获取参数

    args = parsers()
    

    调用 parsers 函数来获取命令行参数配置。

  2. 确定设备

    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    

    根据是否有可用的 GPU,选择计算设备(GPU 或 CPU)。

  3. 加载数据

    dataset = pkl.load(open(args.data_pkl, "rb"))
    word_2_index, words_embedding = dataset[0], dataset[1]
    test_text, test_label = read_data(args.test_file)
    

    args.data_pkl 文件中加载词汇到索引的映射和词嵌入矩阵。然后从 args.test_file 文件中读取测试文本和标签。

  4. 创建数据集和数据加载器

    test_dataset = TextDataset(test_text, test_label, word_2_index, args.max_len)
    test_dataloader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)
    

    使用 TextDataset 类创建测试数据集,并用 DataLoader 创建数据加载器,以便批量处理数据。

  5. 初始化模型

    model = TextCNNModel(words_embedding, args.max_len, args.class_num, args.num_filters).to(device)
    model.load_state_dict(torch.load(args.save_model_best))
    model.eval()
    

    初始化 TextCNNModel 模型,加载预训练的模型权重,并将模型设置为评估模式。

  6. 模型推理

    all_pred, all_true = [], []
    with torch.no_grad():
        for batch_text, batch_label in test_dataloader:
            batch_text, batch_label = batch_text.to(device), batch_label.to(device)
            pred = model(batch_text)
            pred = torch.argmax(pred, dim=1)
            pred = pred.cpu().numpy().tolist()
            label = batch_label.cpu().numpy().tolist()
            all_pred.extend(pred)
            all_true.extend(label)
    

    在不计算梯度的情况下进行模型推理,将预测结果和真实标签保存到 all_predall_true 列表中。

  7. 计算并打印准确率

    accuracy = accuracy_score(all_true, all_pred)
    print(f"test dataset accuracy: {accuracy:.4f}")
    

    计算测试集上的准确率,并输出结果。

parsers 函数

parsers 函数用于解析命令行参数,以配置训练和测试过程中的各种设置。主要功能包括:

  1. 创建解析器

    parser = argparse.ArgumentParser(description="TextCNN model of argparse")
    
  2. 定义参数

    • 文件路径

      parser.add_argument("--train_file", type=str, default=os.path.join("data", "train.txt"))
      parser.add_argument("--dev_file", type=str, default=os.path.join("data", "dev.txt"))
      parser.add_argument("--test_file", type=str, default=os.path.join("data", "test.txt"))
      parser.add_argument("--classification", type=str, default=os.path.join("data", "class.txt"))
      parser.add_argument("--data_pkl", type=str, default=os.path.join("data", "dataset.pkl"))
      

      这些参数用于指定训练、验证、测试数据文件的路径以及数据集和分类文件的路径。

    • 模型参数

      parser.add_argument("--class_num", type=int, default=10)
      parser.add_argument("--max_len", type=int, default=38)
      parser.add_argument("--embedding_num", type=int, default=100)
      parser.add_argument("--batch_size", type=int, default=32)
      parser.add_argument("--epochs", type=int, default=30)
      parser.add_argument("--learn_rate", type=float, default=1e-3)
      parser.add_argument("--num_filters", type=int, default=2, help="卷积产生的通道数")
      parser.add_argument("--save_model_best", type=str, default=os.path.join("model", "best_model.pth"))
      parser.add_argument("--save_model_last", type=str, default=os.path.join("model", "last_model.pth"))
      

      这些参数用于配置模型的类别数量、最大句子长度、嵌入维度、批次大小、训练周期数、学习率、卷积层输出通道数以及保存模型的路径。

  3. 解析参数并返回

    args = parser.parse_args()
    return args
    

    解析命令行参数并将其返回,以便在程序中使用。

训练

if __name__ == "__main__":
    start = time.time()
    args = parsers()
    train_text, train_label = read_data(args.train_file)
    dev_text, dev_label = read_data(args.dev_file)
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    if os.path.exists(args.data_pkl):
        dataset = pkl.load(open(args.data_pkl, "rb"))
        word_2_index, words_embedding = dataset[0], dataset[1]
    else:
        word_2_index, words_embedding = built_curpus(train_text, args.embedding_num)

    train_dataset = TextDataset(train_text, train_label, word_2_index, args.max_len)
    train_loader = DataLoader(train_dataset, args.batch_size, shuffle=True)
    dev_dataset = TextDataset(dev_text, dev_label, word_2_index, args.max_len)
    dev_loader = DataLoader(dev_dataset, args.batch_size, shuffle=False)

    model = TextCNNModel(words_embedding, args.max_len, args.class_num, args.num_filters).to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=args.learn_rate)
    loss_fn = nn.CrossEntropyLoss()
    acc_max = float("-inf")

    for epoch in range(args.epochs):
        model.train()
        loss_sum, count = 0, 0
        for batch_index, (batch_text, batch_label) in enumerate(train_loader):
            batch_text, batch_label = batch_text.to(device), batch_label.to(device)
            pred = model(batch_text)
            loss = loss_fn(pred, batch_label)
            opt.zero_grad()
            loss.backward()
            opt.step()
            loss_sum += loss
            count += 1
            # 打印内容
            if len(train_loader) - batch_index <= len(train_loader) % 1000 and count == len(train_loader) % 1000:
                msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
                print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
                loss_sum, count = 0.0, 0

            if batch_index % 1000 == 999:
                msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
                print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
                loss_sum, count = 0.0, 0

        model.eval()
        all_pred, all_true = [], []
        with torch.no_grad():
            for batch_text, batch_label in dev_loader:
                batch_text, batch_label = batch_text.to(device), batch_label.to(device)
                pred = model(batch_text)
                pred = torch.argmax(pred, dim=1)
                pred = pred.cpu().numpy().tolist()
                label = batch_label.cpu().numpy().tolist()
                all_pred.extend(pred)
                all_true.extend(label)
        acc = accuracy_score(all_pred, all_true)
        print(f"dev acc: {acc:.4f}")
        if acc > acc_max:
            acc_max = acc
            # 检查目录是否存在,如果不存在则创建
            if not os.path.exists(os.path.dirname(args.save_model_best)):
                os.makedirs(os.path.dirname(args.save_model_best))
            torch.save(model.state_dict(), args.save_model_best)
            print(f"Saved best model")

        print("*" * 50)

    end = time.time()
    print(f"Run time: {(end - start)/60:.4f} min")
    test_data()

该代码实现了一个基于 TextCNN 模型的文本分类任务。它负责从训练数据中学习,并通过在验证集上的性能(准确率)来保存最佳模型。代码使用了 PyTorch 框架来进行神经网络的训练和评估。以下是代码的详细解释:

1. 主程序入口

if __name__ == "__main__":

这部分是 Python 程序的入口,确保代码块仅在该脚本直接运行时执行,而不是在作为模块导入时执行。

2. 参数解析与数据准备

start = time.time()
args = parsers()
train_text, train_label = read_data(args.train_file)
dev_text, dev_label = read_data(args.dev_file)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
  • start = time.time():记录程序开始执行的时间。
  • args = parsers():调用 parsers() 函数解析命令行参数,获取所有需要的超参数。
  • train_text, train_labeldev_text, dev_label:分别读取训练集和验证集的文本数据及标签。
  • device = "cuda:0" if torch.cuda.is_available() else "cpu":检查是否有 GPU 可用,如果有则使用 GPU,否则使用 CPU。

3. 词汇与嵌入加载

if os.path.exists(args.data_pkl):
    dataset = pkl.load(open(args.data_pkl, "rb"))
    word_2_index, words_embedding = dataset[0], dataset[1]
else:
    word_2_index, words_embedding = built_curpus(train_text, args.embedding_num)
  • 检查是否存在词汇与嵌入文件 data_pkl。如果存在,则加载词汇表和嵌入矩阵;否则,调用 built_curpus() 函数构建词汇表和嵌入矩阵。

4. 数据加载器的创建

train_dataset = TextDataset(train_text, train_label, word_2_index, args.max_len)
train_loader = DataLoader(train_dataset, args.batch_size, shuffle=True)
dev_dataset = TextDataset(dev_text, dev_label, word_2_index, args.max_len)
dev_loader = DataLoader(dev_dataset, args.batch_size, shuffle=False)
  • 创建 TextDataset 对象,并分别用于训练集和验证集。
  • 使用 DataLoader 创建数据加载器,train_loader 负责打乱数据,dev_loader 则保持数据顺序。

5. 模型与优化器的初始化

model = TextCNNModel(words_embedding, args.max_len, args.class_num, args.num_filters).to(device)
opt = torch.optim.AdamW(model.parameters(), lr=args.learn_rate)
loss_fn = nn.CrossEntropyLoss()
acc_max = float("-inf")
  • 初始化 TextCNNModel 模型,并将其移动到指定设备(GPU 或 CPU)。
  • 使用 AdamW 作为优化器,学习率通过命令行参数设置。
  • 定义交叉熵损失函数 loss_fn
  • acc_max 用于存储在验证集上最高的准确率。

6. 模型训练与验证

for epoch in range(args.epochs):
    model.train()
    loss_sum, count = 0, 0
    for batch_index, (batch_text, batch_label) in enumerate(train_loader):
        batch_text, batch_label = batch_text.to(device), batch_label.to(device)
        pred = model(batch_text)
        loss = loss_fn(pred, batch_label)
        opt.zero_grad()
        loss.backward()
        opt.step()
        loss_sum += loss
        count += 1
        # 打印内容
        if len(train_loader) - batch_index <= len(train_loader) % 1000 and count == len(train_loader) % 1000:
            msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
            print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
            loss_sum, count = 0.0, 0

        if batch_index % 1000 == 999:
            msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
            print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
            loss_sum, count = 0.0, 0
  • 训练循环中,模型进入训练模式 (model.train()),逐批次进行前向传播、计算损失、反向传播和参数更新。
  • loss_sumcount 用于累加并计算平均训练损失,打印训练信息。
  • 每 1000 个批次打印一次当前的训练损失。

7. 验证模型与保存最佳模型

    model.eval()
    all_pred, all_true = [], []
    with torch.no_grad():
        for batch_text, batch_label in dev_loader:
            batch_text, batch_label = batch_text.to(device), batch_label.to(device)
            pred = model(batch_text)
            pred = torch.argmax(pred, dim=1)
            pred = pred.cpu().numpy().tolist()
            label = batch_label.cpu().numpy().tolist()
            all_pred.extend(pred)
            all_true.extend(label)
    acc = accuracy_score(all_pred, all_true)
    print(f"dev acc: {acc:.4f}")
    if acc > acc_max:
        acc_max = acc
        # 检查目录是否存在,如果不存在则创建
        if not os.path.exists(os.path.dirname(args.save_model_best)):
            os.makedirs(os.path.dirname(args.save_model_best))
        torch.save(model.state_dict(), args.save_model_best)
        print(f"Saved best model")
  • 模型进入验证模式 (model.eval()),通过关闭梯度计算 (torch.no_grad()),在验证集上进行前向传播。
  • 计算当前模型在验证集上的准确率,并与之前的最佳准确率 acc_max 进行比较。
  • 如果当前准确率更高,则更新 acc_max 并保存当前模型为“最佳模型”。

8. 训练结束与测试

end = time.time()
print(f"Run time: {(end - start)/60:.4f} min")
test_data()
  • 记录程序结束时间,并计算和打印整个训练过程的运行时间。
  • 调用 test_data() 函数在测试集上评估模型的最终性能。

9.升级版本

import matplotlib.pyplot as plt

if __name__ == "__main__":
    start = time.time()
    args = parsers()
    train_text, train_label = read_data(args.train_file)
    dev_text, dev_label = read_data(args.dev_file)
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    if os.path.exists(args.data_pkl):
        dataset = pkl.load(open(args.data_pkl, "rb"))
        word_2_index, words_embedding = dataset[0], dataset[1]
    else:
        word_2_index, words_embedding = built_curpus(train_text, args.embedding_num)

    train_dataset = TextDataset(train_text, train_label, word_2_index, args.max_len)
    train_loader = DataLoader(train_dataset, args.batch_size, shuffle=True)
    dev_dataset = TextDataset(dev_text, dev_label, word_2_index, args.max_len)
    dev_loader = DataLoader(dev_dataset, args.batch_size, shuffle=False)

    model = TextCNNModel(words_embedding, args.max_len, args.class_num, args.num_filters).to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=args.learn_rate)
    loss_fn = nn.CrossEntropyLoss()
    acc_max = float("-inf")

    # 用于记录损失和准确率
    train_losses, dev_losses, dev_accuracies = [], [], []

    for epoch in range(args.epochs):
        model.train()
        loss_sum, count = 0, 0
        for batch_index, (batch_text, batch_label) in enumerate(train_loader):
            batch_text, batch_label = batch_text.to(device), batch_label.to(device)
            pred = model(batch_text)
            loss = loss_fn(pred, batch_label)
            opt.zero_grad()
            loss.backward()
            opt.step()
            loss_sum += loss.item()
            count += 1
            # 打印内容
            if len(train_loader) - batch_index <= len(train_loader) % 1000 and count == len(train_loader) % 1000:
                msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
                print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
                loss_sum, count = 0.0, 0

            if batch_index % 1000 == 999:
                msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
                print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
                loss_sum, count = 0.0, 0

        train_losses.append(loss_sum / count)

        model.eval()
        dev_loss_sum, count = 0, 0
        all_pred, all_true = [], []
        with torch.no_grad():
            for batch_text, batch_label in dev_loader:
                batch_text, batch_label = batch_text.to(device), batch_label.to(device)
                pred = model(batch_text)
                dev_loss_sum += loss_fn(pred, batch_label).item()
                pred = torch.argmax(pred, dim=1)
                pred = pred.cpu().numpy().tolist()
                label = batch_label.cpu().numpy().tolist()
                all_pred.extend(pred)
                all_true.extend(label)
                count += 1

        dev_losses.append(dev_loss_sum / count)
        acc = accuracy_score(all_true, all_pred)
        dev_accuracies.append(acc)
        print(f"dev acc: {acc:.4f}")
        if acc > acc_max:
            acc_max = acc
            # 检查目录是否存在,如果不存在则创建
            if not os.path.exists(os.path.dirname(args.save_model_best)):
                os.makedirs(os.path.dirname(args.save_model_best))
            torch.save(model.state_dict(), args.save_model_best)
            print(f"Saved best model")

        print("*" * 50)

    end = time.time()
    print(f"Run time: {(end - start)/60:.4f} min")
    test_data()

    # 保存损失和准确率图像
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label="Train Loss")
    plt.plot(dev_losses, label="Validation Loss")
    plt.title("Loss over Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(dev_accuracies, label="Validation Accuracy")
    plt.title("Accuracy over Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()

    plt.tight_layout()
    plt.savefig("training_metrics.png")
    plt.show()