# Spectrogram auto-encoder
# Dan Stowell 2016.
#
# Unusual things about this implementation:
# * Data is not pre-whitened; instead we use a custom layer (NormalisationLayer) to normalise the mean and variance of the data for us. This is because I want the spectrogram to be normalised when it is input but not normalised when it is output.
# * It's a convolutional net, but only along the time axis; along the frequency axis it's fully-connected.

import numpy as np
import theano
import theano.tensor as T
import lasagne
#import downhill
from lasagne.nonlinearities import rectify, leaky_rectify, very_leaky_rectify
from numpy import float32

try:
    from lasagne.layers import InverseLayer as _
    use_maxpool = True
except ImportError:
    print("""**********************
WARNING: InverseLayer not found in Lasagne. Please use a more recent version of Lasagne.
WARNING: We'll deactivate the maxpooling part of the network (since we can't use InverseLayer to undo it)""")
    use_maxpool = False

import matplotlib
#matplotlib.use('PDF') # http://www.astrobetter.com/plotting-to-a-file-in-python/
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from matplotlib.backends.backend_pdf import PdfPages
plt.rcParams.update({'font.size': 6})

from userconfig import *
import util
from layers_custom import *
###################################################################################################################
# create Theano variables for input minibatch
input_var = T.tensor4('X')
# note that in general, the main data tensors will have these axes:
# - minibatchsize
# - numchannels (always 1 for us, since spectrograms)
# - numfilts (or specbinnum for input)
# - numtimebins
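# e.g. with hypothetical userconfig settings minibatchsize=16, specbinnum=256 and
# numtimebins=128, one minibatch tensor would have shape (16, 1, 256, 128)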
if example_is_audio:
    # load our example audio files as specgrams
    foregroundgram = util.standard_specgram(util.load_soundfile(foregroundwavpath, 0))
    backgroundgram = util.standard_specgram(util.load_soundfile(backgroundwavpath, 0))
    print("foregroundgram is of shape %s" % str(np.shape(foregroundgram)))
    print("backgroundgram is of shape %s" % str(np.shape(backgroundgram)))
###################################################################################################################
# here we define our "semi-convolutional" autoencoder
# NOTE: lasagne assumes pooling is on the TRAILING axis of the tensor, so we always use time as the trailing axis
def make_custom_convlayer(network, in_num_chans, out_num_chans):
    "Applies our special padding and reshaping to do 1D convolution on 2D data"
    network = lasagne.layers.PadLayer(network, width=(featframe_len - 1) // 2, batch_ndim=3) # NOTE: integer division; "batch_ndim" stops the batch dims being padded, and here ALSO skips the first data dim, so only the time axis gets padded
    print("shape after pad layer: %s" % str(network.output_shape))
    network = lasagne.layers.Conv2DLayer(network, out_num_chans, (in_num_chans, featframe_len), stride=(1, 1), pad=0, nonlinearity=very_leaky_rectify, W=lasagne.init.Orthogonal()) # we pad "manually" above in order to pad in one dimension only
    filters = network.W # keep a handle on the weights so we can plot them later
    network = lasagne.layers.ReshapeLayer(network, ([0], [2], [1], [3])) # reinterpret the channels as rows
    print("shape after conv layer: %s" % str(network.output_shape))
    return network, filters
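# For illustration, with hypothetical userconfig values specbinnum=256, numtimebins=128,
# featframe_len=9 and out_num_chans=24, the shapes would flow like this:
#     input           (batch, 1,  256, 128)
#     after padding   (batch, 1,  256, 136)  # 4 zeros each side, time axis only
#     after conv      (batch, 24,   1, 128)  # each (256 x 9) kernel spans all frequency bins
#     after reshape   (batch, 1,   24, 128)  # channels reinterpreted as rows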
network = lasagne.layers.InputLayer((None, 1, specbinnum, numtimebins), input_var)
print("shape after input layer: %s" % str(network.output_shape))
#
# normalisation layer
# -- note that we deliberately normalise the input but do not undo that at the output.
# -- note that the normalisation params are not set by the training procedure; they need to be set before training begins.
network = NormalisationLayer(network, specbinnum)
normlayer = network # we need to remember this one so we can set its parameters
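# The NormalisationLayer class itself lives in layers_custom.py, which is not shown in this
# file. As a rough sketch of the idea (an assumption, not necessarily the actual code), it
# holds fixed per-frequency-bin shift/scale values, estimated from data rather than trained:
#
#   class NormalisationLayer(lasagne.layers.Layer):
#       def __init__(self, incoming, numbins, **kwargs):
#           super(NormalisationLayer, self).__init__(incoming, **kwargs)
#           self.numbins = numbins
#           shape = (1, 1, numbins, 1)
#           bcast = (True, True, False, True) # broadcast over batch, channel and time
#           self.negmean = theano.shared(np.zeros(shape, dtype=float32), broadcastable=bcast)
#           self.invstd  = theano.shared(np.ones( shape, dtype=float32), broadcastable=bcast)
#       def set_normalisation(self, data):
#           # estimate per-bin statistics over every axis except the frequency axis
#           axes = tuple(i for i in range(np.ndim(data)) if i != np.ndim(data) - 2)
#           self.negmean.set_value(-data.mean(axis=axes).reshape((1, 1, self.numbins, 1)).astype(float32))
#           self.invstd.set_value((1. / (data.std(axis=axes) + 1e-6)).reshape((1, 1, self.numbins, 1)).astype(float32))
#       def get_output_for(self, inputdata, **kwargs):
#           return (inputdata + self.negmean) * self.invstd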
#
network, filters_enc = make_custom_convlayer(network, in_num_chans=specbinnum, out_num_chans=numfilters)
#
# NOTE: here we're using max-pooling, along the time axis only, and then
# using Lasagne's "InverseLayer" to undo the maxpooling in one-hot fashion.
# There's a side-effect of this: if you use *overlapping* maxpooling windows,
# the InverseLayer may behave slightly unexpectedly, adding some points with
# double magnitude. It's OK here since we're not overlapping the windows
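# (Illustration: max-pooling [1, 3, 2] with width 2 and stride 1 - overlapping - gives [3, 3],
# and InverseLayer routes both maxima back to the middle position, reconstructing [0, 6, 0]:
# that's the "double magnitude" effect. With width 2 and stride 2, as used below, each
# position belongs to exactly one window, so the one-hot undoing is clean.)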
if use_maxpool:
    network = lasagne.layers.MaxPool2DLayer(network, pool_size=(1, 2), stride=(1, 2))
    maxpool_layer = network # store a pointer to this one
# NOTE: HERE is the "middle" of the autoencoder!
latents = network # we remember the "latents" at the midpoint of the net, since we'll want to inspect them, and maybe regularise them too
if use_maxpool:
    network = lasagne.layers.InverseLayer(network, maxpool_layer)
network, filters_dec = make_custom_convlayer(network, in_num_chans=numfilters, out_num_chans=specbinnum)
network = lasagne.layers.NonlinearityLayer(network, nonlinearity=rectify) # finally a standard rectify since nonneg (specgram) target
###################################################################################################################
# define a simple L2 loss function; a mild touch of regularisation is sketched but disabled below
prediction = lasagne.layers.get_output(network)
loss = lasagne.objectives.squared_error(prediction, input_var)
loss = loss.mean() # NOTE: reduce the elementwise error to a scalar, as required by the gradient updates compiled below
# loss = loss.mean() + 1e-4 * lasagne.regularization.regularize_network_params(network, lasagne.regularization.l2)
lamb = 0.75 # weight for a regularisation term, left unfinished:
# loss =
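# One hypothetical way to finish the stub above (an assumption, not the author's code):
# an L1 sparsity penalty on the latents, weighted by lamb, e.g.
#   loss = loss + lamb * abs(lasagne.layers.get_output(latents)).mean()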
###################################################################################################################
plot_probedata_data = None
def plot_probedata(outpostfix, plottitle=None):
    """Visualises the network behaviour.
    NOTE: currently accesses globals. Should really be passed the network, filters etc."""
    global plot_probedata_data
    if plottitle is None:
        plottitle = outpostfix
    if plot_probedata_data is None:
        if example_is_audio:
            plot_probedata_data = np.array([[foregroundgram[:, examplegram_startindex:examplegram_startindex+numtimebins]]], float32)
        else:
            plot_probedata_data = np.zeros((minibatchsize, 1, specbinnum, numtimebins), dtype=float32)
            for _ in range(5):
                plot_probedata_data[:, :, np.random.randint(specbinnum), np.random.randint(numtimebins)] = 1
    test_prediction = lasagne.layers.get_output(network, deterministic=True)
    test_latents = lasagne.layers.get_output(latents, deterministic=True)
    predict_fn = theano.function([input_var], test_prediction)
    latents_fn = theano.function([input_var], test_latents)
    prediction = predict_fn(plot_probedata_data)
    latentsval = latents_fn(plot_probedata_data)
    if False: # flip this to True for debug printouts about shapes and magnitudes
        print("Probedata  has shape %s and meanabs %g" % (plot_probedata_data.shape, np.mean(np.abs(plot_probedata_data))))
        print("Latents    has shape %s and meanabs %g" % (latentsval.shape, np.mean(np.abs(latentsval))))
        print("Prediction has shape %s and meanabs %g" % (prediction.shape, np.mean(np.abs(prediction))))
        print("Ratio %g" % (np.mean(np.abs(prediction)) / np.mean(np.abs(plot_probedata_data))))
    util.mkdir_p('pdf')
    pdf = PdfPages('pdf/autoenc_probe_%s.pdf' % outpostfix)
    plt.figure(frameon=False)
    #
    plt.subplot(3, 1, 1)
    plotdata = plot_probedata_data[0, 0, :, :]
    plt.imshow(plotdata, origin='lower', interpolation='nearest', cmap='RdBu', aspect='auto', vmin=-np.max(np.abs(plotdata)), vmax=np.max(np.abs(plotdata)))
    plt.ylabel('Input')
    plt.title("%s" % (plottitle))
    #
    plt.subplot(3, 1, 2)
    plotdata = latentsval[0, 0, :, :]
    plt.imshow(plotdata, origin='lower', interpolation='nearest', cmap='RdBu', aspect='auto', vmin=-np.max(np.abs(plotdata)), vmax=np.max(np.abs(plotdata)))
    plt.ylabel('Latents')
    #
    plt.subplot(3, 1, 3)
    plotdata = prediction[0, 0, :, :]
    plt.imshow(plotdata, origin='lower', interpolation='nearest', cmap='RdBu', aspect='auto', vmin=-np.max(np.abs(plotdata)), vmax=np.max(np.abs(plotdata)))
    plt.ylabel('Output')
    #
    pdf.savefig()
    plt.close()
    ##
    for filtvar, filtlbl, isenc in [
            (filters_enc, 'encoding', True),
            (filters_dec, 'decoding', False),
            ]:
        plt.figure(frameon=False)
        vals = filtvar.get_value()
        #print(" %s filters have shape %s" % (filtlbl, vals.shape))
        vlim = np.max(np.abs(vals))
        for whichfilt in range(numfilters):
            plt.subplot(3, 8, whichfilt+1) # NOTE: this grid layout assumes numfilters <= 24
            # NOTE: for encoding/decoding filters, we grab the "slice" of interest from the tensor in different ways: different axes, and flipped
            if isenc:
                plotdata = vals[numfilters-(1+whichfilt), 0, ::-1, ::-1]
            else:
                plotdata = vals[:, 0, whichfilt, :]
            plt.imshow(plotdata, origin='lower', interpolation='nearest', cmap='RdBu', aspect='auto', vmin=-vlim, vmax=vlim)
            plt.xticks([])
            if whichfilt == 0:
                plt.title("%s filters (%s)" % (filtlbl, outpostfix))
            else:
                plt.yticks([])
        pdf.savefig()
        plt.close()
    ##
    pdf.close()
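# visualise the response of the untrained network first, as a baseline to compare against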
plot_probedata('init')
###################################################################################################################
if True:
    ###################################
    # here we set up some training data. NOTE: this is all a bit simplistic - for a proper experiment we'd prepare a full dataset, and it might be too big to hold in memory all at once.
    training_data_size = 100
    training_data = np.zeros((training_data_size, minibatchsize, 1, specbinnum, numtimebins), dtype=float32)
    if example_is_audio:
        # manually grab a load of random subsets of the training audio
        for which_training_batch in range(training_data_size):
            for which_training_datum in range(minibatchsize):
                startindex = np.random.randint(foregroundgram.shape[1] - numtimebins)
                training_data[which_training_batch, which_training_datum, :, :, :] = foregroundgram[:, startindex:startindex+numtimebins]
    else:
        # make some simple (sparse) data that we can train with
        for which_training_batch in range(training_data_size):
            for which_training_datum in range(minibatchsize):
                for _ in range(5):
                    training_data[which_training_batch, which_training_datum, :, np.random.randint(specbinnum), np.random.randint(numtimebins)] = 1

    ###################################
    # pre-training setup
    # set the normalisation parameters manually, as an estimate from the training data
    normlayer.set_normalisation(training_data)

    ###################################
    # training
    # compile a training function that updates the parameters and returns the training loss
    params = lasagne.layers.get_all_params(network, trainable=True)
    updates = lasagne.updates.nesterov_momentum(loss, params, learning_rate=0.01, momentum=0.9)
    train_fn = theano.function([input_var], loss, updates=updates)

    # train the network
    numepochs = 2048 # 3 # 100 # 5000
    for epoch in range(numepochs):
        epochloss = 0 # (named to avoid shadowing the symbolic "loss" defined above)
        for input_batch in training_data:
            epochloss += train_fn(input_batch)
        # report on a log-spaced schedule: the first epoch, every power of two, and the last epoch
        if epoch == 0 or epoch == numepochs - 1 or (2 ** int(np.log2(epoch)) == epoch):
            lossreadout = epochloss / len(training_data)
            infostring = "Epoch %d/%d: Loss %g" % (epoch, numepochs, lossreadout)
            print(infostring)
            plot_probedata('progress', plottitle="progress (%s)" % infostring)

    plot_probedata('trained', plottitle="trained (%d epochs; Loss %g)" % (numepochs, lossreadout))
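# Afterwards one might persist the trained weights using Lasagne's usual idiom; this block
# is a sketch (the filename is just an example), disabled by default like the debug blocks above:
if False:
    np.savez('autoenc_params.npz', *lasagne.layers.get_all_param_values(network))
    # restore later with:
    #   with np.load('autoenc_params.npz') as f:
    #       lasagne.layers.set_all_param_values(network, [f['arr_%d' % i] for i in range(len(f.files))])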