Skip to content

Commit

Permalink
[fix] Now demos of DE Keras Embedding are able to run normally.
Browse files Browse the repository at this point in the history
  • Loading branch information
MoFHeka authored and rhdong committed Dec 18, 2023
1 parent 4aecc4c commit 7a6bce1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
except:
from tensorflow.keras.optimizers import Adam

import horovod.tensorflow as hvd

os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true" #VERY IMPORTANT!

os.environ["TF_GPU_THREAD_MODE"] = "gpu_private"
Expand All @@ -29,7 +31,6 @@

# optimal performance
os.environ['TF_XLA_FLAGS'] = '--tf_xla_auto_jit=2 --tf_xla_cpu_global_jit'
tf.config.experimental.set_synchronous_execution(False)

flags.DEFINE_string('mode', 'train', 'Select the running mode: train or test.')
flags.DEFINE_string('model_dir', 'model_dir',
Expand Down Expand Up @@ -181,7 +182,7 @@ def embedding_out_split(embedding_out_concat, input_split_dims):
return embedding_out


class ChannelEmbeddingLayers():
class ChannelEmbeddingLayers(tf.keras.layers.Layer):

def __init__(self,
name='',
Expand All @@ -191,6 +192,8 @@ def __init__(self,
mpi_size=1,
mpi_rank=0):

super(ChannelEmbeddingLayers, self).__init__()

self.gpu_device = ["GPU:0"]
self.cpu_device = ["CPU:0"]

Expand Down Expand Up @@ -227,6 +230,9 @@ def __init__(self,
kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))

def build(self, input_shape):
super(ChannelEmbeddingLayers, self).build(input_shape)

def __call__(self, features_info):
dense_inputs = []
dense_input_dims = []
Expand Down Expand Up @@ -301,14 +307,14 @@ def __init__(self,
self.user_embedding = ChannelEmbeddingLayers(
name='user',
dense_embedding_size=user_embedding_size,
user_embedding_size=user_embedding_size * 2,
sparse_embedding_size=user_embedding_size * 2,
embedding_initializer=embedding_initializer,
mpi_size=mpi_size,
mpi_rank=mpi_rank)
self.movie_embedding = ChannelEmbeddingLayers(
name='movie',
dense_embedding_size=movie_embedding_size,
user_embedding_size=movie_embedding_size * 2,
sparse_embedding_size=movie_embedding_size * 2,
embedding_initializer=embedding_initializer,
mpi_size=mpi_size,
mpi_rank=mpi_rank)
Expand Down Expand Up @@ -339,13 +345,14 @@ def call(self, features):
# Construct input layers
for fea_name in features.keys():
fea_info = feature_info_spec[fea_name]
input_tensor = tf.keras.layers.Input(shape=(fea_info['dim'],),
dtype=fea_info['dtype'],
name=fea_name)
input_tensor = features[fea_name]
input_tensor = tf.keras.layers.Lambda(lambda x: x,
name=fea_name)(input_tensor)
input_tensor = tf.reshape(input_tensor, (-1, fea_info['dim']))
fea_info['input_tensor'] = input_tensor
if fea_info.__contains__('boundaries'):
input_tensor = tf.raw_ops.Bucketize(input=input_tensor,
boundaries=fea_info['boundaries'])
input_tensor = Bucketize(
boundaries=fea_info['boundaries'])(input_tensor)
# To prepare for GPU table combined queries, use a prefix to distinguish different features in a table.
if fea_info['ptype'] == 'normal_gpu':
if fea_info['dtype'] == tf.int64:
Expand All @@ -361,13 +368,15 @@ def call(self, features):
fea_info['pretreated_tensor'] = input_tensor

user_fea = ['user_id', 'user_gender', 'user_occupation_label']
user_fea = [i for i in features.keys() if i in user_fea]
user_fea_info = {
key: value
for key, value in feature_info_spec.items()
if key in user_fea
}
user_latent = self.user_embedding(user_fea_info)
movie_fea = ['movie_id', 'movie_genres', 'user_occupation_label']
movie_fea = [i for i in features.keys() if i in movie_fea]
movie_fea_info = {
key: value
for key, value in feature_info_spec.items()
Expand All @@ -382,7 +391,8 @@ def call(self, features):

bias = self.bias_net(latent)
x = 0.2 * x + 0.8 * bias
return x
user_rating = tf.keras.layers.Lambda(lambda x: x, name='user_rating')(x)
return {'user_rating': user_rating}


def get_dataset(batch_size=1):
Expand All @@ -408,7 +418,10 @@ def get_dataset(batch_size=1):
tf.cast(x["timestamp"] - 880000000, tf.int32),
})

ratings = ds.map(lambda x: {"user_rating": x["user_rating"]})
ratings = ds.map(lambda x: {
"user_rating":
tf.one_hot(tf.cast(x["user_rating"] - 1, dtype=tf.int64), 5)
})
dataset = tf.data.Dataset.zip((features, ratings))
shuffled = dataset.shuffle(1_000_000,
seed=2021,
Expand Down Expand Up @@ -551,8 +564,8 @@ def train():
auc,
])

if os.path.exists(FLAGS.model_dir):
model.load_weights(FLAGS.model_dir)
if os.path.exists(FLAGS.model_dir + '/variables'):
model.load_weights(FLAGS.model_dir + '/variables/variables')

tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=FLAGS.model_dir)
save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ def train():
auc,
])

if os.path.exists(FLAGS.model_dir):
model.load_weights(FLAGS.model_dir)
if os.path.exists(FLAGS.model_dir + '/variables'):
model.load_weights(FLAGS.model_dir + '/variables/variables')

model.fit(dataset, epochs=FLAGS.epochs, steps_per_epoch=FLAGS.steps_per_epoch)

Expand Down

0 comments on commit 7a6bce1

Please sign in to comment.