#!/usr/bin/env python
# coding: utf-8

# In[1]:

import time
import random

import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from torchcrf import CRF
from langconv import Converter  # traditional-to-simplified Chinese conversion
# In[2]:

list_embedding = []
word2idx = {}  # character -> row index in list_embedding
list1 = []  # holds the training set
list3 = []
batch_size = 1
seq_length = 5000  # number of sentences grouped into one padded batch
device = torch.device('cuda:0')
batch_all = 600  # recomputed from the data size below
input_size = 50  # embedding dimension
# In[3]:

# class Classification(nn.Module):
#     def __init__(self, indim, hidden_dim, outdim):
#         super(Classification, self).__init__()
#         self.indim, self.hidden_dim, self.outdim = indim, hidden_dim, outdim
#         self.num_layers = 2
#         self.LSTM = nn.LSTM(indim, hidden_dim // 2, num_layers=self.num_layers, bidirectional=True)
#         self.Linear = nn.Linear(hidden_dim, outdim)
#         self.hidden = self.init_hidden()
#
#     def init_hidden(self):
#         return (torch.randn(2 * self.num_layers, 1, self.hidden_dim // 2).cuda(),
#                 torch.randn(2 * self.num_layers, 1, self.hidden_dim // 2).cuda())
#
#     def forward(self, sentence):
#         self.hidden = self.init_hidden()
#         LSTM_out, self.hidden = self.LSTM(sentence, self.hidden)
#         LSTM_out = LSTM_out.view(sentence.shape[0], self.hidden_dim)
#         Linear_out = self.Linear(LSTM_out)
#         return Linear_out - Linear_out.logsumexp(dim=1, keepdim=True)
#
# model = Classification(50, 500, 57).cuda()
# optimizer = optim.Adam(model.parameters(), lr=0.0005)
# criterion = nn.NLLLoss()
# In[4]:

class GRU(nn.Module):
    """BiGRU encoder with a linear projection and a CRF layer on top."""
    def __init__(self, indim, hidden_dim, outdim):
        super(GRU, self).__init__()
        self.indim, self.hidden_dim, self.outdim = indim, hidden_dim, outdim
        self.num_layers = 2
        self.gru = nn.GRU(indim, hidden_dim // 2, num_layers=self.num_layers, bidirectional=True)
        self.Linear = nn.Linear(hidden_dim, outdim)
        self.CRF = CRF(outdim)

    def init_hidden(self, batch_size):
        # (num_directions * num_layers, batch, hidden_dim // 2)
        return torch.randn(2 * self.num_layers, batch_size, self.hidden_dim // 2).cuda()

    def forward(self, sentences, seq_lengths):
        hidden = self.init_hidden(sentences.shape[0])
        x_packed = pack_padded_sequence(input=sentences, lengths=seq_lengths, batch_first=True, enforce_sorted=False)
        gru_out, hidden = self.gru(x_packed, hidden)
        gru_out, _ = pad_packed_sequence(gru_out, batch_first=True)  # unpacking returns (output, lengths)
        gru_out = gru_out.reshape(-1, self.hidden_dim)  # flatten (batch, seq) into one token axis
        linear_out = self.Linear(gru_out)
        # log-softmax over the tag dimension
        return linear_out - linear_out.logsumexp(dim=1, keepdim=True)

    def loss(self, emissions, tags):
        # torchcrf expects (seq_len, batch, num_tags) emissions and (seq_len, batch) tags
        return -self.CRF(emissions.view(-1, 1, self.outdim), tags.view(-1, 1))

model = GRU(50, 100, 57).cuda()
optimizer = optim.Adam(model.parameters(), lr=0.0008)
# criterion = nn.NLLLoss()
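# Hedged smoke test (an illustrative addition, not from the original notebook):
# two dummy padded "sentences" of 50-d vectors with true lengths 4 and 2. The
# forward pass should return one row of 57 log-probabilities per padded token.
with torch.no_grad():
    _dummy = torch.randn(2, 4, 50).cuda()
    assert model(_dummy, [4, 2]).shape == (2 * 4, 57)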
# In[5]:
# torch.backends.cudnn.benchmark = True
# torch.backends.cudnn.deterministic = False
# torch.backends.cudnn.enabled = True
classify_dict = {}  # tag -> index

def make_tensor(fo):
    """Read "char tag" lines (blank line between addresses) into tensors."""
    text = fo.read()
    text_in_line = text.split("\n")  # split the data into lines
    training_list = []
    for i1 in text_in_line:
        if len(i1) != 0:
            m = i1.split(" ")
            if m[1] not in classify_dict:
                classify_dict[m[1]] = len(classify_dict)
        else:
            m = "flag"  # sentence-boundary marker
        training_list.append(m)
    print(training_list[0:50])
    print("-" * 50)
    training_x = []
    training_y = []
    training_len = []
    training_list = iter(training_list)
    for i in training_list:
        x_part = []
        y_part = []
        while i != "flag":
            i[0] = Converter('zh-hans').convert(i[0])  # normalize to simplified Chinese
            if i[0] in word2idx:
                llist = list_embedding[word2idx[i[0]]][1:51]
            else:
                # unseen character: back off to a random embedding row
                llist = []
                for ii in range(51):
                    llist.append(random.random())
                list_embedding.append(llist)
                word2idx[i[0]] = len(list_embedding) - 1
                llist = llist[1:51]
            typ = classify_dict[i[1]]
            x_part.append(llist)
            y_part.append(typ)
            try:
                i = next(training_list)
            except StopIteration:
                break
        if len(x_part) != 0:  # skip empty segments from consecutive blank lines
            training_x.append(torch.Tensor(x_part).cuda())
            training_y.append(torch.LongTensor(y_part).cuda())  # integral tags for the CRF
            training_len.append(len(x_part))
    return training_x, training_y, training_len
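# For reference, make_tensor expects CoNLL-style "char tag" lines with a blank
# line between addresses; the tags below are hypothetical placeholders, not
# taken from the real dataset:
#
#   北 B-prov
#   京 E-prov
#
# Each address becomes one (length, 50) embedding tensor and one tag tensor.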
def check_f1(out, testing_y):
    # Note: despite the name, this computes token-level accuracy, not F1.
    total = 0
    correct = 0
    for i in range(len(out)):
        total += 1
        if out[i] == testing_y[i]:
            correct += 1
    return correct / total if total != 0 else 0
# In[6]:
filename = 'ctb.50d.vec'  # pretrained embeddings: one "char f1 ... f50" line per character
with open(filename, mode="r", encoding='utf-8') as fe:
    list_embedding = fe.read().split("\n")
for i in range(len(list_embedding) - 3):  # the trailing lines are not embedding rows
    list_embedding[i] = list_embedding[i].split(" ")
    for i1 in range(1, 51):
        list_embedding[i][i1] = float(list_embedding[i][i1])
    word2idx[list_embedding[i][0]] = i
    if i % 10000 == 0:
        print(i)
# open the data files and read everything in
start_time = time.time()
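# Quick lookup sketch (illustrative; assumes the character '中' occurs in
# ctb.50d.vec): column 0 of each embedding row is the character itself, the
# remaining 50 columns are its vector.
# vec = list_embedding[word2idx['中']][1:51]
# assert len(vec) == 50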
# In[7]:

f0 = open("train.txt", encoding='utf-8')
f1 = open("dev.txt", encoding='utf-8')

# In[8]:

training_x, training_y, training_lengths = make_tensor(f0)
dev_x, dev_y, dev_lengths = make_tensor(f1)
epoch = 0
start_time = time.time()
batch_all = len(training_x) // (seq_length * batch_size)
epoch_all = 100
roll = 0
# In[9]:

for epoch in range(epoch_all):
    loss1 = 0
    for batch in range(batch_all):
        lo = batch * seq_length * batch_size
        hi = (batch + 1) * seq_length * batch_size
        training_xx = pad_sequence(training_x[lo:hi], batch_first=True, padding_value=0).cuda()
        training_yy = pad_sequence(training_y[lo:hi], batch_first=True, padding_value=0).cuda()
        training_ll = training_lengths[lo:hi]
        output = model(training_xx, training_ll)
        # out = out.view(seq_length, 4, batch_size)  # multi-batch
        training_yy = training_yy.reshape(-1).cuda()  # single batch: flatten tags to match the flattened emissions
        # loss = criterion(output, training_yy)
        loss = model.loss(output, training_yy)  # negative CRF log-likelihood
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        loss1 += loss.item()
        # print(loss1)
        if batch % 20 == 0:
            with torch.no_grad():
                dev_xx = pad_sequence(dev_x, batch_first=True, padding_value=0).cuda()
                dev_yy = pad_sequence(dev_y, batch_first=True, padding_value=0).reshape(-1).cuda()
                out = model(dev_xx, dev_lengths)
                # loss1 = criterion(out, dev_y)
                dev_loss = model.loss(out, dev_yy)
                out = torch.argmax(out, dim=1)
                f1 = check_f1(out, dev_yy)
                print("epoch= %d, loss= %f, f1= %f" %
                      (epoch, dev_loss.item(), f1))
                roll += 1
                """
                if roll != 1:
                    plt.plot(roll, dev_loss.item(), '*')
                    plt.pause(0.01)
                """
                # print("batch= %d, f1= %f" % (batch, f1))
m = 0
# In[ ]:

# Read the test set
f2 = open("final_test.txt", encoding="utf-8")
text = f2.read()
text_in_line = text.split("\n")
testing_x = []
# Build the test-set input matrix
for i1 in text_in_line:
    if len(i1) != 0:
        m = i1.split("\x01")
        for ii in m[1]:
            # ii = Converter('zh-hans').convert(ii)
            if ii in word2idx:
                llist = list_embedding[word2idx[ii]][1:51]
            else:
                # unseen character: back off to a random embedding row
                llist = []
                for i2 in range(51):
                    llist.append(float(random.random()))
                list_embedding.append(llist)
                word2idx[ii] = len(list_embedding) - 1
                llist = llist[1:51]
            testing_x.append(llist)
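# The loop above assumes each final_test.txt line is "<id>\x01<address text>",
# with the \x01 control character as the field separator, e.g. (hypothetical):
#   1\x01北京市海淀区某某路1号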
# In[ ]:

def generateOutText(tx):
    tx = tx.cuda()
    # (chunk_len, 1, feat) -> (1, chunk_len, feat): run the whole chunk as one sequence
    tx = tx.permute(1, 0, 2)
    out1 = model(tx, [tx.shape[1]]).argmax(1)
    return out1.tolist()
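# Note: generateOutText takes an argmax over per-token emissions. Since the
# model also trains a CRF, torchcrf's Viterbi decoding would respect the
# learned transition scores; a hedged alternative sketch:
# best = model.CRF.decode(emissions.view(-1, 1, model.outdim))[0]
# where `emissions` is the (tokens, n_tags) output of a forward pass.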
# In[ ]:

classify_dict1 = list(classify_dict.keys())  # index -> tag name
testing_x = torch.Tensor(testing_x)
testing_x = testing_x.view(-1, batch_size, input_size)
out_batch_size = 5000
out_batchnum = len(testing_x) // out_batch_size
out_tag = []
with torch.no_grad():
    for batch in range(out_batchnum):
        out_tag += generateOutText(testing_x[out_batch_size * batch : (batch + 1) * out_batch_size])
        if batch % 10 == 0:
            print("batch %d/%d" % (batch, out_batchnum))
    # tag the leftover characters that do not fill a whole batch
    if len(testing_x) % out_batch_size != 0:
        out_tag += generateOutText(testing_x[out_batch_size * out_batchnum:])
# In[ ]:

out_text = []
with torch.no_grad():
    pos = 0
    for i in text_in_line:
        i = i.split("\x01")
        if len(i) <= 1:
            continue
        for i1 in range(len(i[1])):
            typ = classify_dict1[out_tag[pos]]
            pos += 1
            i.append(typ)
        out_text.append(i)
# In[ ]:

f3 = open("output.txt", "w", encoding="utf-8")
for i in range(len(out_text)):
    out_str = ""
    for i1 in range(len(out_text[i])):
        if i1 == 0 or i1 == 1:
            out_str += out_text[i][i1]
            out_str += "\x01"
        elif i1 == len(out_text[i]) - 1:
            out_str += out_text[i][i1]
        else:
            out_str += out_text[i][i1]
            out_str += " "
    out_str += "\n"
    f3.write(out_str)
f3.close()
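# Each output.txt line then reads "<id>\x01<text>\x01tag1 tag2 ... tagN", one
# tag per character of the address; the tag names here are hypothetical:
#   1\x01北京市\x01B-prov I-prov E-prov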