SAM.py
from keras import layers
from keras import Input, Model
from keras import regularizers
from keras import backend as K
from keras.callbacks import EarlyStopping
from keras.layers import Layer  # keras.engine.topology was removed in Keras 2.x
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
import numpy as np
import json
class AttentionLayer(Layer):
    """Self-attention over the time axis: learns a (time_steps, time_steps)
    weight matrix and returns one attention-weighted vector per example."""

    def __init__(self, **kwargs):
        super(AttentionLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        # Expect (batch_size, time_steps, features)
        assert len(input_shape) == 3
        # W.shape = (time_steps, time_steps)
        self.W = self.add_weight(name='att_weight',
                                 shape=(input_shape[1], input_shape[1]),
                                 initializer='uniform',
                                 trainable=True)
        self.b = self.add_weight(name='att_bias',
                                 shape=(input_shape[1],),
                                 initializer='uniform',
                                 trainable=True)
        super(AttentionLayer, self).build(input_shape)

    def call(self, inputs):
        # inputs.shape = (batch_size, time_steps, features)
        x = K.permute_dimensions(inputs, (0, 2, 1))
        # x.shape = (batch_size, features, time_steps)
        a = K.softmax(K.tanh(K.dot(x, self.W) + self.b))
        # Re-weight each time step, restore the original axis order,
        # then sum over time to collapse the sequence into one vector.
        outputs = K.permute_dimensions(a * x, (0, 2, 1))
        outputs = K.sum(outputs, axis=1)
        return outputs

    def compute_output_shape(self, input_shape):
        return input_shape[0], input_shape[2]
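# Minimal shape check for AttentionLayer (an illustrative sketch only; the
# shapes below mirror the model built further down in this script):
#   demo_in  = Input(shape=(100, 256))    # (batch, time_steps, features)
#   demo_out = AttentionLayer()(demo_in)  # -> (batch, 256): one vector per example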
# Load the data: keys are '<label>_<id>' strings, values are text descriptions
target_file = 'desc.json'
with open(target_file, 'r', encoding='utf8') as fr:
    desc_dict = json.load(fr)
desc = []
ids = []
label = []
for key, value in desc_dict.items():
    desc.append(value)
    label_and_id = key.split('_')
    label.append(int(label_and_id[0]))
    ids.append(label_and_id[1])
max_len = 100
max_words = 10000
# Build the word index
tokenizer = Tokenizer(num_words=max_words)
tokenizer.fit_on_texts(desc)
word_index = tokenizer.word_index
# Convert texts to sequences of word indices, padded/truncated to max_len
sequences = tokenizer.texts_to_sequences(desc)
data = pad_sequences(sequences, maxlen=max_len)
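# Illustration of what the tokenizer pipeline produces (a sketch; the actual
# integer ids depend on the fitted vocabulary):
#   tokenizer.texts_to_sequences(['a short description'])  # -> e.g. [[3, 412, 97]]
#   pad_sequences([[3, 412, 97]], maxlen=max_len)           # zero-padded on the left to length 100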
label = np.asarray(label)
train_num = 4000
val_num = 500
test_num = 500
x_train = data[:train_num]
y_train = label[:train_num]
x_val = data[train_num : train_num + val_num]
y_val = label[train_num : train_num + val_num]
x_test = data[train_num + val_num : train_num + val_num + test_num]
y_test = label[train_num + val_num : train_num + val_num + test_num]
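# Note: the 4000/500/500 slices assume desc.json holds at least 5000 entries
# whose order is already shuffled; otherwise the splits may be class-skewed.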
# Load the pre-trained GloVe word vectors
glove_dir = 'pre_data/glove.6B.100d.txt'
embedding_index = {}
with open(glove_dir, 'r', encoding='utf8') as f:
    for line in f:
        values = line.split(' ')
        word = values[0]
        vector = np.asarray(values[1:], dtype='float32')
        embedding_index[word] = vector
# Build an embedding matrix that can be loaded into the Embedding layer
embedding_dim = 100
embedding_matrix = np.zeros((max_words, embedding_dim))
for word, i in word_index.items():
    if i < max_words:
        embedding_vector = embedding_index.get(word)
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector
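# Rows of embedding_matrix stay all-zero for words outside the top max_words
# or missing from GloVe; those rows act as untrained embeddings.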
# Build the model
text_input = Input(shape=(max_len,), dtype='int32')
embedded_text = layers.Embedding(max_words, embedding_dim, input_length=max_len)(text_input)
encoded_text = layers.LSTM(256, dropout=0.5, recurrent_dropout=0.5, return_sequences=True)(embedded_text)
att_dense = layers.TimeDistributed(layers.Dense(256))(encoded_text)
att = AttentionLayer()(att_dense)
dense_1 = layers.Dense(1024, activation='relu', kernel_regularizer=regularizers.l2(0.01))(att)
drop_1 = layers.Dropout(0.5)(dense_1)
dense_2 = layers.Dense(512, activation='relu', kernel_regularizer=regularizers.l2(0.01))(drop_1)
drop_2 = layers.Dropout(0.5)(dense_2)
dense_3 = layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.01))(drop_2)
drop_3 = layers.Dropout(0.5)(dense_3)
# Sigmoid output for binary classification
output_layer = layers.Dense(1, activation='sigmoid')(drop_3)
model = Model(inputs=text_input, outputs=output_layer)
model.summary()
# # Known issue: training converges slowly; accuracy tops out around 0.75
# model.layers[1].set_weights([embedding_matrix])
# model.layers[1].trainable = False
# model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['accuracy'])
# # Add EarlyStopping
# my_callbacks = [EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=0, mode='auto')]
# model.fit(x_train, y_train, epochs=100, batch_size=128, callbacks=my_callbacks, validation_data=(x_val, y_val))
# # model.fit(x_train, y_train, epochs=150, batch_size=128, validation_data=(x_val, y_val))
# test_loss, test_acc = model.evaluate(x_test, y_test)
# print('test_loss:', test_loss)
# print('test_acc:', test_acc)