forked from gy910210/rnn-from-scratch
-
Notifications
You must be signed in to change notification settings - Fork 3
/
rnn.py
111 lines (101 loc) · 4.27 KB
/
rnn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from datetime import datetime
import numpy as np
import sys
from layer import RNNLayer
from output import Softmax
class Model:
    """Recurrent neural network over word indices, trained with truncated BPTT.

    The model owns three weight matrices, all initialised uniformly at random:

    * ``U`` -- shape ``(hidden_dim, word_dim)``, bounds ``+-sqrt(1 / word_dim)``
    * ``W`` -- shape ``(hidden_dim, hidden_dim)``, bounds ``+-sqrt(1 / hidden_dim)``
    * ``V`` -- shape ``(word_dim, hidden_dim)``, bounds ``+-sqrt(1 / hidden_dim)``

    The per-time-step computation is delegated to ``RNNLayer`` and the output
    scoring to ``Softmax``; both are imported at file level and their exact
    maths is not visible here (presumably U maps input->hidden, W maps
    hidden->hidden and V maps hidden->output -- confirm against ``layer.py``).
    """

    def __init__(self, word_dim, hidden_dim=100, bptt_truncate=4):
        """Store the hyper-parameters and randomly initialise ``U``, ``W``, ``V``.

        Args:
            word_dim: Vocabulary size; also the length of every one-hot input.
            hidden_dim: Length of the hidden state vector.
            bptt_truncate: Maximum number of earlier time steps through which
                ``bptt`` propagates the error of any single time step.
        """
        self.word_dim = word_dim
        self.hidden_dim = hidden_dim
        self.bptt_truncate = bptt_truncate
        word_bound = np.sqrt(1. / word_dim)
        hidden_bound = np.sqrt(1. / hidden_dim)
        # Draw order (U, W, V) is kept so results are reproducible for a given seed.
        self.U = np.random.uniform(-word_bound, word_bound, (hidden_dim, word_dim))
        self.W = np.random.uniform(-hidden_bound, hidden_bound, (hidden_dim, hidden_dim))
        self.V = np.random.uniform(-hidden_bound, hidden_bound, (word_dim, hidden_dim))

    def _one_hot(self, index):
        """Return a fresh length-``word_dim`` vector that is 1 at ``index``, 0 elsewhere."""
        vector = np.zeros(self.word_dim)
        vector[index] = 1
        return vector

    def forward_propagation(self, x):
        """Run the network forward over one sequence (predicting word probabilities).

        ``x`` is one single example, not a batch of data.  For example
        ``x = [0, 179, 341, 416]``, then its ``y = [179, 341, 416, 1]``.

        Args:
            x: Sequence of word indices, one per time step.

        Returns:
            A list with one ``RNNLayer`` per time step, in order.  Each layer
            has had ``forward`` called on it; the hidden state ``layer.s`` of
            one step is fed in as the previous state of the next.  The list is
            empty when ``x`` is empty.
        """
        layers = []
        # The first time step sees an all-zero previous hidden state.
        prev_s = np.zeros(self.hidden_dim)
        for word_index in x:
            layer = RNNLayer()
            layer.forward(self._one_hot(word_index), prev_s, self.U, self.W, self.V)
            prev_s = layer.s
            layers.append(layer)
        return layers

    def predict(self, x):
        """Return, for every time step of ``x``, the index of the top-scoring word.

        Args:
            x: Sequence of word indices.

        Returns:
            A list of ``np.argmax`` results, one per time step, taken over
            ``Softmax().predict(layer.mulv)``.
        """
        output = Softmax()
        layers = self.forward_propagation(x)
        return [np.argmax(output.predict(layer.mulv)) for layer in layers]

    def calculate_loss(self, x, y):
        """Return the mean per-time-step loss of one sequence.

        Args:
            x: Sequence of input word indices.
            y: Sequence of target word indices, same length as ``x``.

        Returns:
            The sum of ``Softmax().loss(layer.mulv, y[i])`` over all time
            steps, divided by ``len(y)``.

        Raises:
            ValueError: If ``x`` and ``y`` differ in length.  (An explicit
                raise rather than ``assert`` so the check survives ``-O``.)
            ZeroDivisionError: If the sequences are empty.
        """
        if len(x) != len(y):
            raise ValueError(
                "x and y must have the same length (got %d and %d)" % (len(x), len(y)))
        output = Softmax()
        layers = self.forward_propagation(x)
        loss = 0.0
        for i, layer in enumerate(layers):
            loss += output.loss(layer.mulv, y[i])
        return loss / float(len(y))

    def calculate_total_loss(self, X, Y):
        """Return the mean of ``calculate_loss`` over a whole data set.

        Args:
            X: Indexable collection of input sequences.
            Y: Collection of target sequences; ``Y[i]`` pairs with ``X[i]``.

        Returns:
            The per-example losses summed and divided by ``len(Y)``.

        Raises:
            ZeroDivisionError: If ``Y`` is empty.
        """
        loss = 0.0
        # X is indexed (not zipped) so a too-short X still raises IndexError.
        for i, y in enumerate(Y):
            loss += self.calculate_loss(X[i], y)
        return loss / float(len(Y))

    def bptt(self, x, y):
        """Compute weight gradients for one sequence by truncated BPTT.

        For every time step ``t`` the output error is pushed through layer
        ``t`` and then back through at most ``bptt_truncate`` earlier layers
        (never before step 0).  The contributions of all time steps are summed.

        Args:
            x: Sequence of input word indices.
            y: Sequence of target word indices, same length as ``x``.

        Returns:
            A tuple ``(dU, dW, dV)`` of arrays shaped like ``U``, ``W``, ``V``.

        Raises:
            ValueError: If ``x`` and ``y`` differ in length.  (An explicit
                raise rather than ``assert`` so the check survives ``-O``.)
        """
        if len(x) != len(y):
            raise ValueError(
                "x and y must have the same length (got %d and %d)" % (len(x), len(y)))
        output = Softmax()
        layers = self.forward_propagation(x)
        dU = np.zeros(self.U.shape)
        dV = np.zeros(self.V.shape)
        dW = np.zeros(self.W.shape)

        T = len(layers)
        prev_s_t = np.zeros(self.hidden_dim)
        # Step t is always entered with a zero hidden-state gradient: later
        # steps reach it through their own truncated backward walk instead.
        diff_s = np.zeros(self.hidden_dim)
        for t in range(T):
            # Error at the output of step t.
            dmulv = output.diff(layers[t].mulv, y[t])
            dprev_s, dU_t, dW_t, dV_t = layers[t].backward(
                self._one_hot(x[t]), prev_s_t, self.U, self.W, self.V, diff_s, dmulv)
            prev_s_t = layers[t].s

            # Earlier steps receive only the hidden-state gradient; their
            # output error is zeroed here (each step's own output error is
            # accounted for when the outer loop reaches it).
            dmulv = np.zeros(self.word_dim)
            stop = max(-1, t - self.bptt_truncate - 1)  # exclusive lower bound
            for i in range(t - 1, stop, -1):
                prev_s_i = np.zeros(self.hidden_dim) if i == 0 else layers[i - 1].s
                # dV_i is discarded, exactly as in the original loop.
                dprev_s, dU_i, dW_i, dV_i = layers[i].backward(
                    self._one_hot(x[i]), prev_s_i, self.U, self.W, self.V, dprev_s, dmulv)
                dU_t += dU_i
                dW_t += dW_i

            dV += dV_t
            dU += dU_t
            dW += dW_t
        return (dU, dW, dV)

    def sgd_step(self, x, y, learning_rate):
        """Apply one in-place gradient-descent update computed from ``(x, y)``.

        Args:
            x: Sequence of input word indices.
            y: Sequence of target word indices.
            learning_rate: Step size multiplying each gradient.
        """
        dU, dW, dV = self.bptt(x, y)
        self.U -= learning_rate * dU
        self.V -= learning_rate * dV
        self.W -= learning_rate * dW

    def train(self, X, Y, learning_rate=0.005, nepoch=100, evaluate_loss_after=5):
        """Train with per-example SGD, periodically logging the total loss.

        Every ``evaluate_loss_after`` epochs (starting with epoch 0) the total
        loss is computed, recorded and printed; if it is higher than the
        previously recorded loss the learning rate is halved.

        Args:
            X: Indexable collection of input sequences.
            Y: Collection of target sequences; ``Y[i]`` pairs with ``X[i]``.
            learning_rate: Initial SGD step size.
            nepoch: Number of passes over the data.
            evaluate_loss_after: Evaluate the loss every this many epochs.

        Returns:
            A list of ``(num_examples_seen, loss)`` tuples, one per evaluation.
        """
        num_examples_seen = 0
        losses = []
        for epoch in range(nepoch):
            if epoch % evaluate_loss_after == 0:
                loss = self.calculate_total_loss(X, Y)
                losses.append((num_examples_seen, loss))
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                print("%s: Loss after num_examples_seen=%d epoch=%d: %f" % (timestamp, num_examples_seen, epoch, loss))
                # Adjust the learning rate if loss increases
                if len(losses) > 1 and losses[-1][1] > losses[-2][1]:
                    learning_rate = learning_rate * 0.5
                    print("Setting learning rate to %f" % learning_rate)
                sys.stdout.flush()
            # One SGD step per training example.
            for i, y in enumerate(Y):
                self.sgd_step(X[i], y, learning_rate)
                num_examples_seen += 1
        return losses