From 7a6bce132e9e928d86e68e8f22f0b7b68ddf461e Mon Sep 17 00:00:00 2001 From: MoFHeka Date: Thu, 14 Dec 2023 10:52:18 +0800 Subject: [PATCH] [fix] Now demos of DE Keras Embedding are able to run normally. --- .../movielens-1m-keras-with-horovod.py | 39 ++++++++++++------- .../movielens-1m-keras/movielens-1m-keras.py | 4 +- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py index c16ab35a3..b1918ef7a 100644 --- a/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py +++ b/demo/dynamic_embedding/movielens-1m-keras-with-horovod/movielens-1m-keras-with-horovod.py @@ -11,6 +11,8 @@ except: from tensorflow.keras.optimizers import Adam +import horovod.tensorflow as hvd + os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true" #VERY IMPORTANT! os.environ["TF_GPU_THREAD_MODE"] = "gpu_private" @@ -29,7 +31,6 @@ # optimal performance os.environ['TF_XLA_FLAGS'] = '--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit' -tf.config.experimental.set_synchronous_execution(False) flags.DEFINE_string('mode', 'train', 'Select the running mode: train or test.') flags.DEFINE_string('model_dir', 'model_dir', @@ -181,7 +182,7 @@ def embedding_out_split(embedding_out_concat, input_split_dims): return embedding_out -class ChannelEmbeddingLayers(): +class ChannelEmbeddingLayers(tf.keras.layers.Layer): def __init__(self, name='', @@ -191,6 +192,8 @@ def __init__(self, mpi_size=1, mpi_rank=0): + super(ChannelEmbeddingLayers, self).__init__() + self.gpu_device = ["GPU:0"] self.cpu_device = ["CPU:0"] @@ -227,6 +230,9 @@ def __init__(self, kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1), bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1)) + def build(self, input_shape): + super(ChannelEmbeddingLayers, self).build(input_shape) + def __call__(self, features_info): dense_inputs = [] dense_input_dims = [] @@ -301,14 +307,14 @@ def __init__(self, self.user_embedding = ChannelEmbeddingLayers( name='user', dense_embedding_size=user_embedding_size, - user_embedding_size=user_embedding_size * 2, + sparse_embedding_size=user_embedding_size * 2, embedding_initializer=embedding_initializer, mpi_size=mpi_size, mpi_rank=mpi_rank) self.movie_embedding = ChannelEmbeddingLayers( name='movie', dense_embedding_size=movie_embedding_size, - user_embedding_size=movie_embedding_size * 2, + sparse_embedding_size=movie_embedding_size * 2, embedding_initializer=embedding_initializer, mpi_size=mpi_size, mpi_rank=mpi_rank) @@ -339,13 +345,14 @@ def call(self, features): # Construct input layers for fea_name in features.keys(): fea_info = feature_info_spec[fea_name] - input_tensor = tf.keras.layers.Input(shape=(fea_info['dim'],), - dtype=fea_info['dtype'], - name=fea_name) + input_tensor = features[fea_name] + input_tensor = tf.keras.layers.Lambda(lambda x: x, + name=fea_name)(input_tensor) + input_tensor = tf.reshape(input_tensor, (-1, fea_info['dim'])) fea_info['input_tensor'] = input_tensor if fea_info.__contains__('boundaries'): - input_tensor = tf.raw_ops.Bucketize(input=input_tensor, - boundaries=fea_info['boundaries']) + input_tensor = Bucketize( + boundaries=fea_info['boundaries'])(input_tensor) # To prepare for GPU table combined queries, use a prefix to distinguish different features in a table. if fea_info['ptype'] == 'normal_gpu': if fea_info['dtype'] == tf.int64: @@ -361,6 +368,7 @@ def call(self, features): fea_info['pretreated_tensor'] = input_tensor user_fea = ['user_id', 'user_gender', 'user_occupation_label'] + user_fea = [i for i in features.keys() if i in user_fea] user_fea_info = { key: value for key, value in feature_info_spec.items() @@ -368,6 +376,7 @@ def call(self, features): } user_latent = self.user_embedding(user_fea_info) movie_fea = ['movie_id', 'movie_genres', 'user_occupation_label'] + movie_fea = [i for i in features.keys() if i in movie_fea] movie_fea_info = { key: value for key, value in feature_info_spec.items() @@ -382,7 +391,8 @@ def call(self, features): bias = self.bias_net(latent) x = 0.2 * x + 0.8 * bias - return x + user_rating = tf.keras.layers.Lambda(lambda x: x, name='user_rating')(x) + return {'user_rating': user_rating} def get_dataset(batch_size=1): @@ -408,7 +418,10 @@ def get_dataset(batch_size=1): tf.cast(x["timestamp"] - 880000000, tf.int32), }) - ratings = ds.map(lambda x: {"user_rating": x["user_rating"]}) + ratings = ds.map(lambda x: { + "user_rating": + tf.one_hot(tf.cast(x["user_rating"] - 1, dtype=tf.int64), 5) + }) dataset = tf.data.Dataset.zip((features, ratings)) shuffled = dataset.shuffle(1_000_000, seed=2021, @@ -551,8 +564,8 @@ def train(): auc, ]) - if os.path.exists(FLAGS.model_dir): - model.load_weights(FLAGS.model_dir) + if os.path.exists(FLAGS.model_dir + '/variables'): + model.load_weights(FLAGS.model_dir + '/variables/variables') tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=FLAGS.model_dir) save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA']) diff --git a/demo/dynamic_embedding/movielens-1m-keras/movielens-1m-keras.py b/demo/dynamic_embedding/movielens-1m-keras/movielens-1m-keras.py index 91b4b66d7..5c32745c4 100644 --- a/demo/dynamic_embedding/movielens-1m-keras/movielens-1m-keras.py +++ b/demo/dynamic_embedding/movielens-1m-keras/movielens-1m-keras.py @@ -129,8 +129,8 @@ def train(): auc, ]) - if os.path.exists(FLAGS.model_dir): - model.load_weights(FLAGS.model_dir) + if os.path.exists(FLAGS.model_dir + '/variables'): + model.load_weights(FLAGS.model_dir + '/variables/variables') model.fit(dataset, epochs=FLAGS.epochs, steps_per_epoch=FLAGS.steps_per_epoch)