diff --git a/docs/tensorflow/examples/distributed_training/horovod_mnist_estimator.md b/docs/tensorflow/examples/distributed_training/horovod_mnist_estimator.md new file mode 100644 index 0000000000..2135d79150 --- /dev/null +++ b/docs/tensorflow/examples/distributed_training/horovod_mnist_estimator.md @@ -0,0 +1,91 @@ +#Horovod MNIST Example +We provide an example script `horovod_mnist_estimator.py` which is a Tornasole-enabled Horovod training script +that uses the Estimator interface of TensorFlow. + +This is an example of how you can log a distributed training job with Tornasole. + +## Integrating Tornasole +Below we call out the changes for Tornasole in the above script and describe them + +**Importing TornasoleTF** +``` +import tornasole.tensorflow as ts +``` +**Saving gradients** + +We need to wrap our optimizer with TornasoleOptimizer, and use this optimizer to minimize loss. +This will also enable us to access the gradients during analysis without having to identify which tensors out of the saved ones are the gradients. +``` +opt = TornasoleOptimizer(opt) +``` + + +**Setting save interval** + +You can set different save intervals for different modes. +This can be done by passing a dictionary as save_config to the hook. +This dictionary should have the mode as key and a SaveConfig object as value. +``` +ts.TornasoleHook(..., + save_config=ts.SaveConfig(save_interval=FLAGS.tornasole_frequency), +``` +**Setting the right mode** + +Notice the calls to `hook.set_mode` at various places in the code. +``` +ts_hook.set_mode(ts.modes.TRAIN) +``` + +``` +ts_hook.set_mode(ts.modes.EVAL) +``` +**Passing the hook** + +We need to pass this hook to a monitored session and use this session for running the job. +``` +ts_hook = ts.TornasoleHook(...) +mnist_classifier.train(..., hooks=[...,ts_hook]) +``` + +``` +mnist_classifier.evaluate(..., hooks=[..., ts_hook]) +``` +## Running the example +### Environment +Ensure you are in a python environment which has tornasole_core installed. If you followed the recommended instructions of using Amazon Deep Learning AMI, then you might want to activate the tensorflow_p36 environment as follows. +``` +source activate tensorflow_p36 +``` +### Tornasole Path +We recommend saving tornasole outputs on S3 by passing the +flag `--tornasole_path` in the format `s3://bucket_name/prefix`. +The commands below will be shown with local path however so you can +run them immediately without having to setup S3 permissions. + +### Command + +To run on a machine with 4 GPUs: +``` +horovodrun -np 4 -H localhost:4 python horovod_mnist_estimator.py\ + --tornasole_path ~/ts_output/ts_horovod/logs\ + --steps 5000\ + --tornasole_frequency 100\ + --reductions False + --save_all True +``` + +To run on 4 machines with 4 GPUs each: +``` +horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python horovod_mnist_estimator.py\ + --tornasole_path ~/ts_output/ts_horovod/logs\ + --steps 5000\ + --tornasole_frequency 100\ + --reductions False + --save_all True +``` + +### Analysis +Refer [this page](../../../rules/README.md) for more details on analysis. + +### More +Please refer to [Tornasole Tensorflow page](../../README.md). diff --git a/docs/tensorflow/examples/distributed_training/mirrored_strategy_mnist.md b/docs/tensorflow/examples/distributed_training/mirrored_strategy_mnist.md new file mode 100644 index 0000000000..43b070ac53 --- /dev/null +++ b/docs/tensorflow/examples/distributed_training/mirrored_strategy_mnist.md @@ -0,0 +1,84 @@ +# MirroredStrategy MNIST Example +We provide an example script `mirrored_strategy_mnist.py` which is a Tornasole-enabled TensorFlow training script +that uses the MirroredStrategy to perform distributed training. + +It uses the Estimator interface of TensorFlow. + +This is an example of how you can log a distributed training job with Tornasole. + +## Integrating Tornasole +Below we call out the changes for Tornasole in the above script and describe them + +**Importing TornasoleTF** +``` +import tornasole.tensorflow as ts +``` +**Saving gradients** + +We need to wrap our optimizer with TornasoleOptimizer, and use this optimizer to minimize loss. +This will also enable us to access the gradients during analysis without having to identify which tensors out of the saved ones are the gradients. +``` +optimizer = ts.TornasoleOptimizer(optimizer) +``` + + +**Setting save interval** + +You can set different save intervals for different modes. +This can be done by passing a dictionary as save_config to the hook. +This dictionary should have the mode as key and a SaveConfig object as value. +``` +ts.TornasoleHook(..., + save_config=ts.SaveConfig(save_interval=FLAGS.tornasole_frequency), +``` +**Setting the right mode** + +Notice the calls to `hook.set_mode` at various places in the code. +``` +hook.set_mode(ts.modes.TRAIN) +``` + +``` +hook.set_mode(ts.modes.EVAL) +``` +**Passing the hook** + +We need to pass this hook to a monitored session and use this session for running the job. +``` +ts_hook = ts.TornasoleHook(...) +mnist_classifier.train(..., hooks=[ts_hook]) +``` + +``` +mnist_classifier.evaluate(..., hooks=[ts_hook]) +``` +## Running the example +### Environment +Ensure you are in a python environment which has tornasole_core installed. If you followed the recommended instructions of using Amazon Deep Learning AMI, then you might want to activate the tensorflow_p36 environment as follows. +``` +source activate tensorflow_p36 +``` +### Tornasole Path +We recommend saving tornasole outputs on S3 by passing the +flag `--tornasole_path` in the format `s3://bucket_name/prefix`. +The commands below will be shown with local path however so you can +run them immediately without having to setup S3 permissions. + +### Command + +To run on a machine with GPUs: +``` +python mirrored_strategy_mnist.py \ +--tornasole_path ~/ts_outputs/mirrored_strategy_mnist \ + --steps 5000\ + --tornasole_frequency 100\ + --reductions False + --save_all True + +``` + +### Analysis +Refer [this page](../../../rules/README.md) for more details on analysis. + +### More +Please refer to [Tornasole Tensorflow page](../../README.md). diff --git a/docs/tensorflow/examples/distributed_training/parameter_server_training/parameter_server_mnist.md b/docs/tensorflow/examples/distributed_training/parameter_server_training/parameter_server_mnist.md new file mode 100644 index 0000000000..cd889fdaf0 --- /dev/null +++ b/docs/tensorflow/examples/distributed_training/parameter_server_training/parameter_server_mnist.md @@ -0,0 +1,146 @@ +# ParameterServerStrategy MNIST Example +We provide an example script `parameter_server_mnist.py` which is a Tornasole-enabled TensorFlow training script +that uses the ParameterServer to perform distributed training. + +It uses the Estimator interface of TensorFlow. + +This is an example of how you can log a distributed training job with Tornasole. + +## Integrating Tornasole +Below we call out the changes for Tornasole in the above script and describe them + +**Importing TornasoleTF** +``` +import tornasole.tensorflow as ts +``` +**Saving gradients** + +We need to wrap our optimizer with TornasoleOptimizer, and use this optimizer to minimize loss. +This will also enable us to access the gradients during analysis without having to identify which tensors out of the saved ones are the gradients. +``` +optimizer = ts.TornasoleOptimizer(optimizer) +``` + + +**Setting save interval** + +You can set different save intervals for different modes. +This can be done by passing a dictionary as save_config to the hook. +This dictionary should have the mode as key and a SaveConfig object as value. +``` +ts.TornasoleHook(..., + save_config=ts.SaveConfig(save_interval=FLAGS.tornasole_frequency), +``` +**Setting the right mode** + +Notice the calls to `hook.set_mode` at various places in the code. +``` +hook.set_mode(ts.modes.TRAIN) +``` + +``` +hook.set_mode(ts.modes.EVAL) +``` +**Passing the hook** + +We need to pass this hook to a monitored session and use this session for running the job. +``` +ts_hook = ts.TornasoleHook(...) +mnist_classifier.train(..., hooks=[ts_hook]) +``` + +``` +mnist_classifier.evaluate(..., hooks=[ts_hook]) +``` +## Running the example +### Environment +Ensure you are in a python environment which has tornasole_core installed. If you followed the recommended instructions of using Amazon Deep Learning AMI, then you might want to activate the tensorflow_p36 environment as follows. +``` +source activate tensorflow_p36 +``` +### Tornasole Path +We recommend saving tornasole outputs on S3 by passing the +flag `--tornasole_path` in the format `s3://bucket_name/prefix`. +The commands below will be shown with local path however so you can +run them immediately without having to setup S3 permissions. + +### Command + +The example script performs distributed training 2 workers and 1 parameter server by default. + +The cluster config used by the demo can be overriden by setting the + +TF_CONFIG environment variable before running the script. Details for that can be found here. [link](https://www.tensorflow.org/guide/distributed_training#setting_up_tf_config_environment_variable) + + +The default cluster specification used by the script is given below: + +``` + os.environ["TF_CONFIG"] = json.dumps( + { + "cluster": {"worker": [nodes[0], nodes[1]], "ps": [nodes[2]]}, + "task": {"type": FLAGS.node_type, "index": FLAGS.task_index}, + } + ) +``` + +The values of the nodes is populated by this snippet in the script: + +``` + try: + f = open(FLAGS.hostfile) + for line in f.readlines(): + nodes.append(line.strip()) + except OSError as e: + print(e.errno) +``` + +The script uses a hostfile as an input, where each line corresponds to a node. + +A sample hostfile can be found here [hostfile.txt](../../../../../examples/tensorflow/scripts/distributed_training/parameter_server_training/hostfile.txt) + +To setup the parameter server: + +``` +python parameter_server_mnist.py \ +--hostfile hostfile.txt \ +--steps 1000 \ +--tornasole_path ~/ts_output/ps_training \ +--tornasole_frequency 100 \ +--node_type ps --task_index 0 +``` + +To setup the first worker server: + +``` +python parameter_server_mnist.py \ +--hostfile hostfile.txt \ +--steps 1000 \ +--tornasole_path ~/ts_output/ps_training \ +--tornasole_frequency 100 \ +--node_type worker --task_index 0 +``` + +To setup the second worker server: + +``` +python parameter_server_mnist.py \ +--hostfile hostfile.txt \ +--steps 1000 \ +--tornasole_path ~/ts_output/ps_training \ +--tornasole_frequency 100 \ +--node_type worker --task_index 1 +``` + + +Note: You can limit the number of GPUs used by each worker by setting. See [link](https://stackoverflow.com/questions/39649102/how-do-i-select-which-gpu-to-run-a-job-on) +``` +export CUDA_VISIBLE_DEVICES=0,1 +``` + + +### Analysis +Refer [this page](../../../../rules/README.md) for more details on analysis. + +### More +Please refer to [Tornasole Tensorflow page](../../../README.md). diff --git a/examples/tensorflow/scripts/distributed_training/horovod_mnist_estimator.py b/examples/tensorflow/scripts/distributed_training/horovod_mnist_estimator.py new file mode 100644 index 0000000000..a875f48395 --- /dev/null +++ b/examples/tensorflow/scripts/distributed_training/horovod_mnist_estimator.py @@ -0,0 +1,268 @@ +# Copyright 2018 Uber Technologies, Inc. All Rights Reserved. +# Copyright 2016 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Convolutional Neural Network Estimator for MNIST, built with tf.layers.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import os +import errno +import numpy as np +import tensorflow as tf +import horovod.tensorflow as hvd + +from tensorflow import keras + +import tornasole.tensorflow as ts + +tf.logging.set_verbosity(tf.logging.INFO) + + +def cnn_model_fn(features, labels, mode): + """Model function for CNN.""" + # Input Layer + # Reshape X to 4-D tensor: [batch_size, width, height, channels] + # MNIST images are 28x28 pixels, and have one color channel + input_layer = tf.reshape(features["x"], [-1, 28, 28, 1]) + + # Convolutional Layer #1 + # Computes 32 features using a 5x5 filter with ReLU activation. + # Padding is added to preserve width and height. + # Input Tensor Shape: [batch_size, 28, 28, 1] + # Output Tensor Shape: [batch_size, 28, 28, 32] + conv1 = tf.layers.conv2d( + inputs=input_layer, filters=32, kernel_size=[5, 5], padding="same", activation=tf.nn.relu + ) + + # Pooling Layer #1 + # First max pooling layer with a 2x2 filter and stride of 2 + # Input Tensor Shape: [batch_size, 28, 28, 32] + # Output Tensor Shape: [batch_size, 14, 14, 32] + pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2) + + # Convolutional Layer #2 + # Computes 64 features using a 5x5 filter. + # Padding is added to preserve width and height. + # Input Tensor Shape: [batch_size, 14, 14, 32] + # Output Tensor Shape: [batch_size, 14, 14, 64] + conv2 = tf.layers.conv2d( + inputs=pool1, filters=64, kernel_size=[5, 5], padding="same", activation=tf.nn.relu + ) + + # Pooling Layer #2 + # Second max pooling layer with a 2x2 filter and stride of 2 + # Input Tensor Shape: [batch_size, 14, 14, 64] + # Output Tensor Shape: [batch_size, 7, 7, 64] + pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2) + + # Flatten tensor into a batch of vectors + # Input Tensor Shape: [batch_size, 7, 7, 64] + # Output Tensor Shape: [batch_size, 7 * 7 * 64] + pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64]) + + # Dense Layer + # Densely connected layer with 1024 neurons + # Input Tensor Shape: [batch_size, 7 * 7 * 64] + # Output Tensor Shape: [batch_size, 1024] + dense = tf.layers.dense(inputs=pool2_flat, units=1024, activation=tf.nn.relu) + + # Add dropout operation; 0.6 probability that element will be kept + dropout = tf.layers.dropout( + inputs=dense, rate=0.4, training=mode == tf.estimator.ModeKeys.TRAIN + ) + + # Logits layer + # Input Tensor Shape: [batch_size, 1024] + # Output Tensor Shape: [batch_size, 10] + logits = tf.layers.dense(inputs=dropout, units=10) + + predictions = { + # Generate predictions (for PREDICT and EVAL mode) + "classes": tf.argmax(input=logits, axis=1), + # Add `softmax_tensor` to the graph. It is used for PREDICT and by the + # `logging_hook`. + "probabilities": tf.nn.softmax(logits, name="softmax_tensor"), + } + if mode == tf.estimator.ModeKeys.PREDICT: + return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions) + + # Calculate Loss (for both TRAIN and EVAL modes) + onehot_labels = tf.one_hot(indices=tf.cast(labels, tf.int32), depth=10) + loss = tf.losses.softmax_cross_entropy(onehot_labels=onehot_labels, logits=logits) + + # Configure the Training Op (for TRAIN mode) + if mode == tf.estimator.ModeKeys.TRAIN: + # Horovod: scale learning rate by the number of workers. + optimizer = tf.train.MomentumOptimizer(learning_rate=0.001 * hvd.size(), momentum=0.9) + + # Horovod: add Horovod Distributed Optimizer. + optimizer = hvd.DistributedOptimizer(optimizer) + + # Tornasole: add Tornasole Optimizer + optimizer = ts.TornasoleOptimizer(optimizer) + + train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step()) + return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op) + + # Add evaluation metrics (for EVAL mode) + eval_metric_ops = { + "accuracy": tf.metrics.accuracy(labels=labels, predictions=predictions["classes"]) + } + return tf.estimator.EstimatorSpec(mode=mode, loss=loss, eval_metric_ops=eval_metric_ops) + + +def str2bool(v): + if isinstance(v, bool): + return v + + if v.lower() in ("yes", "true", "t", "y", "1"): + return True + elif v.lower() in ("no", "false", "f", "n", "0"): + return False + else: + raise argparse.ArgumentTypeError("Boolean value expected.") + + +def add_cli_args(): + cmdline = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + cmdline.add_argument( + "--steps", type=int, default=20000, help="""Number of training steps to run.""" + ) + + cmdline.add_argument("--save_all", type=str2bool, default=True) + cmdline.add_argument("--tornasole_path", type=str, default="/opt/ml/output/tensors") + cmdline.add_argument( + "--tornasole_frequency", type=int, help="How often to save TS data", default=10 + ) + cmdline.add_argument( + "--reductions", + type=str2bool, + dest="reductions", + default=False, + help="save reductions of tensors instead of saving full tensors", + ) + + return cmdline + + +def main(unused_argv): + # Get commandline args + + cmdline = add_cli_args() + FLAGS, unknown_args = cmdline.parse_known_args() + + # Horovod: initialize Horovod. + hvd.init() + + # Keras automatically creates a cache directory in ~/.keras/datasets for + # storing the downloaded MNIST data. This creates a race + # condition among the workers that share the same filesystem. If the + # directory already exists by the time this worker gets around to creating + # it, ignore the resulting exception and continue. + cache_dir = os.path.join(os.path.expanduser("~"), ".keras", "datasets") + if not os.path.exists(cache_dir): + try: + os.mkdir(cache_dir) + except OSError as e: + if e.errno == errno.EEXIST and os.path.isdir(cache_dir): + pass + else: + raise + + # Download and load MNIST dataset. + (train_data, train_labels), (eval_data, eval_labels) = keras.datasets.mnist.load_data( + "MNIST-data-%d" % hvd.rank() + ) + + # The shape of downloaded data is (-1, 28, 28), hence we need to reshape it + # into (-1, 784) to feed into our network. Also, need to normalize the + # features between 0 and 1. + train_data = np.reshape(train_data, (-1, 784)) / 255.0 + eval_data = np.reshape(eval_data, (-1, 784)) / 255.0 + + # Horovod: pin GPU to be used to process local rank (one GPU per process) + config = tf.ConfigProto() + config.gpu_options.allow_growth = True + config.gpu_options.visible_device_list = str(hvd.local_rank()) + + # Horovod: save checkpoints only on worker 0 to prevent other workers from + # corrupting them. + model_dir = "./mnist_convnet_model" if hvd.rank() == 0 else None + + # Create the Estimator + mnist_classifier = tf.estimator.Estimator( + model_fn=cnn_model_fn, + model_dir=model_dir, + config=tf.estimator.RunConfig(session_config=config), + ) + + # Set up logging for predictions + # Log the values in the "Softmax" tensor with label "probabilities" + tensors_to_log = {"probabilities": "softmax_tensor"} + logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=500) + + # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states from + # rank 0 to all other processes. This is necessary to ensure consistent + # initialization of all workers when training is started with random weights or + # restored from a checkpoint. + bcast_hook = hvd.BroadcastGlobalVariablesHook(0) + + # Train the model + train_input_fn = tf.estimator.inputs.numpy_input_fn( + x={"x": train_data}, y=train_labels, batch_size=100, num_epochs=None, shuffle=True + ) + + # Setup the Tornasole Hook + + # save tensors as reductions if necessary + rdnc = ( + ts.ReductionConfig(reductions=["mean"], abs_reductions=["max"], norms=["l1"]) + if FLAGS.reductions + else None + ) + + ts_hook = ts.TornasoleHook( + out_dir=FLAGS.tornasole_path, + save_all=FLAGS.save_all, + include_collections=["weights", "gradients", "losses", "biases"], + save_config=ts.SaveConfig(save_interval=FLAGS.tornasole_frequency), + reduction_config=rdnc, + ) + + ts_hook.set_mode(ts.modes.TRAIN) + + # Horovod: adjust number of steps based on number of GPUs. + mnist_classifier.train( + input_fn=train_input_fn, + steps=FLAGS.steps // hvd.size(), + hooks=[logging_hook, bcast_hook, ts_hook], + ) + + # Evaluate the model and print results + eval_input_fn = tf.estimator.inputs.numpy_input_fn( + x={"x": eval_data}, y=eval_labels, num_epochs=1, shuffle=False + ) + + ts_hook.set_mode(ts.modes.EVAL) + + eval_results = mnist_classifier.evaluate(input_fn=eval_input_fn, hooks=[ts_hook]) + print(eval_results) + + +if __name__ == "__main__": + tf.app.run() diff --git a/examples/tensorflow/scripts/distributed_training/mirrored_strategy_mnist.py b/examples/tensorflow/scripts/distributed_training/mirrored_strategy_mnist.py new file mode 100644 index 0000000000..82d4c30667 --- /dev/null +++ b/examples/tensorflow/scripts/distributed_training/mirrored_strategy_mnist.py @@ -0,0 +1,267 @@ +# Copyright 2016 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Convolutional Neural Network Estimator for MNIST, built with tf.layers.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import numpy as np +import tensorflow as tf +import tornasole.tensorflow as ts + +from tensorflow.python.client import device_lib + +tf.logging.set_verbosity(tf.logging.INFO) + + +def cnn_model_fn(features, labels, mode): + """Model function for CNN.""" + # Input Layer + # Reshape X to 4-D tensor: [batch_size, width, height, channels] + # MNIST images are 28x28 pixels, and have one color channel + input_layer = tf.reshape(features["x"], [-1, 28, 28, 1]) + + # Convolutional Layer #1 + # Computes 32 features using a 5x5 filter with ReLU activation. + # Padding is added to preserve width and height. + # Input Tensor Shape: [batch_size, 28, 28, 1] + # Output Tensor Shape: [batch_size, 28, 28, 32] + conv1 = tf.layers.conv2d( + inputs=input_layer, filters=32, kernel_size=[5, 5], padding="same", activation=tf.nn.relu + ) + + # Pooling Layer #1 + # First max pooling layer with a 2x2 filter and stride of 2 + # Input Tensor Shape: [batch_size, 28, 28, 32] + # Output Tensor Shape: [batch_size, 14, 14, 32] + pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2) + + # Convolutional Layer #2 + # Computes 64 features using a 5x5 filter. + # Padding is added to preserve width and height. + # Input Tensor Shape: [batch_size, 14, 14, 32] + # Output Tensor Shape: [batch_size, 14, 14, 64] + conv2 = tf.layers.conv2d( + inputs=pool1, filters=64, kernel_size=[5, 5], padding="same", activation=tf.nn.relu + ) + + # Pooling Layer #2 + # Second max pooling layer with a 2x2 filter and stride of 2 + # Input Tensor Shape: [batch_size, 14, 14, 64] + # Output Tensor Shape: [batch_size, 7, 7, 64] + pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2) + + # Flatten tensor into a batch of vectors + # Input Tensor Shape: [batch_size, 7, 7, 64] + # Output Tensor Shape: [batch_size, 7 * 7 * 64] + pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64]) + + # Dense Layer + # Densely connected layer with 1024 neurons + # Input Tensor Shape: [batch_size, 7 * 7 * 64] + # Output Tensor Shape: [batch_size, 1024] + dense = tf.layers.dense(inputs=pool2_flat, units=1024, activation=tf.nn.relu) + + # Add dropout operation; 0.6 probability that element will be kept + dropout = tf.layers.dropout( + inputs=dense, rate=0.4, training=mode == tf.estimator.ModeKeys.TRAIN + ) + + # Logits layer + # Input Tensor Shape: [batch_size, 1024] + # Output Tensor Shape: [batch_size, 10] + logits = tf.layers.dense(inputs=dropout, units=10) + + predictions = { + # Generate predictions (for PREDICT and EVAL mode) + "classes": tf.argmax(input=logits, axis=1), + # Add `softmax_tensor` to the graph. It is used for PREDICT and by the + # `logging_hook`. + "probabilities": tf.nn.softmax(logits, name="softmax_tensor"), + } + if mode == tf.estimator.ModeKeys.PREDICT: + return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions) + + # Calculate Loss (for both TRAIN and EVAL modes) + loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits) + + # Configure the Training Op (for TRAIN mode) + if mode == tf.estimator.ModeKeys.TRAIN: + optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001) + optimizer = ts.TornasoleOptimizer(optimizer) + train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step()) + return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op) + + # Add evaluation metrics (for EVAL mode) + eval_metric_ops = { + "accuracy": tf.metrics.accuracy(labels=labels, predictions=predictions["classes"]) + } + return tf.estimator.EstimatorSpec(mode=mode, loss=loss, eval_metric_ops=eval_metric_ops) + + +def per_device_batch_size(batch_size, num_gpus): + """For multi-gpu, batch-size must be a multiple of the number of GPUs. + Note that this should eventually be handled by DistributionStrategies + directly. Multi-GPU support is currently experimental, however, + so doing the work here until that feature is in place. + Args: + batch_size: Global batch size to be divided among devices. This should be + equal to num_gpus times the single-GPU batch_size for multi-gpu training. + num_gpus: How many GPUs are used with DistributionStrategies. + Returns: + Batch size per device. + Raises: + ValueError: if batch_size is not divisible by number of devices + """ + if num_gpus <= 1: + return batch_size + + remainder = batch_size % num_gpus + if remainder: + err = ( + "When running with multiple GPUs, batch size " + "must be a multiple of the number of available GPUs. Found {} " + "GPUs with a batch size of {}; try --batch_size={} instead." + ).format(num_gpus, batch_size, batch_size - remainder) + raise ValueError(err) + return int(batch_size / num_gpus) + + +class InputFnProvider: + def __init__(self, train_batch_size): + self.train_batch_size = train_batch_size + self.__load_data() + + def __load_data(self): + # Load training and eval data + mnist = tf.contrib.learn.datasets.load_dataset("mnist") + self.train_data = mnist.train.images # Returns np.array + self.train_labels = np.asarray(mnist.train.labels, dtype=np.int32) + self.eval_data = mnist.test.images # Returns np.array + self.eval_labels = np.asarray(mnist.test.labels, dtype=np.int32) + + def train_input_fn(self): + """An input function for training""" + # Shuffle, repeat, and batch the examples. + dataset = tf.data.Dataset.from_tensor_slices(({"x": self.train_data}, self.train_labels)) + dataset = dataset.shuffle(1000).repeat().batch(self.train_batch_size) + return dataset + + def eval_input_fn(self): + """An input function for evaluation or prediction""" + dataset = tf.data.Dataset.from_tensor_slices(({"x": self.eval_data}, self.eval_labels)) + dataset = dataset.batch(1) + return dataset + + +def str2bool(v): + if isinstance(v, bool): + return v + + if v.lower() in ("yes", "true", "t", "y", "1"): + return True + elif v.lower() in ("no", "false", "f", "n", "0"): + return False + else: + raise argparse.ArgumentTypeError("Boolean value expected.") + + +def add_cli_args(): + cmdline = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + cmdline.add_argument( + "--steps", type=int, default=20000, help="""Number of training steps to run.""" + ) + + cmdline.add_argument("--save_all", type=str2bool, default=True) + cmdline.add_argument("--tornasole_path", type=str, default="/opt/ml/output/tensors") + cmdline.add_argument( + "--tornasole_frequency", type=int, help="How often to save TS data", default=10 + ) + cmdline.add_argument( + "--reductions", + type=str2bool, + dest="reductions", + default=False, + help="save reductions of tensors instead of saving full tensors", + ) + + return cmdline + + +def get_available_gpus(): + local_device_protos = device_lib.list_local_devices() + return len([x.name for x in local_device_protos if x.device_type == "GPU"]) + + +def main(unused_argv): + num_gpus = get_available_gpus() + batch_size = 10 * num_gpus + + cmdline = add_cli_args() + FLAGS, unknown_args = cmdline.parse_known_args() + + # input_fn which serves Dataset + input_fn_provider = InputFnProvider(per_device_batch_size(batch_size, num_gpus)) + + # Use multiple GPUs by MirroredStragtegy. + # All avaiable GPUs will be used if `num_gpus` is omitted. + if num_gpus > 1: + distribution = tf.contrib.distribute.MirroredStrategy(num_gpus=num_gpus) + print("### Doing Multi GPU Training") + else: + distribution = None + # Pass to RunConfig + config = tf.estimator.RunConfig( + train_distribute=distribution, model_dir="/tmp/mnist_convnet_model" + ) + + # save tensors as reductions if necessary + rdnc = ( + ts.ReductionConfig(reductions=["mean"], abs_reductions=["max"], norms=["l1"]) + if FLAGS.reductions + else None + ) + + ts_hook = ts.TornasoleHook( + out_dir=FLAGS.tornasole_path, + save_all=FLAGS.save_all, + include_collections=["weights", "gradients", "losses", "biases"], + save_config=ts.SaveConfig(save_interval=FLAGS.tornasole_frequency), + reduction_config=rdnc, + ) + + ts_hook.set_mode(ts.modes.TRAIN) + + # Create the Estimator + # pass RunConfig + mnist_classifier = tf.estimator.Estimator(model_fn=cnn_model_fn, config=config) + + # Train the model + mnist_classifier.train( + input_fn=input_fn_provider.train_input_fn, steps=FLAGS.steps, hooks=[ts_hook] + ) + + ts_hook.set_mode(ts.modes.EVAL) + # Evaluate the model and print results + eval_results = mnist_classifier.evaluate( + input_fn=input_fn_provider.eval_input_fn, hooks=[ts_hook] + ) + print(eval_results) + + +if __name__ == "__main__": + tf.app.run() diff --git a/examples/tensorflow/scripts/distributed_training/parameter_server_training/hostfile.txt b/examples/tensorflow/scripts/distributed_training/parameter_server_training/hostfile.txt new file mode 100644 index 0000000000..abe5779932 --- /dev/null +++ b/examples/tensorflow/scripts/distributed_training/parameter_server_training/hostfile.txt @@ -0,0 +1,3 @@ +172.31.26.105:6665 +172.31.26.105:6666 +172.31.26.105:6667 diff --git a/examples/tensorflow/scripts/distributed_training/parameter_server_training/parameter_server_mnist.py b/examples/tensorflow/scripts/distributed_training/parameter_server_training/parameter_server_mnist.py new file mode 100644 index 0000000000..a756b804bf --- /dev/null +++ b/examples/tensorflow/scripts/distributed_training/parameter_server_training/parameter_server_mnist.py @@ -0,0 +1,275 @@ +# Copyright 2016 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Convolutional Neural Network Estimator for MNIST, built with tf.layers.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import tensorflow as tf +import tornasole.tensorflow as ts +import argparse +import json +import os + +from tensorflow.python.client import device_lib + +tf.logging.set_verbosity(tf.logging.INFO) + + +def cnn_model_fn(features, labels, mode): + """Model function for CNN.""" + input_layer = tf.reshape(features["x"], [-1, 28, 28, 1]) + + conv1 = tf.layers.conv2d( + inputs=input_layer, filters=32, kernel_size=[5, 5], padding="same", activation=tf.nn.relu + ) + + pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2) + + conv2 = tf.layers.conv2d( + inputs=pool1, filters=64, kernel_size=[5, 5], padding="same", activation=tf.nn.relu + ) + + pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2) + + pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64]) + + dense = tf.layers.dense(inputs=pool2_flat, units=1024, activation=tf.nn.relu) + + dropout = tf.layers.dropout( + inputs=dense, rate=0.4, training=mode == tf.estimator.ModeKeys.TRAIN + ) + + logits = tf.layers.dense(inputs=dropout, units=10) + + predictions = { + # Generate predictions (for PREDICT and EVAL mode) + "classes": tf.argmax(input=logits, axis=1), + # Add `softmax_tensor` to the graph. It is used for PREDICT and by the + # `logging_hook`. + "probabilities": tf.nn.softmax(logits, name="softmax_tensor"), + } + if mode == tf.estimator.ModeKeys.PREDICT: + return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions) + + # Calculate Loss (for both TRAIN and EVAL modes) + loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits) + + # Configure the Training Op (for TRAIN mode) + if mode == tf.estimator.ModeKeys.TRAIN: + optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001) + optimizer = ts.TornasoleOptimizer(optimizer) + train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step()) + return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op) + + # Add evaluation metrics (for EVAL mode) + eval_metric_ops = { + "accuracy": tf.metrics.accuracy(labels=labels, predictions=predictions["classes"]) + } + return tf.estimator.EstimatorSpec(mode=mode, loss=loss, eval_metric_ops=eval_metric_ops) + + +def per_device_batch_size(batch_size, num_gpus): + """For multi-gpu, batch-size must be a multiple of the number of GPUs. + Note that this should eventually be handled by DistributionStrategies + directly. Multi-GPU support is currently experimental, however, + so doing the work here until that feature is in place. + Args: + batch_size: Global batch size to be divided among devices. This should be + equal to num_gpus times the single-GPU batch_size for multi-gpu training. + num_gpus: How many GPUs are used with DistributionStrategies. + Returns: + Batch size per device. + Raises: + ValueError: if batch_size is not divisible by number of devices + """ + if num_gpus <= 1: + return batch_size + + remainder = batch_size % num_gpus + if remainder: + err = ( + "When running with multiple GPUs, batch size " + "must be a multiple of the number of available GPUs. Found {} " + "GPUs with a batch size of {}; try --batch_size={} instead." + ).format(num_gpus, batch_size, batch_size - remainder) + raise ValueError(err) + return int(batch_size / num_gpus) + + +class InputFnProvider: + def __init__(self, train_batch_size): + self.train_batch_size = train_batch_size + self.__load_data() + + def __load_data(self): + # Load training and eval data + mnist = tf.contrib.learn.datasets.load_dataset("mnist") + self.train_data = mnist.train.images # Returns np.array + self.train_labels = np.asarray(mnist.train.labels, dtype=np.int32) + self.eval_data = mnist.test.images # Returns np.array + self.eval_labels = np.asarray(mnist.test.labels, dtype=np.int32) + + def train_input_fn(self): + """An input function for training""" + # Shuffle, repeat, and batch the examples. + dataset = tf.data.Dataset.from_tensor_slices(({"x": self.train_data}, self.train_labels)) + dataset = dataset.shuffle(1000).repeat().batch(self.train_batch_size) + return dataset + + def eval_input_fn(self): + """An input function for evaluation or prediction""" + dataset = tf.data.Dataset.from_tensor_slices(({"x": self.eval_data}, self.eval_labels)) + dataset = dataset.batch(1) + return dataset + + +def str2bool(v): + if isinstance(v, bool): + return v + + if v.lower() in ("yes", "true", "t", "y", "1"): + return True + elif v.lower() in ("no", "false", "f", "n", "0"): + return False + else: + raise argparse.ArgumentTypeError("Boolean value expected.") + + +def add_cli_args(): + cmdline = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + cmdline.add_argument( + "--steps", type=int, default=20000, help="""Number of training steps to run.""" + ) + + cmdline.add_argument("--save_all", type=str2bool, default=True) + cmdline.add_argument("--tornasole_path", type=str, default="/opt/ml/output/tensors") + cmdline.add_argument( + "--tornasole_frequency", type=int, help="How often to save TS data", default=10 + ) + cmdline.add_argument( + "--reductions", + type=str2bool, + dest="reductions", + default=False, + help="save reductions of tensors instead of saving full tensors", + ) + + cmdline.add_argument( + "--node_type", type=str, required=True, dest="node_type", help="node type: worker or ps" + ) + + cmdline.add_argument( + "--task_index", type=int, required=True, dest="task_index", help="task index" + ) + + cmdline.add_argument( + "--hostfile", + default=None, + type=str, + required=False, + dest="hostfile", + help="Path to hostfile", + ) + + return cmdline + + +def get_available_gpus(): + local_device_protos = device_lib.list_local_devices() + return len([x.name for x in local_device_protos if x.device_type == "GPU"]) + + +def main(unused_argv): + num_gpus = get_available_gpus() + batch_size = 10 * num_gpus + + cmdline = add_cli_args() + FLAGS, unknown_args = cmdline.parse_known_args() + + # input_fn which serves Dataset + input_fn_provider = InputFnProvider(per_device_batch_size(batch_size, num_gpus)) + + # Use multiple GPUs by ParameterServerStrategy. + # All avaiable GPUs will be used if `num_gpus` is omitted. + + if num_gpus > 1: + strategy = tf.distribute.experimental.ParameterServerStrategy() + if not os.getenv("TF_CONFIG"): + if FLAGS.hostfile is None: + raise Exception("--hostfile not provided and TF_CONFIG not set. Please do either.") + nodes = list() + try: + f = open(FLAGS.hostfile) + for line in f.readlines(): + nodes.append(line.strip()) + except OSError as e: + print(e.errno) + + os.environ["TF_CONFIG"] = json.dumps( + { + "cluster": {"worker": [nodes[0], nodes[1]], "ps": [nodes[2]]}, + "task": {"type": FLAGS.node_type, "index": FLAGS.task_index}, + } + ) + + print("### Doing Multi GPU Training") + else: + strategy = None + # Pass to RunConfig + config = tf.estimator.RunConfig(train_distribute=strategy) + + # save tensors as reductions if necessary + rdnc = ( + ts.ReductionConfig(reductions=["mean"], abs_reductions=["max"], norms=["l1"]) + if FLAGS.reductions + else None + ) + + ts_hook = ts.TornasoleHook( + out_dir=FLAGS.tornasole_path, + save_all=FLAGS.save_all, + include_collections=["weights", "gradients", "losses", "biases"], + save_config=ts.SaveConfig(save_interval=FLAGS.tornasole_frequency), + reduction_config=rdnc, + ) + + ts_hook.set_mode(ts.modes.TRAIN) + + # Create the Estimator + # pass RunConfig + mnist_classifier = tf.estimator.Estimator(model_fn=cnn_model_fn, config=config) + + hooks = list() + hooks.append(ts_hook) + + train_spec = tf.estimator.TrainSpec( + input_fn=input_fn_provider.train_input_fn, max_steps=FLAGS.steps, hooks=hooks + ) + eval_spec = tf.estimator.EvalSpec( + input_fn=input_fn_provider.eval_input_fn, steps=FLAGS.steps, hooks=hooks + ) + + tf.estimator.train_and_evaluate(mnist_classifier, train_spec, eval_spec) + + # Evaluate the model and print results + eval_results = mnist_classifier.evaluate(input_fn=input_fn_provider.eval_input_fn) + print(eval_results) + + +if __name__ == "__main__": + tf.app.run() diff --git a/tests/analysis/trials/test_modes.py b/tests/analysis/trials/test_modes.py index c751ade85d..bf6b81a266 100644 --- a/tests/analysis/trials/test_modes.py +++ b/tests/analysis/trials/test_modes.py @@ -1,5 +1,6 @@ from tornasole import modes import shutil, os +import socket import numpy as np from tornasole.trials import create_trial from tornasole.core.tensor import StepState @@ -22,8 +23,9 @@ def test_mode_data(): c.get("default").tensor_names = ["arr"] c.export(os.path.join(trial_dir, TORNASOLE_DEFAULT_COLLECTIONS_FILE_NAME)) tr = create_trial(trial_dir) + worker = socket.gethostname() for s in range(0, 10): - fw = FileWriter(trial_dir=trial_dir, step=s) + fw = FileWriter(trial_dir=trial_dir, step=s, worker=worker) if s % 2 == 0: fw.write_tensor( tdata=np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32), diff --git a/tests/core/test_index_utils.py b/tests/core/test_index_utils.py new file mode 100644 index 0000000000..dd739e4262 --- /dev/null +++ b/tests/core/test_index_utils.py @@ -0,0 +1,64 @@ +import pytest + +from tornasole.core.utils import ( + is_s3, + serialize_tf_device, + deserialize_tf_device, + parse_worker_name_from_file, + get_worker_name_from_collection_file, +) +from tornasole.core.index_reader import S3IndexReader +from tornasole.core.s3_utils import list_s3_objects, parse_collection_files_from_s3_objects + + +def test_tf_device_name_serialize_and_deserialize(): + import tensorflow as tf + + device_name = tf.test.gpu_device_name() + if not bool(device_name): + device_name = "/device:GPU:0" + + serialized_device_name = serialize_tf_device(device_name) + assert deserialize_tf_device(serialized_device_name) == device_name + + device_name = "/replica:0/task:0/device:GPU:0" + serialized_device_name = serialize_tf_device(device_name) + assert deserialize_tf_device(serialized_device_name) == device_name + + +def test_parse_worker_name_from_index_file(): + filename = "/tmp/ts-logs/index/000000001/000000001230_worker_2.json" + worker_name = parse_worker_name_from_file(filename) + assert worker_name == "worker_2" + + filename = "/tmp/ts-logs/index/000000000499__job-worker_replica-0_task-1_device-GPU-6.json" + worker_name = parse_worker_name_from_file(filename) + assert worker_name == "/job:worker/replica:0/task:1/device:GPU:6" + + path = "s3://tornasole-testing/one-index-file" + + _, bucket, prefix = is_s3(path) + + index_files, _ = S3IndexReader.list_all_index_files_from_s3(bucket, prefix) + + filename = index_files[0] + worker_name = parse_worker_name_from_file(filename) + assert worker_name == "/job:worker/replica:0/task:1/device:GPU:4" + + +def test_parse_worker_name_from_collection_file(): + path = "s3://tornasole-testing/one-index-file" + _, bucket_name, key_name = is_s3(path) + + s3_objects, _ = list_s3_objects(bucket_name, key_name) + collection_files = parse_collection_files_from_s3_objects(s3_objects) + + assert collection_files == ["_job-worker_replica-0_task-1_device-GPU-0_collections.json"] + + collection_file = collection_files[0] + worker_name = get_worker_name_from_collection_file(collection_file) + assert worker_name == "/job:worker/replica:0/task:1/device:GPU:0" + + file_name = "job-worker_1_collections.json" + worker_name = get_worker_name_from_collection_file(file_name) + assert worker_name == "job-worker_1" diff --git a/tests/core/test_modes.py b/tests/core/test_modes.py index 6ec9819ae0..9d205669ab 100644 --- a/tests/core/test_modes.py +++ b/tests/core/test_modes.py @@ -3,14 +3,16 @@ import numpy as np from tornasole.core.modes import ModeKeys from datetime import datetime +import socket import glob import shutil def test_mode_writing(): run_id = "trial_" + datetime.now().strftime("%Y%m%d-%H%M%S%f") + worker = socket.gethostname() for s in range(0, 10): - fw = FileWriter(trial_dir="ts_outputs/" + run_id, step=s) + fw = FileWriter(trial_dir="ts_outputs/" + run_id, step=s, worker=worker) if s % 2 == 0: fw.write_tensor( tdata=np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32), diff --git a/tests/core/test_paths.py b/tests/core/test_paths.py index 30fa1f3058..0863f4b566 100644 --- a/tests/core/test_paths.py +++ b/tests/core/test_paths.py @@ -5,6 +5,7 @@ SAGEMAKER_TEMP_PATH_SUFFIX, NON_SAGEMAKER_TEMP_PATH_PREFIX, ) +from tornasole.core.access_layer.utils import training_has_ended import os import uuid @@ -15,6 +16,7 @@ def test_outdir_non_sagemaker(): out_dir = verify_and_get_out_dir(path) assert out_dir == path os.makedirs(path) + training_has_ended(path) try: verify_and_get_out_dir(path) # should raise exception as dir present @@ -47,3 +49,13 @@ def test_temp_paths(): tp = get_temp_path(path) assert not SAGEMAKER_TEMP_PATH_SUFFIX in tp assert tp.startswith(NON_SAGEMAKER_TEMP_PATH_PREFIX) + + +def test_s3_path_that_exists_without_end_of_job(): + path = "s3://tornasole-testing/s3-path-without-end-of-job" + verify_and_get_out_dir(path) + try: + verify_and_get_out_dir(path) + # should not raise as dir present but does not have the end of job file + except RuntimeError as e: + assert False diff --git a/tests/core/test_training_end.py b/tests/core/test_training_end.py index 4df6c9b262..ebf89a8f04 100644 --- a/tests/core/test_training_end.py +++ b/tests/core/test_training_end.py @@ -14,13 +14,13 @@ def test_local_training_end(): localdir = "/tmp/training_end_test_dir" ensure_dir(localdir, is_file=False) training_has_ended(localdir) - assert has_training_ended(localdir) == True + assert has_training_ended(localdir) is True shutil.rmtree(localdir) def test_negative_local_training_end(): localdir = "/tmp/training_end_test_dir_negative" - assert has_training_ended(localdir) == False + assert has_training_ended(localdir) is False @pytest.mark.slow # 0:04 to run @@ -30,11 +30,11 @@ def test_s3_training_end(): f = TSAccessS3(bucket_name=bucket, key_name=key) f.close() training_has_ended(s3dir) - assert has_training_ended(s3dir) == True + assert has_training_ended(s3dir) is True delete_s3_prefixes(bucket, key) @pytest.mark.slow # 0:05 to run def test_negative_s3_training_end(): s3dir = "s3://tornasolecodebuildtest/training_end_test_dir_negative" - assert has_training_ended(s3dir) == False + assert has_training_ended(s3dir) is False diff --git a/tests/core/test_utils.py b/tests/core/test_utils.py index 2b91375fd5..7b9209d918 100644 --- a/tests/core/test_utils.py +++ b/tests/core/test_utils.py @@ -1,6 +1,7 @@ import pytest -from tornasole.core.utils import is_s3, check_dir_exists +from tornasole.core.utils import is_s3 +from tornasole.core.access_layer import check_dir_exists from tornasole.core.json_config import ( DEFAULT_SAGEMAKER_TORNASOLE_PATH, collect_tornasole_config_params, diff --git a/tests/tensorflow/test_utils.py b/tests/tensorflow/test_utils.py new file mode 100644 index 0000000000..270fd37bd7 --- /dev/null +++ b/tests/tensorflow/test_utils.py @@ -0,0 +1,143 @@ +import os +import json + +from tornasole.tensorflow.hook import TornasoleHook +from tornasole.tensorflow.utils import ( + TFDistributionStrategy, + get_worker_id_from_tf_config, + get_num_workers_from_tf_config, +) + + +def test_read_tf_config(): + # Case 1: No TF_CONFIG + distibution_strategy = TornasoleHook.get_distribution_strategy() + assert distibution_strategy == TFDistributionStrategy.NONE + + # Case 2: TF_CONFIG present but empty + os.environ["TF_CONFIG"] = json.dumps({}) + + distibution_strategy = TornasoleHook.get_distribution_strategy() + assert distibution_strategy == TFDistributionStrategy.NONE + + # Case 2: TF_CONFIG present but invalid because of missing ps field + os.environ["TF_CONFIG"] = json.dumps( + { + "cluster": {"worker": ["host1:port", "host2:port", "host3:port"]}, + "task": {"type": "worker", "index": 1}, + } + ) + + distibution_strategy = TornasoleHook.get_distribution_strategy() + assert distibution_strategy == TFDistributionStrategy.NONE + + # Case 2: TF_CONFIG present and valid + os.environ["TF_CONFIG"] = json.dumps( + { + "cluster": { + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "worker", "index": 1}, + } + ) + + distibution_strategy = TornasoleHook.get_distribution_strategy() + assert distibution_strategy == TFDistributionStrategy.PARAMETER_SERVER_STRATEGY + + del os.environ["TF_CONFIG"] + + +def test_get_worker_id_from_tf_config(): + os.environ["TF_CONFIG"] = json.dumps( + { + "cluster": { + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "worker", "index": 1}, + } + ) + + worker_id = get_worker_id_from_tf_config(os.getenv("TF_CONFIG")) + assert worker_id == "worker_1" + + del os.environ["TF_CONFIG"] + + os.environ["TF_CONFIG"] = json.dumps( + { + "cluster": { + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "ps", "index": 0}, + } + ) + + worker_id = get_worker_id_from_tf_config(os.getenv("TF_CONFIG")) + assert worker_id == "ps_0" + + del os.environ["TF_CONFIG"] + + os.environ["TF_CONFIG"] = json.dumps( + { + "cluster": { + "chief": ["host0:port"], + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "chief", "index": 0}, + } + ) + + worker_id = get_worker_id_from_tf_config(os.getenv("TF_CONFIG")) + assert worker_id == "chief_0" + + del os.environ["TF_CONFIG"] + + os.environ["TF_CONFIG"] = json.dumps( + { + "cluster": { + "chief": ["host0:port"], + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "evaluator", "index": 0}, + } + ) + + worker_id = get_worker_id_from_tf_config(os.getenv("TF_CONFIG")) + assert worker_id == "evaluator_0" + del os.environ["TF_CONFIG"] + + +def test_get_num_workers_from_tf_config(): + os.environ["TF_CONFIG"] = json.dumps( + { + "cluster": { + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "worker", "index": 1}, + } + ) + + num_workers = get_num_workers_from_tf_config(os.getenv("TF_CONFIG")) + assert num_workers == 3 + + del os.environ["TF_CONFIG"] + + os.environ["TF_CONFIG"] = json.dumps( + { + "cluster": { + "chief": ["host0:port"], + "worker": ["host1:port", "host2:port", "host3:port"], + "ps": ["host4:port", "host5:port"], + }, + "task": {"type": "worker", "index": 1}, + } + ) + + num_workers = get_num_workers_from_tf_config(os.getenv("TF_CONFIG")) + assert num_workers == 4 + del os.environ["TF_CONFIG"] diff --git a/tornasole/core/access_layer/__init__.py b/tornasole/core/access_layer/__init__.py index ceb55658d7..73ad5e5dfb 100644 --- a/tornasole/core/access_layer/__init__.py +++ b/tornasole/core/access_layer/__init__.py @@ -1,3 +1,3 @@ from .file import TSAccessFile from .s3 import TSAccessS3 -from .utils import training_has_ended +from .utils import training_has_ended, check_dir_exists diff --git a/tornasole/core/access_layer/utils.py b/tornasole/core/access_layer/utils.py index f6f5b331c5..8614cbd9e5 100644 --- a/tornasole/core/access_layer/utils.py +++ b/tornasole/core/access_layer/utils.py @@ -2,7 +2,7 @@ from botocore.exceptions import ClientError from .file import TSAccessFile from .s3 import TSAccessS3 -from tornasole.core.utils import is_s3, check_dir_exists, get_region +from tornasole.core.utils import is_s3, get_region from tornasole.core.logger import get_logger from tornasole.core.access_layer.s3handler import S3Handler, ListRequest import asyncio @@ -16,7 +16,6 @@ def training_has_ended(trial_prefix): try: check_dir_exists(trial_prefix) # if path does not exist, then we don't need to write a file - return except RuntimeError: # dir exists pass @@ -26,9 +25,16 @@ def training_has_ended(trial_prefix): writer = TSAccessS3(bucket_name, key_name, binary=False) else: writer = TSAccessFile(file_path, "a+") - writer.write("end of training job") writer.flush() - writer.close() + try: + writer.close() + except OSError: + """ + In the case of distributed training in local mode, + another worker may have already moved the END_OF_JOB file + from the /tmp directory. + """ + pass def has_training_ended(trial_prefix): @@ -72,3 +78,33 @@ async def del_folder(bucket, keys): task = loop.create_task(del_folder(bucket, prefixes)) loop.run_until_complete(task) + + +def check_dir_exists(path): + from tornasole.core.access_layer.s3handler import S3Handler, ListRequest + + s3, bucket_name, key_name = is_s3(path) + if s3: + try: + s3_handler = S3Handler() + request = ListRequest(bucket_name, key_name) + folder = s3_handler.list_prefixes([request])[0] + if len(folder) > 0 and has_training_ended(folder[-1]): + raise RuntimeError( + "The path:{} already exists on s3. " + "Please provide a directory path that does " + "not already exist.".format(path) + ) + except ClientError as ex: + if ex.response["Error"]["Code"] == "NoSuchBucket": + # then we do not need to raise any error + pass + else: + # do not know the error + raise ex + elif os.path.exists(path) and has_training_ended(path): + raise RuntimeError( + "The path:{} already exists on local disk. " + "Please provide a directory path that does " + "not already exist".format(path) + ) diff --git a/tornasole/core/collection_manager.py b/tornasole/core/collection_manager.py index 633d210c14..c66308cf6b 100644 --- a/tornasole/core/collection_manager.py +++ b/tornasole/core/collection_manager.py @@ -17,8 +17,7 @@ def __init__(self, collections=None, create_default=False): if collections is None: collections = {} self.collections = collections - self._meta = {} - self._meta["num_workers"] = 1 + self._meta = {"num_workers": 1} def create_collection(self, name, cls=Collection): if name not in self.collections: diff --git a/tornasole/core/hook.py b/tornasole/core/hook.py index 187546cdc3..e1f6d3d8a4 100644 --- a/tornasole/core/hook.py +++ b/tornasole/core/hook.py @@ -17,7 +17,7 @@ from tornasole.core.access_layer import training_has_ended from tornasole.core.hook_utils import verify_and_get_out_dir from tornasole.core.modes import ModeKeys, ALLOWED_MODES -from tornasole.core.utils import flatten +from tornasole.core.utils import flatten, get_tb_worker from tornasole.core.logger import get_logger from tornasole.core.reductions import get_reduction_tensor_name from tornasole.core.writer import FileWriter @@ -83,7 +83,7 @@ def __init__( self.out_dir = verify_and_get_out_dir(out_dir) self.dry_run = dry_run - self.worker = self.get_worker_name() + self.worker = None if include_collections is None: include_collections = default_include_collections self.default_include_collections = default_include_collections @@ -240,6 +240,8 @@ def _close_writers(self) -> None: self._close_writer() to_delete_writers = [] + + # Delete all the tb writers for mode, writer in self.tb_writers.items(): if writer is not None: writer.flush() @@ -253,6 +255,13 @@ def _initialize_writer(self) -> None: return self.writer = FileWriter(trial_dir=self.out_dir, step=self.step, worker=self.worker) + def get_writers(self, tensor_name) -> List[FileWriter]: + """ + :param tensor_name: + :return: List[FileWriter] + """ + return [self.writer] + def _get_tb_writer(self): if self.mode in self.tb_writers: assert self.tb_writers[self.mode] is not None @@ -264,7 +273,7 @@ def _get_tb_writer(self): self.tb_writers[self.mode] = FileWriter( trial_dir=self.out_dir, step=self.step, - worker=self.worker, + worker=get_tb_worker(), write_checksum=True, wtype="tensorboard", mode=self.mode, @@ -303,6 +312,7 @@ def set_mode(self, mode): self._collections_to_save_for_step = None def export_collections(self): + self.collection_manager.set_num_workers(self.get_num_workers()) collection_file_name = f"{self.worker}_collections.json" self.collection_manager.export(os.path.join(self.out_dir, collection_file_name)) @@ -390,12 +400,14 @@ def _write_raw_tensor_simple(self, tensor_name, tensor_value): numpy_tensor_value = self._make_numpy_array(tensor_value) this_size, this_shape = size_and_shape(numpy_tensor_value) if self.dry_run is False and this_size > 0: - self.writer.write_tensor( - tdata=numpy_tensor_value, - tname=tensor_name, - mode=self.mode, - mode_step=self.mode_steps[self.mode], - ) + writers = self.get_writers(tensor_name) + for writer in writers: + writer.write_tensor( + tdata=numpy_tensor_value, + tname=tensor_name, + mode=self.mode, + mode_step=self.mode_steps[self.mode], + ) def _save_for_tensor(self, tensor_name, tensor_value, check_before_write=True): # for TF, the tensor_name coming in will the name of object in graph diff --git a/tornasole/core/hook_utils.py b/tornasole/core/hook_utils.py index 4f1bd1194e..a20f0292ca 100644 --- a/tornasole/core/hook_utils.py +++ b/tornasole/core/hook_utils.py @@ -2,8 +2,9 @@ from .json_config import DEFAULT_SAGEMAKER_TORNASOLE_PATH from .sagemaker_utils import is_sagemaker_job, get_sagemaker_out_dir -from .utils import is_s3, check_dir_exists +from .utils import is_s3 from .logger import get_logger +from .access_layer.utils import check_dir_exists logger = get_logger() diff --git a/tornasole/core/index_reader.py b/tornasole/core/index_reader.py index 8904330e92..87dc3243b0 100644 --- a/tornasole/core/index_reader.py +++ b/tornasole/core/index_reader.py @@ -77,7 +77,7 @@ def list_index_files_in_dir(dirname): @staticmethod def get_disk_responses( path, start_after_key=0, range_steps=None - ) -> Tuple[List[bytes], List[int], int]: + ) -> Tuple[List[bytes], List[int], int, List[str]]: """Read files like `trial_{datetime}/index/000/{step}_{worker}.json. Returns: diff --git a/tornasole/core/utils.py b/tornasole/core/utils.py index bdd8f659ad..abc08b041f 100644 --- a/tornasole/core/utils.py +++ b/tornasole/core/utils.py @@ -1,6 +1,7 @@ import os import re import bisect +import socket from botocore.exceptions import ClientError import json from pathlib import Path @@ -62,36 +63,6 @@ def is_s3(path): return False, None, None -def check_dir_exists(path): - from tornasole.core.access_layer.s3handler import S3Handler, ListRequest - - s3, bucket_name, key_name = is_s3(path) - if s3: - try: - s3_handler = S3Handler() - request = ListRequest(bucket_name, key_name) - folder = s3_handler.list_prefixes([request])[0] - if len(folder) > 0: - raise RuntimeError( - "The path:{} already exists on s3. " - "Please provide a directory path that does " - "not already exist.".format(path) - ) - except ClientError as ex: - if ex.response["Error"]["Code"] == "NoSuchBucket": - # then we do not need to raise any error - pass - else: - # do not know the error - raise ex - elif os.path.exists(path): - raise RuntimeError( - "The path:{} already exists on local disk. " - "Please provide a directory path that does " - "not already exist".format(path) - ) - - def list_files_in_directory(directory): files = [] for root, dir_name, filename in os.walk(directory): @@ -108,9 +79,29 @@ def list_collection_files_in_directory(directory): return files -def get_worker_name_from_collection_file(filename): - worker_name_regex = re.compile("(.+)_collections.(json|ts)") - return re.match(worker_name_regex, filename).group(1) +def serialize_tf_device(device: str) -> str: + """ + TF device strings have special characters that cannot be used in filenames. + This function is used to convert those special characters/ + :param device:str + :return: device:str + """ + # _replica-0_task-0_device-GPU-0 = /replica:0/task:0/device:GPU:0 + device = device.replace("/", "_") + device = device.replace(":", "-") + return device + + +def deserialize_tf_device(device_name: str) -> str: + """ + This function converts filenames back into tf device strings + :param device_name: str + :return: device_name: str + """ + # /replica:0/task:0/device:GPU:0 = _replica-0_task-0_device-GPU-0 + device_name = device_name.replace("_", "/") + device_name = device_name.replace("-", ":") + return device_name def match_inc(tname, include): @@ -161,7 +152,44 @@ def size_and_shape(t): return (t.nbytes, t.shape) -def parse_worker_name_from_file(filename): +def get_worker_name_from_collection_file(filename: str) -> str: + """ + Extracts the worker name from the collection file. + Collection files can currently have two formats: + 1. worker_0_collections.json + 2. _job-worker_replica-0_task-1_device-GPU-0_collections.json + The leading underscore is used to indicate + a distributed TF job worker in MirroredStrategy that needs to be deserialized. + :param filename: str + :return: worker_name: str + """ + worker_name_regex = re.compile("(.+)_collections.(json|ts)") + worker_name = re.match(worker_name_regex, filename).group(1) + if worker_name[0] == "_": + worker_name = deserialize_tf_device(worker_name) + return worker_name + + +def parse_worker_name_from_file(filename: str) -> str: + """ + Extracts the worker name from the index or event file. + Index / Event files can currently have two formats: + 1. (path_prefix)/(step_prefix)_worker_0.json + 2. (path_prefix)/(step_prefix)__replica-0_task-1_device-GPU-0.json + The double underscore after step prefix is used to indicate + a distributed TF job worker in MirroredStrategy that needs to be deserialized. + :param filename: str + :return: worker_name: str + """ # worker_2 = /tmp/ts-logs/index/000000001/000000001230_worker_2.json worker_name_regex = re.compile(".+\/\d+_(.+)\.(json|csv|tfevents)$") - return re.match(worker_name_regex, filename).group(1) + worker_name = re.match(worker_name_regex, filename).group(1) + if "__" in filename: + # /replica:0/task:0/device:GPU:0 = replica-0_task-0_device-GPU-0.json + worker_name = deserialize_tf_device(worker_name) + return worker_name + + +def get_tb_worker(): + """Generates a unique string to be used as a worker name for tensorboard writers""" + return f"{os.getpid()}_{socket.gethostname()}" diff --git a/tornasole/core/writer.py b/tornasole/core/writer.py index 3ac2097bcf..9a6d87ec17 100644 --- a/tornasole/core/writer.py +++ b/tornasole/core/writer.py @@ -42,8 +42,8 @@ class FileWriter: def __init__( self, trial_dir, + worker, step=0, - worker=None, wtype="events", mode=ModeKeys.GLOBAL, max_queue=10, @@ -58,10 +58,10 @@ def __init__( ---------- trial_dir : str Directory where event file will be written. - step: int - Global step number worker: str Worker name + step: int + Global step number wtype: str Used to denote what sort of data we are writing max_queue : int @@ -75,8 +75,7 @@ def __init__( self.step = step self.worker = worker if worker is None: - self.worker = socket.gethostname() - + assert False, "Worker should not be none. Check worker name initialization" self.mode = mode if wtype == "events": el = TensorFileLocation(step_num=self.step, worker_name=self.worker) diff --git a/tornasole/mxnet/hook.py b/tornasole/mxnet/hook.py index 446e21df2a..f8d1fdba82 100644 --- a/tornasole/mxnet/hook.py +++ b/tornasole/mxnet/hook.py @@ -53,7 +53,7 @@ def __init__( self.exported_model = False # Keep the set of blocks to which this hook is registered. The blocks include loss blocks as well. self.registered_blocks = set() - + self.worker = self.get_worker_name() set_hook(self) def get_worker_name(self): diff --git a/tornasole/pytorch/hook.py b/tornasole/pytorch/hook.py index 82daf24744..947e3d6b2b 100644 --- a/tornasole/pytorch/hook.py +++ b/tornasole/pytorch/hook.py @@ -54,10 +54,9 @@ def __init__( self.model = None self.exported_model = False - self.has_registered_module = False self.has_registered_loss_module = False - + self.worker = self.get_worker_name() set_hook(self) def get_num_workers(self): diff --git a/tornasole/tensorflow/collection.py b/tornasole/tensorflow/collection.py index bb7f6d2fd2..94aa95fab2 100644 --- a/tornasole/tensorflow/collection.py +++ b/tornasole/tensorflow/collection.py @@ -45,6 +45,8 @@ def add_tensor(self, arg, name=None): elif isinstance(arg, values.MirroredVariable): for value in arg._values: self._store_ts_tensor(Tensor.from_variable(value)) + elif isinstance(arg, values.AggregatingVariable): + self._store_ts_tensor(Tensor.from_variable(arg.get())) else: logger.error( f"Could not add {arg} of type {arg.__class__} to collection {self.name}." @@ -56,7 +58,16 @@ def add(self, arg): if isinstance(arg, list) or isinstance(arg, set): for a in arg: self.add(a) - elif isinstance(arg, (tf.Tensor, tf.Operation, tf.Variable, values.MirroredVariable)): + elif isinstance( + arg, + ( + tf.Tensor, + tf.Operation, + tf.Variable, + values.MirroredVariable, + values.AggregatingVariable, + ), + ): self.add_tensor(arg) else: logger.error( diff --git a/tornasole/tensorflow/hook.py b/tornasole/tensorflow/hook.py index 441014c0e8..7982e071a5 100644 --- a/tornasole/tensorflow/hook.py +++ b/tornasole/tensorflow/hook.py @@ -1,20 +1,29 @@ -from typing import Set - +import os import tensorflow as tf -from .utils import node_name, extract_graph_summary, get_original_fetch_ops +from .utils import ( + node_name, + extract_graph_summary, + is_parameter_server_strategy, + get_original_fetch_ops, + get_num_workers_from_tf_config, + get_worker_id_from_tf_config, + TFDistributionStrategy, +) from .reductions import get_tensorflow_reduction from .collection import get_collection_manager, Tensor, TensorType from tornasole.core.tfevent.proto.summary_pb2 import Summary +from tornasole.core.utils import match_inc, serialize_tf_device from tornasole.core.tfevent.util import make_numpy_array -from tornasole.core.utils import match_inc from tornasole.core.collection import CollectionKeys, SUMMARIES_COLLECTIONS from tornasole.core.hook import BaseHook +from tornasole.core.writer import FileWriter from tornasole.core.reductions import get_reduction_tensor_name from tornasole.core.json_config import ( TORNASOLE_CONFIG_DEFAULT_WORKER_NAME, create_hook_from_json_config, ) from tornasole.tensorflow.singleton_utils import set_hook +from typing import Optional, List, Union, Tuple, Dict, Set DEFAULT_INCLUDE_COLLECTIONS = [ @@ -51,10 +60,6 @@ def __init__( dry_run : bool when dry run is set, behavior is only described in the log file. tensors are not actually saved. - worker: string - name of worker in a multi process training job - outputs and tensors are organized by this name during retrieval. - save_config: SaveConfig object Takes save config object which is applied as default for all included tensors. A collection can optionally have its own saveconfig object @@ -95,17 +100,53 @@ def __init__( self.graph = None self.tensors_to_save_this_step = None - + self.tensor_cache = {} + """self.device_map is a mapping between a tf device string to a serialized (filename-friendly) device string + Example -> /job:worker/replica:0/task:1/device:GPU:0 : _job-worker_replica-0_task-1_device-GPU-0""" + self.device_map = {} + self.writer_map = {} + self.distribution_strategy = None set_hook(self) - def get_worker_name(self): + @staticmethod + def get_distribution_strategy() -> TFDistributionStrategy: + try: + import horovod.tensorflow as hvd + + if hvd.size(): + return TFDistributionStrategy.HOROVOD + except (ModuleNotFoundError, ValueError, ImportError): + pass + tf_config = os.getenv("TF_CONFIG") + if tf_config and is_parameter_server_strategy(tf_config): + return TFDistributionStrategy.PARAMETER_SERVER_STRATEGY + if tf.distribute.get_strategy().num_replicas_in_sync > 1: + return TFDistributionStrategy.MIRRORED_STRATEGY + return TFDistributionStrategy.NONE + + def get_worker_name(self) -> str: + """ + This function returns the name of the worker based on + the distribution strategy. + + We do not use this function for MirroredStrategy. + Device names are used as worker names for this MirroredStrategy. + The names of the workers are managed by device_map in the case of this strategy. + + It is safe to return the TORNASOLE_CONFIG_DEFAULT_WORKER_NAME in this case. + :return: str + """ try: import horovod.tensorflow as hvd if hvd.size(): return f"worker_{hvd.rank()}" except (ModuleNotFoundError, ValueError, ImportError): - return TORNASOLE_CONFIG_DEFAULT_WORKER_NAME + pass + tf_config = os.getenv("TF_CONFIG") + if tf_config and is_parameter_server_strategy(tf_config): + return get_worker_id_from_tf_config(tf_config) + return TORNASOLE_CONFIG_DEFAULT_WORKER_NAME def get_num_workers(self): try: @@ -114,7 +155,55 @@ def get_num_workers(self): if hvd.size(): return hvd.size() except (ModuleNotFoundError, ValueError, ImportError): - return 1 + pass + tf_config = os.getenv("TF_CONFIG") + if tf_config and is_parameter_server_strategy(tf_config): + return get_num_workers_from_tf_config(tf_config) + strategy = tf.distribute.get_strategy() + return strategy.num_replicas_in_sync + + def export_collections(self): + self.collection_manager.set_num_workers(self.get_num_workers()) + if self.distribution_strategy == TFDistributionStrategy.MIRRORED_STRATEGY: + for device, serialized_device in self.device_map.items(): + collection_file_name = f"{serialized_device}_collections.json" + self.collection_manager.export(os.path.join(self.out_dir, collection_file_name)) + else: + collection_file_name = f"{self.worker}_collections.json" + self.collection_manager.export(os.path.join(self.out_dir, collection_file_name)) + + def _initialize_writer(self) -> None: + if self.dry_run: + return + if self.distribution_strategy == TFDistributionStrategy.MIRRORED_STRATEGY: + """ + Initialize one writer per device string + """ + for device, device_string in self.device_map.items(): + self.writer_map[device_string] = FileWriter( + trial_dir=self.out_dir, step=self.step, worker=device_string + ) + return + self.writer = FileWriter(trial_dir=self.out_dir, step=self.step, worker=self.worker) + + def _close_writer(self) -> None: + if self.dry_run: + return + + if self.writer is not None: + self.writer.flush() + self.writer.close() + self.writer = None + + # Delete all the dist training writers + to_delete_writers = [] + for device, writer in self.writer_map.items(): + writer.flush() + writer.close() + to_delete_writers.append(device) + + for device in to_delete_writers: + del self.writer_map[device] @classmethod def hook_from_config(cls, json_config_path=None): @@ -186,6 +275,14 @@ def _check_and_add_tensor(self, tensor): ts_tensor.obj_in_graph = tensor colls_with_tensor.add(coll) + if ( + self.distribution_strategy == TFDistributionStrategy.MIRRORED_STRATEGY + and len(tensor.device) + and "CPU" not in tensor.device + and tensor.device not in self.device_map + ): + self.device_map[tensor.device] = serialize_tf_device(tensor.device) + if colls_with_tensor: # this is mapping from graph name self.tensor_to_collections[tensor.name] = colls_with_tensor @@ -215,6 +312,8 @@ def _add_summaries_tensors(self): def begin(self): # todo: should this be called first time a mode changes # todo: handle multiple graphs in the model + self.distribution_strategy = TornasoleHook.get_distribution_strategy() + self.worker = self.get_worker_name() self.graph = tf.get_default_graph() wts = tf.trainable_variables() @@ -347,6 +446,23 @@ def _get_all_tensors_values(self, results): for i in range(len(value)): yield item[i], value[i] + def get_writers(self, tensor_name) -> List[FileWriter]: + """ + For tensors generated during distributed tf jobs, we map the tensor to a writer + with its device attribute. + If the device attribute is CPU, we map it to all the writers. + For all other frameworks and single worker jobs we return a list with a single worker. + :param tensor_name: + :return: List[FileWriter] + """ + if self.distribution_strategy == TFDistributionStrategy.MIRRORED_STRATEGY: + worker = self.tensor_cache.get(tensor_name).device + if not bool(worker) or "CPU" in worker: + return list(self.writer_map.values()) + worker = self.device_map[worker] + return [self.writer_map[worker]] + return [self.writer] + def after_run(self, run_context, run_values): if self.tensors_to_save_this_step: self._initialize_writer() @@ -354,8 +470,11 @@ def after_run(self, run_context, run_values): if tensor.dtype == tf.string: self._write_tf_summary(tensor, value) else: + # todo: need to use ts_tensor for this and remove tensor_cache + self.tensor_cache[tensor.name] = tensor self._save_for_tensor(tensor.name, value, check_before_write=False) - self._close_writer() + self.tensor_cache.clear() # cleanup to remove memory footprint + self._close_writers() self._close_tb_writer() self._increment_step() diff --git a/tornasole/tensorflow/utils.py b/tornasole/tensorflow/utils.py index 09bfd80a9f..6b57c9a9b1 100644 --- a/tornasole/tensorflow/utils.py +++ b/tornasole/tensorflow/utils.py @@ -1,5 +1,15 @@ +import json import tensorflow as tf +from enum import Enum + + +class TFDistributionStrategy(Enum): + NONE = 0 + HOROVOD = 1 + MIRRORED_STRATEGY = 2 + PARAMETER_SERVER_STRATEGY = 3 + def node_name(n): if n.startswith("^"): @@ -43,3 +53,103 @@ def get_original_fetch_ops(fetches): return rval else: raise RuntimeError("Invalid fetches") + + +"""" +The TF_CONFIG environment variable is the standard way to specify the cluster configuration +to each worker that is part of the cluster. + + +Given below some examples of TF_CONFIG: + + + Example of `TF_CONFIG` for chief training worker (must have one and only one): + + Note that the chief worker also does the model training job, similar to other + non-chief training workers (see next paragraph). In addition to the model + training, it manages some extra work, e.g., checkpoint saving and restoring, + writing summaries, etc. + + TF_CONFIG='{ + "cluster": { + "chief": ["host0:2222"], + "worker": ["host1:2222", "host2:2222", "host3:2222"], + "ps": ["host4:2222", "host5:2222"] + }, + "task": {"type": "chief", "index": 0} + }' + + + Example of `TF_CONFIG` for non-chief training worker (optional, could be + multiple): + + TF_CONFIG='{ + "cluster": { + "chief": ["host0:2222"], + "worker": ["host1:2222", "host2:2222", "host3:2222"], + "ps": ["host4:2222", "host5:2222"] + }, + "task": {"type": "worker", "index": 0} + }' + + where the `task.index` should be set as 0, 1, 2, in this example, respectively + for non-chief training workers. + + + Example of `TF_CONFIG` for parameter server, aka ps (could be multiple): + + TF_CONFIG='{ + "cluster": { + "chief": ["host0:2222"], + "worker": ["host1:2222", "host2:2222", "host3:2222"], + "ps": ["host4:2222", "host5:2222"] + }, + "task": {"type": "ps", "index": 0} + }' + + where the `task.index` should be set as 0 and 1, in this example, respectively + for parameter servers. + + Example of `TF_CONFIG` for evaluator task. Evaluator is a special task that is + not part of the training cluster. There could be only one. It is used for + model evaluation. + + TF_CONFIG='{ + "cluster": { + + "chief": ["host0:2222"], + "worker": ["host1:2222", "host2:2222", "host3:2222"], + "ps": ["host4:2222", "host5:2222"] + }, + "task": {"type": "evaluator", "index": 0} + }' + + NOTE: If the "chief" is missing in TF_CONFIG["cluster"], the worker with index 0 assumes this role. + +See https://www.tensorflow.org/guide/distributed_training#setting_up_tf_config_environment_variable +""" + + +def is_parameter_server_strategy(tf_config: str) -> bool: + try: + tf_config = json.loads(tf_config) + except json.JSONDecodeError: + return False # Do not break for incorrectly set tf_config + return "cluster" in tf_config and "ps" in tf_config["cluster"] + + +def get_worker_id_from_tf_config(tf_config: str) -> str: + """Valid roles in a cluster is "chief", "worker", "ps" and "evaluator".""" + tf_config = json.loads(tf_config) + task = tf_config["task"] + worker_type = task["type"] + worker_index = task["index"] + return f"{worker_type}_{worker_index}" + + +def get_num_workers_from_tf_config(tf_config: str) -> int: + tf_config = json.loads(tf_config) + workers = tf_config["cluster"]["worker"] + if "chief" in tf_config["cluster"]: + workers.extend(tf_config["cluster"]["chief"]) + return len(workers) diff --git a/tornasole/trials/trial.py b/tornasole/trials/trial.py index a31d9aedea..32709ec32e 100644 --- a/tornasole/trials/trial.py +++ b/tornasole/trials/trial.py @@ -209,6 +209,7 @@ def _populate_step_dict(self, tensor_object, step_num): def add_tensor(self, step_num, worker, tensor_object): to = tensor_object + self.worker_set.add(worker) # todo, use worker_name here if TORNASOLE_REDUCTIONS_PREFIX in to.tensorname: tname, red_name, abs = reverse_reduction_tensor_name(to.tensorname) diff --git a/tornasole/xgboost/hook.py b/tornasole/xgboost/hook.py index 29b8069c58..b996cb56bc 100644 --- a/tornasole/xgboost/hook.py +++ b/tornasole/xgboost/hook.py @@ -10,7 +10,10 @@ from tornasole.core.hook import CallbackHook from tornasole.core.tfevent.util import make_numpy_array from tornasole.core.access_layer.utils import training_has_ended -from tornasole.core.json_config import create_hook_from_json_config +from tornasole.core.json_config import ( + create_hook_from_json_config, + TORNASOLE_CONFIG_DEFAULT_WORKER_NAME, +) from tornasole.xgboost.singleton_utils import set_hook @@ -95,6 +98,7 @@ def __init__( self.train_data = self._validate_data(train_data) self.validation_data = self._validate_data(validation_data) # as we do cleanup ourselves at end of job + self.worker = self.get_worker_name() atexit.unregister(self._cleanup) set_hook(self)