TextCNN
文本数据预处理
1. read_data
函数
def read_data(file):
with open(file, encoding="utf-8") as f:
all_data = f.read().split("\n")
texts, labels = [], []
for data in all_data:
if data:
text, label = data.split("\t")
texts.append(text)
labels.append(label)
return texts, labels
- 功能:读取一个文件,并将文件中的每一行按制表符(
\t
)分割成文本和标签,分别存储在texts
和labels
列表中。 - 参数:
file
:文件路径。
- 返回值:
texts
:包含所有文本的列表。labels
:包含所有标签的列表。
2. built_curpus
函数
def built_curpus(train_texts, embedding_num):
word_2_index = {"<PAD>": 0, "<UNK>": 1}
for text in train_texts:
for word in text:
word_2_index[word] = word_2_index.get(word, len(word_2_index))
embedding = nn.Embedding(len(word_2_index), embedding_num)
pkl.dump([word_2_index, embedding], open(parsers().data_pkl, "wb"))
return word_2_index, embedding
代码定义了一个名为 built_curpus
的函数,该函数主要用于构建词汇表和词嵌入矩阵,并将它们保存到一个文件中。具体来说,这段代码完成了以下几项任务:
词汇表的初始化:
word_2_index = {"<PAD>": 0, "<UNK>": 1}
:这个字典用来将单词映射到对应的索引。"<PAD>"
和"<UNK>"
是两个特殊的标记,分别用于填充和未知单词,分别对应索引0
和1
。
构建词汇表:
for text in train_texts:
:遍历所有的训练文本,每个文本是一个包含多个单词的列表。for word in text:
:进一步遍历每个文本中的单词。word_2_index[word] = word_2_index.get(word, len(word_2_index))
:将每个单词添加到word_2_index
字典中。如果该单词已经存在于字典中,则保持其原有的索引;如果不存在,则将其添加到字典中,并分配一个新的索引(即当前字典的长度)。
词嵌入矩阵的创建:
embedding = nn.Embedding(len(word_2_index), embedding_num)
:这里使用 PyTorch 的nn.Embedding
创建了一个词嵌入矩阵。len(word_2_index)
表示词汇表的大小,而embedding_num
则表示每个单词的嵌入维度。nn.Embedding
会根据词汇表的大小和嵌入维度创建一个形状为[len(word_2_index), embedding_num]
的矩阵,每个单词会对应一行向量。
保存词汇表和嵌入矩阵:
pkl.dump([word_2_index, embedding], open(parsers().data_pkl, "wb"))
:将词汇表和嵌入矩阵保存到一个.pkl
文件中。这里使用了pickle
模块的dump
方法将数据序列化,并写入到指定的文件中。parsers().data_pkl
返回了文件路径(假设parsers()
是一个定义了data_pkl
属性的类或者函数)。
返回结果:
return word_2_index, embedding
:函数最终返回了构建的词汇表word_2_index
和词嵌入矩阵embedding
。
3. TextDataset
类
class TextDataset(Dataset):
def __init__(self, all_text, all_label, word_2_index, max_len):
self.all_text = all_text
self.all_label = all_label
self.word_2_index = word_2_index
self.max_len = max_len
def __getitem__(self, index):
text = self.all_text[index][:self.max_len]
label = int(self.all_label[index])
text_idx = [self.word_2_index.get(i, 1) for i in text]
text_idx = text_idx + [0] * (self.max_len - len(text_idx))
text_idx = torch.tensor(text_idx).unsqueeze(dim=0)
return text_idx, label
def __len__(self):
return len(self.all_text)
代码定义了一个名为 TextDataset
的类,继承自 torch.utils.data.Dataset
,主要用于处理文本数据以便在深度学习模型中使用。它实现了 PyTorch 的 Dataset
接口,包括 __init__
、__getitem__
和 __len__
三个核心方法。这个类的主要功能是将文本数据转换为适合模型输入的格式,包括将文本转化为索引序列,并将数据组织为数据集以供模型训练和评估。
1. __init__
方法
def __init__(self, all_text, all_label, word_2_index, max_len):
self.all_text = all_text
self.all_label = all_label
self.word_2_index = word_2_index
self.max_len = max_len
- 功能:初始化
TextDataset
对象的实例变量。self.all_text
:存储所有的文本数据,通常是一个包含文本序列的列表。self.all_label
:存储所有的标签数据,通常是一个包含对应标签的列表。self.word_2_index
:存储词汇表,即单词到索引的映射字典。self.max_len
:指定文本的最大长度。超出此长度的文本将被截断,短于此长度的文本将被填充。
2. __getitem__
方法
def __getitem__(self, index):
text = self.all_text[index][:self.max_len]
label = int(self.all_label[index])
text_idx = [self.word_2_index.get(i, 1) for i in text]
text_idx = text_idx + [0] * (self.max_len - len(text_idx))
text_idx = torch.tensor(text_idx).unsqueeze(dim=0)
return text_idx, label
- 功能:获取数据集中指定索引
index
处的文本和标签,并将文本转换为索引序列。text = self.all_text[index][:self.max_len]
:从self.all_text
中提取指定索引的文本,并截取到max_len
的长度。label = int(self.all_label[index])
:从self.all_label
中提取指定索引的标签,并转换为整数类型。text_idx = [self.word_2_index.get(i, 1) for i in text]
:将文本中的每个单词转换为对应的索引。如果单词不在词汇表中,则返回索引1
(通常对应于<UNK>
)。text_idx = text_idx + [0] * (self.max_len - len(text_idx))
:将索引序列填充到max_len
的长度,使用索引0
(通常对应于<PAD>
)。text_idx = torch.tensor(text_idx).unsqueeze(dim=0)
:将索引序列转换为 PyTorch 的tensor
,并在第一个维度上增加一个维度,使其适合模型的输入形状。return text_idx, label
:返回处理后的文本索引序列和对应的标签。
3. __len__
方法
def __len__(self):
return len(self.all_text)
- 功能:返回数据集的大小,即文本序列的数量。通常用于告诉 PyTorch 数据加载器(
DataLoader
)数据集的总长度。
4. 举例
在 TextDataset
类中,text_idx
的形状是一个 PyTorch 张量 (tensor
),其维度是 (1, max_len)
,其中 max_len
是在初始化时指定的最大句子长度。
假设我们有一个句子 ["hello", "world"]
,词汇表 word_2_index
为 {"<PAD>": 0, "<UNK>": 1, "hello": 2, "world": 3}
,max_len
为 5。以下是如何生成对应的 text_idx
:
- 原始句子:
["hello", "world"]
- 转换为索引:
[2, 3]
("hello"
对应索引 2,"world"
对应索引 3) - 填充到
max_len
:[2, 3, 0, 0, 0]
(填充索引 0,表示<PAD>
)
最后,text_idx
经过 torch.tensor(text_idx).unsqueeze(dim=0)
转换为 PyTorch 张量,并增加了一个维度:
text_idx = torch.tensor([2, 3, 0, 0, 0]).unsqueeze(dim=0)
- 最终
text_idx
的形状:(1, 5)
具体的值如下:
tensor([[2, 3, 0, 0, 0]])
这个张量的形状 (1, 5)
表示它有一个批次(即第一个维度为 1),每个输入序列的长度是 max_len
(这里为 5)。在实际使用中,这种形状适合直接输入到神经网络模型中进行处理。
前向传播网络
class Block(nn.Module):
def __init__(self, kernel_s, embeddin_num, max_len, hidden_num):
super().__init__()
# shape [batch * in_channel * max_len * emb_num]
self.cnn = nn.Conv2d(in_channels=1, out_channels=hidden_num, kernel_size=(kernel_s, embeddin_num))
self.act = nn.ReLU()
self.mxp = nn.MaxPool1d(kernel_size=(max_len - kernel_s + 1))
def forward(self, batch_emb):
c = self.cnn(batch_emb)
a = self.act(c)
a = a.squeeze(dim=-1)
m = self.mxp(a)
m = m.squeeze(dim=-1)
return m
class TextCNNModel(nn.Module):
def __init__(self, emb_matrix, max_len, class_num, hidden_num):
super().__init__()
self.emb_num = emb_matrix.weight.shape[1]
self.block1 = Block(2, self.emb_num, max_len, hidden_num)
self.block2 = Block(3, self.emb_num, max_len, hidden_num)
self.block3 = Block(4, self.emb_num, max_len, hidden_num)
self.emb_matrix = emb_matrix
self.classifier = nn.Linear(hidden_num * 3, class_num)
self.loss_fun = nn.CrossEntropyLoss()
def forward(self, batch_idx):
batch_emb = self.emb_matrix(batch_idx)
b1_result = self.block1(batch_emb)
b2_result = self.block2(batch_emb)
b3_result = self.block3(batch_emb)
feature = torch.cat([b1_result, b2_result, b3_result], dim=1)
pre = self.classifier(feature)
return pre
这个代码定义了一个用于文本分类任务的卷积神经网络模型 (TextCNNModel
),它包含多个卷积块 (Block
),每个卷积块使用不同的卷积核大小,以捕获文本中不同范围的特征。以下是对代码的详细解释:
1. Block
类
Block
类定义了一个卷积块,负责处理输入的嵌入向量并提取特征。
class Block(nn.Module):
def __init__(self, kernel_s, embeddin_num, max_len, hidden_num):
super().__init__()
# shape [batch * in_channel * max_len * emb_num]
self.cnn = nn.Conv2d(in_channels=1, out_channels=hidden_num, kernel_size=(kernel_s, embeddin_num))
self.act = nn.ReLU()
self.mxp = nn.MaxPool1d(kernel_size=(max_len - kernel_s + 1))
__init__
方法:kernel_s
:卷积核的大小(高度)。embeddin_num
:嵌入向量的维度(宽度)。max_len
:句子的最大长度。hidden_num
:卷积层输出的通道数(即特征图数量)。self.cnn
:二维卷积层,处理输入形状为[batch_size, 1, max_len, embeddin_num]
的张量。self.act
:激活函数 ReLU,应用于卷积层的输出。self.mxp
:一维最大池化层,缩小特征图的维度。
def forward(self, batch_emb):
c = self.cnn(batch_emb)
a = self.act(c)
a = a.squeeze(dim=-1)
m = self.mxp(a)
m = m.squeeze(dim=-1)
return m
forward
方法:batch_emb
:输入的嵌入矩阵,形状为[batch_size, 1, max_len, embeddin_num]
。c = self.cnn(batch_emb)
:对嵌入矩阵进行卷积操作,输出形状为[batch_size, hidden_num, max_len - kernel_s + 1, 1]
。a = self.act(c)
:应用激活函数 ReLU。a = a.squeeze(dim=-1)
:去掉最后一个维度,形状变为[batch_size, hidden_num, max_len - kernel_s + 1]
。m = self.mxp(a)
:最大池化操作,输出形状为[batch_size, hidden_num, 1]
。m = m.squeeze(dim=-1)
:去掉最后一个维度,最终形状为[batch_size, hidden_num]
。
2. TextCNNModel
类
TextCNNModel
是一个多通道的文本卷积神经网络模型,使用了多个不同大小的卷积核来提取文本特征。
class TextCNNModel(nn.Module):
def __init__(self, emb_matrix, max_len, class_num, hidden_num):
super().__init__()
self.emb_num = emb_matrix.weight.shape[1]
self.block1 = Block(2, self.emb_num, max_len, hidden_num)
self.block2 = Block(3, self.emb_num, max_len, hidden_num)
self.block3 = Block(4, self.emb_num, max_len, hidden_num)
self.emb_matrix = emb_matrix
self.classifier = nn.Linear(hidden_num * 3, class_num)
self.loss_fun = nn.CrossEntropyLoss()
__init__
方法:emb_matrix
:预训练的嵌入矩阵。max_len
:句子的最大长度。class_num
:分类任务的类别数量。hidden_num
:每个卷积块输出的特征图数量。self.block1
,self.block2
,self.block3
:三个卷积块,每个使用不同的卷积核大小(2、3、4),以捕获不同的 n-gram 特征。self.classifier
:全连接层,将拼接后的特征向量映射到类别标签。self.loss_fun
:交叉熵损失函数,用于计算分类任务的损失。
def forward(self, batch_idx):
batch_emb = self.emb_matrix(batch_idx)
b1_result = self.block1(batch_emb)
b2_result = self.block2(batch_emb)
b3_result = self.block3(batch_emb)
feature = torch.cat([b1_result, b2_result, b3_result], dim=1)
pre = self.classifier(feature)
return pre
forward
方法:batch_idx
:输入的文本索引序列。batch_emb = self.emb_matrix(batch_idx)
:将文本索引转换为嵌入矩阵。b1_result = self.block1(batch_emb)
:通过第一个卷积块提取特征。b2_result = self.block2(batch_emb)
:通过第二个卷积块提取特征。b3_result = self.block3(batch_emb)
:通过第三个卷积块提取特征。feature = torch.cat([b1_result, b2_result, b3_result], dim=1)
:将三个卷积块的输出特征拼接在一起,形成一个综合的特征向量。pre = self.classifier(feature)
:通过全连接层进行分类预测。return pre
:返回分类结果。
为了更直观地理解这个 TextCNNModel
的工作流程,我们可以举一个简单的例子,演示从输入到输出的整个过程。
3. 例子概述
1. 输入处理
假设我们有以下输入和模型参数:
- 输入句子:
"I love NLP"
- 词汇表
word_2_index
:{"<PAD>": 0, "<UNK>": 1, "I": 2, "love": 3, "NLP": 4}
- 句子最大长度
max_len
:5 - 嵌入维度
embedding_dim
:3 - 类别数量
class_num
:2 - 每个卷积块的输出通道数
hidden_num
:2 - 卷积核大小:2、3、4
1.1 索引化和填充
原始句子索引化:
- 句子
"I love NLP"
对应的索引为[2, 3, 4]
- 由于
max_len
为 5,我们将句子填充到[2, 3, 4, 0, 0]
- 句子
嵌入矩阵:
假设嵌入矩阵如下(形状为[5, 3]
,即5个词汇,每个词汇有3维的嵌入向量):emb_matrix = torch.tensor([ [0.0, 0.0, 0.0], # <PAD> [0.1, 0.1, 0.1], # <UNK> [0.2, 0.2, 0.2], # I [0.3, 0.3, 0.3], # love [0.4, 0.4, 0.4] # NLP ])
嵌入后的结果:
batch_idx = torch.tensor([[2, 3, 4, 0, 0]]) batch_emb = emb_matrix[batch_idx] # shape: [1, 5, 3]
batch_emb
的内容为:batch_emb = torch.tensor([ [[0.2, 0.2, 0.2], # I [0.3, 0.3, 0.3], # love [0.4, 0.4, 0.4], # NLP [0.0, 0.0, 0.0], # <PAD> [0.0, 0.0, 0.0]] # <PAD> ])
转换为
torch
张量的形状为[1, 1, 5, 3]
。
2. 通过卷积块提取特征
2.1 卷积块处理
卷积核(示例):
假设
Block
类中的卷积核和偏置如下:kernel = torch.tensor([ [[0.1, 0.2, 0.1], [0.2, 0.1, 0.2]], # 卷积核1 [[0.3, 0.1, 0.3], [0.1, 0.3, 0.1]] # 卷积核2 ])
Block
类中的卷积操作(使用kernel_s
为 2):c = self.cnn(batch_emb) # shape after cnn: [1, 2, 4, 1] -> [batch_size, hidden_num, max_len-kernel_size+1, 1]
卷积操作的结果(示例)为:
c = torch.tensor([ [[[0.5], [0.6], [0.7], [0.8]], # feature map 1 [[0.4], [0.5], [0.6], [0.7]]] # feature map 2 ])
对应于卷积核的特征图,形状为
[1, hidden_num, max_len-kernel_size+1, 1]
。激活函数和池化操作:
a = self.act(c) # Apply ReLU activation m = self.mxp(a.squeeze(-1)) # Max pooling operation # shape after max pooling: [1, 2, 1] -> [batch_size, hidden_num, 1]
a
和m
的内容(示例)为:a = torch.tensor([ [[0.5], [0.6], [0.7], [0.8]], # ReLU activated feature map 1 [[0.4], [0.5], [0.6], [0.7]] # ReLU activated feature map 2 ]) m = torch.tensor([ [[0.8]], # Max pooled feature map 1 [[0.7]] # Max pooled feature map 2 ])
最终
m
的形状为[1, hidden_num]
。
3. 合并特征并分类
3.1 多个卷积块的特征
假设 Block
类中的三个卷积块(kernel_s
为 2、3、4)的输出为:
b1_result = torch.tensor([[0.8, 0.7]]) # From Block with kernel size 2
b2_result = torch.tensor([[0.9, 0.8]]) # From Block with kernel size 3
b3_result = torch.tensor([[1.0, 0.9]]) # From Block with kernel size 4
3.2 拼接特征
feature = torch.cat([b1_result, b2_result, b3_result], dim=1)
# shape: [1, hidden_num * 3] -> [1, 6]
拼接后的特征向量为:
feature = torch.tensor([[0.8, 0.7, 0.9, 0.8, 1.0, 0.9]])
3.3 全连接层分类
假设 self.classifier
的权重和偏置如下:
classifier_weight = torch.tensor([
[0.1, 0.2, 0.3, 0.4, 0.5, 0.6], # For class 1
[0.6, 0.5, 0.4, 0.3, 0.2, 0.1] # For class 2
])
classifier_bias = torch.tensor([0.1, -0.1])
计算分类预测结果:
pre = torch.matmul(feature, classifier_weight.t()) + classifier_bias
# shape: [1, class_num] -> [1, 2]
最终输出的预测结果(示例)为:
pre = torch.tensor([[1.5, 0.7]]) # Logits for class 1 and class 2
模型测试和参数解析
def test_data():
args = parsers()
device = "cuda:0" if torch.cuda.is_available() else "cpu"
dataset = pkl.load(open(args.data_pkl, "rb"))
word_2_index, words_embedding = dataset[0], dataset[1]
test_text, test_label = read_data(args.test_file)
test_dataset = TextDataset(test_text, test_label, word_2_index, args.max_len)
test_dataloader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)
model = TextCNNModel(words_embedding, args.max_len, args.class_num, args.num_filters).to(device)
model.load_state_dict(torch.load(args.save_model_best))
model.eval()
all_pred, all_true = [], []
with torch.no_grad():
for batch_text, batch_label in test_dataloader:
batch_text, batch_label = batch_text.to(device), batch_label.to(device)
pred = model(batch_text)
pred = torch.argmax(pred, dim=1)
pred = pred.cpu().numpy().tolist()
label = batch_label.cpu().numpy().tolist()
all_pred.extend(pred)
all_true.extend(label)
accuracy = accuracy_score(all_true, all_pred)
print(f"test dataset accuracy: {accuracy:.4f}")
def parsers():
parser = argparse.ArgumentParser(description="TextCNN model of argparse")
parser.add_argument("--train_file", type=str, default=os.path.join("data", "train.txt"))
parser.add_argument("--dev_file", type=str, default=os.path.join("data", "dev.txt"))
parser.add_argument("--test_file", type=str, default=os.path.join("data", "test.txt"))
parser.add_argument("--classification", type=str, default=os.path.join("data", "class.txt"))
parser.add_argument("--data_pkl", type=str, default=os.path.join("data", "dataset.pkl"))
parser.add_argument("--class_num", type=int, default=10)
parser.add_argument("--max_len", type=int, default=38)
parser.add_argument("--embedding_num", type=int, default=100)
parser.add_argument("--batch_size", type=int, default=32)
parser.add_argument("--epochs", type=int, default=30)
parser.add_argument("--learn_rate", type=float, default=1e-3)
parser.add_argument("--num_filters", type=int, default=2, help="卷积产生的通道数")
parser.add_argument("--save_model_best", type=str, default=os.path.join("model", "best_model.pth"))
parser.add_argument("--save_model_last", type=str, default=os.path.join("model", "last_model.pth"))
args = parser.parse_args()
代码包含两个主要部分:test_data
函数和 parsers
函数。下面详细解释这两个部分的功能。
test_data
函数
test_data
函数用于测试训练好的文本分类模型,并计算测试数据集上的准确率。主要步骤如下:
获取参数:
args = parsers()
调用
parsers
函数来获取命令行参数配置。确定设备:
device = "cuda:0" if torch.cuda.is_available() else "cpu"
根据是否有可用的 GPU,选择计算设备(GPU 或 CPU)。
加载数据:
dataset = pkl.load(open(args.data_pkl, "rb")) word_2_index, words_embedding = dataset[0], dataset[1] test_text, test_label = read_data(args.test_file)
从
args.data_pkl
文件中加载词汇到索引的映射和词嵌入矩阵。然后从args.test_file
文件中读取测试文本和标签。创建数据集和数据加载器:
test_dataset = TextDataset(test_text, test_label, word_2_index, args.max_len) test_dataloader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)
使用
TextDataset
类创建测试数据集,并用DataLoader
创建数据加载器,以便批量处理数据。初始化模型:
model = TextCNNModel(words_embedding, args.max_len, args.class_num, args.num_filters).to(device) model.load_state_dict(torch.load(args.save_model_best)) model.eval()
初始化
TextCNNModel
模型,加载预训练的模型权重,并将模型设置为评估模式。模型推理:
all_pred, all_true = [], [] with torch.no_grad(): for batch_text, batch_label in test_dataloader: batch_text, batch_label = batch_text.to(device), batch_label.to(device) pred = model(batch_text) pred = torch.argmax(pred, dim=1) pred = pred.cpu().numpy().tolist() label = batch_label.cpu().numpy().tolist() all_pred.extend(pred) all_true.extend(label)
在不计算梯度的情况下进行模型推理,将预测结果和真实标签保存到
all_pred
和all_true
列表中。计算并打印准确率:
accuracy = accuracy_score(all_true, all_pred) print(f"test dataset accuracy: {accuracy:.4f}")
计算测试集上的准确率,并输出结果。
parsers
函数
parsers
函数用于解析命令行参数,以配置训练和测试过程中的各种设置。主要功能包括:
创建解析器:
parser = argparse.ArgumentParser(description="TextCNN model of argparse")
定义参数:
文件路径:
parser.add_argument("--train_file", type=str, default=os.path.join("data", "train.txt")) parser.add_argument("--dev_file", type=str, default=os.path.join("data", "dev.txt")) parser.add_argument("--test_file", type=str, default=os.path.join("data", "test.txt")) parser.add_argument("--classification", type=str, default=os.path.join("data", "class.txt")) parser.add_argument("--data_pkl", type=str, default=os.path.join("data", "dataset.pkl"))
这些参数用于指定训练、验证、测试数据文件的路径以及数据集和分类文件的路径。
模型参数:
parser.add_argument("--class_num", type=int, default=10) parser.add_argument("--max_len", type=int, default=38) parser.add_argument("--embedding_num", type=int, default=100) parser.add_argument("--batch_size", type=int, default=32) parser.add_argument("--epochs", type=int, default=30) parser.add_argument("--learn_rate", type=float, default=1e-3) parser.add_argument("--num_filters", type=int, default=2, help="卷积产生的通道数") parser.add_argument("--save_model_best", type=str, default=os.path.join("model", "best_model.pth")) parser.add_argument("--save_model_last", type=str, default=os.path.join("model", "last_model.pth"))
这些参数用于配置模型的类别数量、最大句子长度、嵌入维度、批次大小、训练周期数、学习率、卷积层输出通道数以及保存模型的路径。
解析参数并返回:
args = parser.parse_args() return args
解析命令行参数并将其返回,以便在程序中使用。
训练
if __name__ == "__main__":
start = time.time()
args = parsers()
train_text, train_label = read_data(args.train_file)
dev_text, dev_label = read_data(args.dev_file)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
if os.path.exists(args.data_pkl):
dataset = pkl.load(open(args.data_pkl, "rb"))
word_2_index, words_embedding = dataset[0], dataset[1]
else:
word_2_index, words_embedding = built_curpus(train_text, args.embedding_num)
train_dataset = TextDataset(train_text, train_label, word_2_index, args.max_len)
train_loader = DataLoader(train_dataset, args.batch_size, shuffle=True)
dev_dataset = TextDataset(dev_text, dev_label, word_2_index, args.max_len)
dev_loader = DataLoader(dev_dataset, args.batch_size, shuffle=False)
model = TextCNNModel(words_embedding, args.max_len, args.class_num, args.num_filters).to(device)
opt = torch.optim.AdamW(model.parameters(), lr=args.learn_rate)
loss_fn = nn.CrossEntropyLoss()
acc_max = float("-inf")
for epoch in range(args.epochs):
model.train()
loss_sum, count = 0, 0
for batch_index, (batch_text, batch_label) in enumerate(train_loader):
batch_text, batch_label = batch_text.to(device), batch_label.to(device)
pred = model(batch_text)
loss = loss_fn(pred, batch_label)
opt.zero_grad()
loss.backward()
opt.step()
loss_sum += loss
count += 1
# 打印内容
if len(train_loader) - batch_index <= len(train_loader) % 1000 and count == len(train_loader) % 1000:
msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
loss_sum, count = 0.0, 0
if batch_index % 1000 == 999:
msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
loss_sum, count = 0.0, 0
model.eval()
all_pred, all_true = [], []
with torch.no_grad():
for batch_text, batch_label in dev_loader:
batch_text, batch_label = batch_text.to(device), batch_label.to(device)
pred = model(batch_text)
pred = torch.argmax(pred, dim=1)
pred = pred.cpu().numpy().tolist()
label = batch_label.cpu().numpy().tolist()
all_pred.extend(pred)
all_true.extend(label)
acc = accuracy_score(all_pred, all_true)
print(f"dev acc: {acc:.4f}")
if acc > acc_max:
acc_max = acc
# 检查目录是否存在,如果不存在则创建
if not os.path.exists(os.path.dirname(args.save_model_best)):
os.makedirs(os.path.dirname(args.save_model_best))
torch.save(model.state_dict(), args.save_model_best)
print(f"Saved best model")
print("*" * 50)
end = time.time()
print(f"Run time: {(end - start)/60:.4f} min")
test_data()
该代码实现了一个基于 TextCNN
模型的文本分类任务。它负责从训练数据中学习,并通过在验证集上的性能(准确率)来保存最佳模型。代码使用了 PyTorch 框架来进行神经网络的训练和评估。以下是代码的详细解释:
1. 主程序入口
if __name__ == "__main__":
这部分是 Python 程序的入口,确保代码块仅在该脚本直接运行时执行,而不是在作为模块导入时执行。
2. 参数解析与数据准备
start = time.time()
args = parsers()
train_text, train_label = read_data(args.train_file)
dev_text, dev_label = read_data(args.dev_file)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
start = time.time()
:记录程序开始执行的时间。args = parsers()
:调用parsers()
函数解析命令行参数,获取所有需要的超参数。train_text, train_label
和dev_text, dev_label
:分别读取训练集和验证集的文本数据及标签。device = "cuda:0" if torch.cuda.is_available() else "cpu"
:检查是否有 GPU 可用,如果有则使用 GPU,否则使用 CPU。
3. 词汇与嵌入加载
if os.path.exists(args.data_pkl):
dataset = pkl.load(open(args.data_pkl, "rb"))
word_2_index, words_embedding = dataset[0], dataset[1]
else:
word_2_index, words_embedding = built_curpus(train_text, args.embedding_num)
- 检查是否存在词汇与嵌入文件
data_pkl
。如果存在,则加载词汇表和嵌入矩阵;否则,调用built_curpus()
函数构建词汇表和嵌入矩阵。
4. 数据加载器的创建
train_dataset = TextDataset(train_text, train_label, word_2_index, args.max_len)
train_loader = DataLoader(train_dataset, args.batch_size, shuffle=True)
dev_dataset = TextDataset(dev_text, dev_label, word_2_index, args.max_len)
dev_loader = DataLoader(dev_dataset, args.batch_size, shuffle=False)
- 创建
TextDataset
对象,并分别用于训练集和验证集。 - 使用
DataLoader
创建数据加载器,train_loader
负责打乱数据,dev_loader
则保持数据顺序。
5. 模型与优化器的初始化
model = TextCNNModel(words_embedding, args.max_len, args.class_num, args.num_filters).to(device)
opt = torch.optim.AdamW(model.parameters(), lr=args.learn_rate)
loss_fn = nn.CrossEntropyLoss()
acc_max = float("-inf")
- 初始化
TextCNNModel
模型,并将其移动到指定设备(GPU 或 CPU)。 - 使用
AdamW
作为优化器,学习率通过命令行参数设置。 - 定义交叉熵损失函数
loss_fn
。 acc_max
用于存储在验证集上最高的准确率。
6. 模型训练与验证
for epoch in range(args.epochs):
model.train()
loss_sum, count = 0, 0
for batch_index, (batch_text, batch_label) in enumerate(train_loader):
batch_text, batch_label = batch_text.to(device), batch_label.to(device)
pred = model(batch_text)
loss = loss_fn(pred, batch_label)
opt.zero_grad()
loss.backward()
opt.step()
loss_sum += loss
count += 1
# 打印内容
if len(train_loader) - batch_index <= len(train_loader) % 1000 and count == len(train_loader) % 1000:
msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
loss_sum, count = 0.0, 0
if batch_index % 1000 == 999:
msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
loss_sum, count = 0.0, 0
- 训练循环中,模型进入训练模式 (
model.train()
),逐批次进行前向传播、计算损失、反向传播和参数更新。 loss_sum
和count
用于累加并计算平均训练损失,打印训练信息。- 每 1000 个批次打印一次当前的训练损失。
7. 验证模型与保存最佳模型
model.eval()
all_pred, all_true = [], []
with torch.no_grad():
for batch_text, batch_label in dev_loader:
batch_text, batch_label = batch_text.to(device), batch_label.to(device)
pred = model(batch_text)
pred = torch.argmax(pred, dim=1)
pred = pred.cpu().numpy().tolist()
label = batch_label.cpu().numpy().tolist()
all_pred.extend(pred)
all_true.extend(label)
acc = accuracy_score(all_pred, all_true)
print(f"dev acc: {acc:.4f}")
if acc > acc_max:
acc_max = acc
# 检查目录是否存在,如果不存在则创建
if not os.path.exists(os.path.dirname(args.save_model_best)):
os.makedirs(os.path.dirname(args.save_model_best))
torch.save(model.state_dict(), args.save_model_best)
print(f"Saved best model")
- 模型进入验证模式 (
model.eval()
),通过关闭梯度计算 (torch.no_grad()
),在验证集上进行前向传播。 - 计算当前模型在验证集上的准确率,并与之前的最佳准确率
acc_max
进行比较。 - 如果当前准确率更高,则更新
acc_max
并保存当前模型为“最佳模型”。
8. 训练结束与测试
end = time.time()
print(f"Run time: {(end - start)/60:.4f} min")
test_data()
- 记录程序结束时间,并计算和打印整个训练过程的运行时间。
- 调用
test_data()
函数在测试集上评估模型的最终性能。
9.升级版本
import matplotlib.pyplot as plt
if __name__ == "__main__":
start = time.time()
args = parsers()
train_text, train_label = read_data(args.train_file)
dev_text, dev_label = read_data(args.dev_file)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
if os.path.exists(args.data_pkl):
dataset = pkl.load(open(args.data_pkl, "rb"))
word_2_index, words_embedding = dataset[0], dataset[1]
else:
word_2_index, words_embedding = built_curpus(train_text, args.embedding_num)
train_dataset = TextDataset(train_text, train_label, word_2_index, args.max_len)
train_loader = DataLoader(train_dataset, args.batch_size, shuffle=True)
dev_dataset = TextDataset(dev_text, dev_label, word_2_index, args.max_len)
dev_loader = DataLoader(dev_dataset, args.batch_size, shuffle=False)
model = TextCNNModel(words_embedding, args.max_len, args.class_num, args.num_filters).to(device)
opt = torch.optim.AdamW(model.parameters(), lr=args.learn_rate)
loss_fn = nn.CrossEntropyLoss()
acc_max = float("-inf")
# 用于记录损失和准确率
train_losses, dev_losses, dev_accuracies = [], [], []
for epoch in range(args.epochs):
model.train()
loss_sum, count = 0, 0
for batch_index, (batch_text, batch_label) in enumerate(train_loader):
batch_text, batch_label = batch_text.to(device), batch_label.to(device)
pred = model(batch_text)
loss = loss_fn(pred, batch_label)
opt.zero_grad()
loss.backward()
opt.step()
loss_sum += loss.item()
count += 1
# 打印内容
if len(train_loader) - batch_index <= len(train_loader) % 1000 and count == len(train_loader) % 1000:
msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
loss_sum, count = 0.0, 0
if batch_index % 1000 == 999:
msg = "[{0}/{1:5d}]\tTrain_Loss:{2:.4f}"
print(msg.format(epoch + 1, batch_index + 1, loss_sum / count))
loss_sum, count = 0.0, 0
train_losses.append(loss_sum / count)
model.eval()
dev_loss_sum, count = 0, 0
all_pred, all_true = [], []
with torch.no_grad():
for batch_text, batch_label in dev_loader:
batch_text, batch_label = batch_text.to(device), batch_label.to(device)
pred = model(batch_text)
dev_loss_sum += loss_fn(pred, batch_label).item()
pred = torch.argmax(pred, dim=1)
pred = pred.cpu().numpy().tolist()
label = batch_label.cpu().numpy().tolist()
all_pred.extend(pred)
all_true.extend(label)
count += 1
dev_losses.append(dev_loss_sum / count)
acc = accuracy_score(all_true, all_pred)
dev_accuracies.append(acc)
print(f"dev acc: {acc:.4f}")
if acc > acc_max:
acc_max = acc
# 检查目录是否存在,如果不存在则创建
if not os.path.exists(os.path.dirname(args.save_model_best)):
os.makedirs(os.path.dirname(args.save_model_best))
torch.save(model.state_dict(), args.save_model_best)
print(f"Saved best model")
print("*" * 50)
end = time.time()
print(f"Run time: {(end - start)/60:.4f} min")
test_data()
# 保存损失和准确率图像
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label="Train Loss")
plt.plot(dev_losses, label="Validation Loss")
plt.title("Loss over Epochs")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(dev_accuracies, label="Validation Accuracy")
plt.title("Accuracy over Epochs")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.tight_layout()
plt.savefig("training_metrics.png")
plt.show()