import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import pickle
# Load the pickled training and test arrays. The files are opened through
# context managers so the handles are closed as soon as loading finishes.
# NOTE: pickle can execute arbitrary code while loading; only use dataset
# files that come from a trusted source.
with open("dataset_train.p", "rb") as train_file:
    dataset_train = pickle.load(train_file)
with open("dataset_test.p", "rb") as test_file:
    dataset_test = pickle.load(test_file)
class Sampling(layers.Layer):
    """Sample a latent vector from ``(z_mean, z_log_var)``."""

    def call(self, inputs):
        """Return ``z_mean + exp(0.5 * z_log_var) * epsilon``.

        Args:
            inputs: Pair ``(z_mean, z_log_var)``.

        Returns:
            The sampled tensor, where ``epsilon`` is random normal noise whose
            shape is taken from the first two dimensions of ``z_mean``.
        """
        mean, log_var = inputs
        mean_shape = tf.shape(mean)
        noise = tf.keras.backend.random_normal(
            shape=(mean_shape[0], mean_shape[1])
        )
        # exp(0.5 * log_var) turns the log-variance into a standard deviation.
        std_dev = tf.exp(0.5 * log_var)
        return mean + std_dev * noise
latent_dim = 2

# --- Encoder: (128, 16) input -> z_mean, z_log_var and a sampled z ---------
encoder_inputs = keras.Input(shape=(128, 16))
x = layers.Conv1D(
    filters=16,
    kernel_size=3,
    activation="relu",
    padding="causal",
)(encoder_inputs)
x = layers.Flatten()(x)
z_mean = layers.Dense(units=latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(units=latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(
    inputs=encoder_inputs,
    outputs=[z_mean, z_log_var, z],
    name="encoder",
)
encoder.summary()

# --- Decoder: latent vector -> (128, 16) output with sigmoid activation ----
latent_inputs = keras.Input(shape=(latent_dim,))
# 128 * 16 == 2048 units, so the dense output can be reshaped to (128, 16).
x = layers.Dense(units=128 * 16, activation="relu")(latent_inputs)
x = layers.Reshape(target_shape=(128, 16))(x)
decoder_outputs = layers.Conv1DTranspose(
    filters=16,
    kernel_size=3,
    activation="sigmoid",
    padding="same",
)(x)
decoder = keras.Model(
    inputs=latent_inputs,
    outputs=decoder_outputs,
    name="decoder",
)
decoder.summary()
class VAE(keras.Model):
    """Variational autoencoder with a feature-group reconstruction loss.

    The reconstruction loss is the sum of four terms, each computed on a slice
    of the last axis of the input:

    * channels 0-4:  recycling preferences, binary cross-entropy
    * channels 5-8:  mobility, mean squared error
    * channels 9-12: CO2 votes, categorical cross-entropy
    * channel 13:    diet preferences, mean squared error

    Channels above 13 do not contribute to the loss. The total loss is the
    reconstruction loss plus the KL divergence term computed from the
    encoder's ``z_mean`` and ``z_log_var`` outputs.
    """

    def __init__(self, encoder, decoder, **kwargs):
        """Store the sub-models and create the loss trackers.

        Args:
            encoder: Model returning ``(z_mean, z_log_var, z)`` for an input.
            decoder: Model mapping ``z`` back to a reconstruction.
            **kwargs: Forwarded to ``keras.Model.__init__``.
        """
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        # One running mean per feature group of the reconstruction loss.
        self.f_loss_trackers = {
            "recycling": keras.metrics.Mean(name="recycling_loss"),
            "mobility": keras.metrics.Mean(name="mobility_loss"),
            "diet": keras.metrics.Mean(name="diet_loss"),
            "co2": keras.metrics.Mean(name="co2_loss"),
        }

    @property
    def metrics(self):
        """Return every loss tracker owned by this model."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
            self.f_loss_trackers["recycling"],
            self.f_loss_trackers["mobility"],
            self.f_loss_trackers["diet"],
            self.f_loss_trackers["co2"],
        ]

    def _compute_losses(self, data):
        """Run encoder and decoder on ``data`` and return all loss terms.

        Shared by ``train_step`` and ``test_step`` so both use exactly the
        same loss definition.

        Returns:
            Dict with keys ``recycling``, ``mobility``, ``diet``, ``co2``
            (individual terms), ``reconstruction`` (scalar mean of their sum)
            and ``kl`` (scalar).
        """
        z_mean, z_log_var, z = self.encoder(data)
        reconstruction = self.decoder(z)

        # Binary cross-entropy loss for recycling preferences
        recycling_loss = tf.reduce_mean(
            keras.losses.binary_crossentropy(
                data[:, :, 0:5],
                reconstruction[:, :, 0:5],
            ),
            axis=1,
        )
        # MSE loss for mobility
        mobility_loss = tf.reduce_mean(
            keras.losses.mean_squared_error(
                data[:, :, 5:9],
                reconstruction[:, :, 5:9],
            ),
            axis=1,
        )
        # Categorical cross-entropy loss for Co2 votes
        co2_loss = tf.reduce_mean(
            keras.losses.categorical_crossentropy(
                data[:, :, 9:13],
                reconstruction[:, :, 9:13],
            ),
            axis=1,
        )
        # MSE loss for diet preferences. The result is summed over all
        # remaining elements, so unlike the other terms this is one scalar.
        diet_loss = tf.reduce_sum(
            keras.losses.mean_squared_error(
                data[:, :, 13],
                reconstruction[:, :, 13],
            )
        )

        reconstruction_loss = tf.reduce_mean(
            recycling_loss + diet_loss + mobility_loss + co2_loss
        )
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        return {
            "recycling": recycling_loss,
            "mobility": mobility_loss,
            "diet": diet_loss,
            "co2": co2_loss,
            "reconstruction": reconstruction_loss,
            "kl": kl_loss,
        }

    def train_step(self, data):
        """Run one optimisation step and return the running loss averages.

        Args:
            data: Input batch, or a tuple whose first element is the batch.

        Returns:
            Dict mapping metric names to the current tracker results.
        """
        if isinstance(data, tuple):
            data = data[0]
        with tf.GradientTape() as tape:
            losses = self._compute_losses(data)
            # Scalar objective, the same quantity test_step reports. The
            # previous un-reduced vector was summed implicitly by
            # tape.gradient, so this only rescales the gradient by a constant
            # 1 / batch_size.
            total_loss = losses["reconstruction"] + losses["kl"]
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(losses["reconstruction"])
        self.kl_loss_tracker.update_state(losses["kl"])
        # Updates the per-feature loss trackers, each with its own loss term.
        for feature in ("recycling", "mobility", "diet", "co2"):
            self.f_loss_trackers[feature].update_state(losses[feature])

        return {
            "loss": self.total_loss_tracker.result(),
            "f_recycling_loss": self.f_loss_trackers["recycling"].result(),
            "f_mobility_loss": self.f_loss_trackers["mobility"].result(),
            "f_diet_loss": self.f_loss_trackers["diet"].result(),
            "f_co2_loss": self.f_loss_trackers["co2"].result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def test_step(self, data):
        """Evaluate one batch without updating any weights.

        Args:
            data: Input batch, or a tuple whose first element is the batch.

        Returns:
            Dict with the scalar ``reconstruction_loss`` and ``kl_loss`` of
            this batch.
        """
        if isinstance(data, tuple):
            data = data[0]
        losses = self._compute_losses(data)
        return {
            "reconstruction_loss": losses["reconstruction"],
            "kl_loss": losses["kl"],
        }
# Build the VAE from the encoder/decoder defined above and train it, logging
# per-epoch metrics to a CSV file.
vae = VAE(encoder, decoder)
csv_logger = keras.callbacks.CSVLogger('training.log')
# loss=None: the loss is computed inside VAE.train_step / VAE.test_step.
vae.compile(loss=None, optimizer=keras.optimizers.Adam(learning_rate=0.0001))
# Validate on the held-out test set rather than on the training data, so the
# val_* metrics measure generalisation. The (x, y) tuple form is used because
# VAE.test_step takes the first element of a tuple as its input batch.
vae.fit(
    dataset_train,
    epochs=20,
    batch_size=64,
    callbacks=[csv_logger],
    validation_data=(dataset_test, dataset_test),
)
# Persist both halves separately so they can be reloaded independently.
encoder.save("encoder_v1.pb")
decoder.save("decoder_v1.pb")
import numpy as np
from tensorflow import keras
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import pickle
import os

# Loads the datasets (context managers close the file handles after loading).
# NOTE: pickle can execute arbitrary code while loading; only use dataset
# files that come from a trusted source.
with open("dataset_train.p", "rb") as train_file:
    dataset_train = pickle.load(train_file)
with open("dataset_test.p", "rb") as test_file:
    dataset_test = pickle.load(test_file)
# Loads the encoder
encoder_imported = keras.models.load_model("encoder_v1.pb")
# Generates latent space embeddings of training set
z_mean, _, _ = encoder_imported.predict(dataset_train)
plt.figure(figsize=(10, 7))
plt.scatter(z_mean[:, 0], z_mean[:, 1], c="blue", label="Training set")
# Generates latent space embeddings of test set
z_mean, _, _ = encoder_imported.predict(dataset_test)
plt.scatter(z_mean[:, 0], z_mean[:, 1], c="lightgreen", alpha=0.8, label="Test set")
plt.xlabel("Latent dimension 1")
plt.ylabel("Latent dimension 2")
plt.legend()
fname = 'images/latent_space_baseline.png'
# savefig does not create missing directories, so make sure the target exists.
os.makedirs(os.path.dirname(fname), exist_ok=True)
plt.tight_layout()
plt.savefig(fname)
# Bare expression: a no-op in a plain script; presumably the snippet's result
# value when run as a notebook / literate-programming cell.
fname
import numpy as np
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from tensorflow import keras
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import os

encoder_imported = keras.models.load_model("encoder_v1.pb")
# `dataset` was never bound anywhere in this file, so the next line raised a
# NameError. Bind it to the training set loaded earlier.
# NOTE(review): confirm the training set is the intended input for this plot.
dataset = dataset_train
z_mean, _, _ = encoder_imported.predict(dataset)
# NOTE(review): the projection is PCA although the output file name says
# "tsne"; the path is left unchanged in case other material references it.
z_embedded = PCA().fit_transform(z_mean)
plt.figure(figsize=(12, 10))
# Colour each point by the mean of channel 13 over axis 1 of its sample.
plt.scatter(z_embedded[:, 0], z_embedded[:, 1], c=np.mean(dataset[:, :, 13], axis=1))
fname = 'images/latent_space_tsne.png'
# savefig does not create missing directories, so make sure the target exists.
os.makedirs(os.path.dirname(fname), exist_ok=True)
plt.savefig(fname)
# Bare expression: a no-op in a plain script; presumably the snippet's result
# value when run as a notebook / literate-programming cell.
fname
from tensorflow.keras.utils import plot_model
# Render an architecture diagram for each sub-model with identical options.
for _model, _diagram_path in (
    (encoder, 'encoder_plot.png'),
    (decoder, 'decoder_plot.png'),
):
    plot_model(
        _model,
        to_file=_diagram_path,
        show_shapes=True,
        show_layer_names=True,
    )