Save Scalars With Mirrored Strategy (aws#259)
NihalHarish authored May 28, 2020
1 parent e635944 commit 055b71e
Showing 4 changed files with 53 additions and 20 deletions.
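
In short: this commit makes hook.save_scalar() work when training with tf.distribute.MirroredStrategy, by (a) skipping scalar writes when the hook does not support the active distribution strategy and (b) initializing writers lazily, so scalars saved before training starts are still flushed to disk. A minimal usage sketch (hypothetical out_dir and model; the save_scalar call mirrors the tests below):

    import tensorflow as tf
    import smdebug.tensorflow as smd

    strategy = tf.distribute.MirroredStrategy()
    hook = smd.KerasHook(out_dir="/tmp/smdebug_mirrored")
    # Cached in hook.scalar_cache; written once a writer exists.
    hook.save_scalar("foobar", 1, sm_metric=True)

    with strategy.scope():
        model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(4,))])
        model.compile(optimizer="adam", loss="mse")
    # model.fit(..., callbacks=[hook]) would flush the cached scalar.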
17 changes: 14 additions & 3 deletions smdebug/core/hook.py
@@ -294,6 +294,10 @@ def _get_worker_name(self):
     def _get_num_workers(self):
         pass
 
+    @abstractmethod
+    def _is_not_supported(self):
+        pass
+
     #### Save Manager methods ####
 
     def _should_collection_be_saved(self, coll_name: str) -> bool:
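
Every framework hook must now implement _is_not_supported(). A plausible sketch of the TensorFlow implementation (an assumption; the real method in smdebug/tensorflow/base_hook.py may check more conditions):

    def _is_not_supported(self):
        # Under an unrecognized distribution strategy the hook cannot
        # write events safely, so callers skip logging entirely.
        return self.distribution_strategy == TFDistributionStrategy.UNSUPPORTED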
@@ -418,7 +422,10 @@ def _close_writers(self) -> None:
         for mode in to_delete_writers:
             del self.tb_writers[mode]
 
-    def _initialize_writers(self) -> None:
+    def _initialize_writers(self, only_initialize_if_missing=False) -> None:
+        # Function is overridden in smdebug/tensorflow/base_hook.py
+        if only_initialize_if_missing and self.writer:
+            return
         if self.dry_run:
             return
         if self.first_process is False:
@@ -630,6 +637,11 @@ def _write_scalars(self):
         If sm_metric is set to True for certain scalars, then that scalar is written to
         SageMaker as well. By default, loss values are sm_metric.
         """
+        if self._is_not_supported():
+            # Do not log scalars if the smdebug hook is not supported,
+            # e.g. when the strategy is TFDistributionStrategy.UNSUPPORTED
+            self.scalar_cache = []
+            return
         for scalar_obj in self.scalar_cache:
             scalar_name = scalar_obj.name
             scalar_val = scalar_obj.value
@@ -652,8 +664,7 @@
                     scalar_name, scalar_val, self.step, timestamp=timestamp
                 )
             if write_event:
-                if self.writer is None:
-                    self._initialize_writers()
+                self._initialize_writers(only_initialize_if_missing=True)
                 self._write_raw_tensor_simple(scalar_name, scalar_val, timestamp=timestamp)
 
         self.scalar_cache = []
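
The two hunks above work together: _write_scalars() now drops its cache and returns early on unsupported setups, and it delegates the old `self.writer is None` check to _initialize_writers(only_initialize_if_missing=True), which reuses an existing writer rather than re-creating one. The resulting flow, as a hypothetical driver sketch:

    hook.save_scalar("foobar", 1, sm_metric=True)  # appended to scalar_cache
    # At the next flush, _write_scalars():
    #   1. clears the cache and returns if _is_not_supported()
    #   2. calls _initialize_writers(only_initialize_if_missing=True),
    #      a no-op when self.writer is already set
    #   3. writes each cached value via _write_raw_tensor_simple(...)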
5 changes: 4 additions & 1 deletion smdebug/tensorflow/base_hook.py
@@ -282,7 +282,10 @@ def _get_writers(self, tensor_name, tensor_ref) -> List[FileWriter]:
         elif self.distribution_strategy == TFDistributionStrategy.MIRRORED:
             if len(self.device_map):
                 # else is for metrics in Keras
-                worker = tensor_ref.tf_obj.device if tensor_ref.tf_obj is not None else "CPU"
+                if tensor_ref is not None and tensor_ref.tf_obj is not None:
+                    worker = tensor_ref.tf_obj.device
+                else:
+                    worker = "CPU"
                 # if device str is empty or cpu in worker
                 if not bool(worker) or "CPU" in worker:
                     if self.save_all_workers:
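
Scalars saved through save_scalar() have no backing graph tensor, so under MirroredStrategy _get_writers() can now be handed tensor_ref=None; the old one-liner dereferenced tensor_ref.tf_obj and raised AttributeError in that case. The guard order the fix establishes, isolated as a standalone sketch (resolve_worker is a hypothetical helper, not smdebug API):

    def resolve_worker(tensor_ref):
        # Check tensor_ref itself before touching tensor_ref.tf_obj;
        # tensors without a device fall back to the "CPU" worker.
        if tensor_ref is not None and tensor_ref.tf_obj is not None:
            return tensor_ref.tf_obj.device
        return "CPU"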
15 changes: 12 additions & 3 deletions tests/tensorflow2/test_keras.py
@@ -6,6 +6,9 @@
 This was tested with TensorFlow 2.1, by running
 `python tests/tensorflow2/test_keras.py` from the main directory.
 """
+# Standard Library
+import time
+
 # Third Party
 import pytest
 import tensorflow.compat.v2 as tf
@@ -394,6 +397,10 @@ def test_gradtape_persistent(out_dir, saveall):
 @pytest.mark.parametrize("saveall", [True, False])
 def test_keras_fit(out_dir, tf_eager_mode, saveall):
     hook = smd.KerasHook(out_dir=out_dir, save_all=saveall)
+    ts = time.time()
+    hook.save_scalar("foobar", 1, sm_metric=True, timestamp=ts)
+    scalars_to_be_saved = dict()
+    scalars_to_be_saved["scalar/foobar"] = (ts, 0)
     helper_keras_fit(
         trial_dir=out_dir,
         hook=hook,
@@ -403,9 +410,9 @@ def test_keras_fit(out_dir, tf_eager_mode, saveall):
 
     trial = smd.create_trial(path=out_dir)
     # can't save gradients in TF 2.x eager mode
-    if saveall:  # save losses, metrics, weights, biases
+    if saveall:  # save losses, metrics, weights, biases, scalar
         if tf_eager_mode:
-            assert len(trial.tensor_names()) == (12 if is_tf_2_2() else 13)
+            assert len(trial.tensor_names()) == (13 if is_tf_2_2() else 14)
         else:
             assert len(trial.tensor_names()) == 21
         assert len(trial.tensor_names(collection=CollectionKeys.BIASES)) == 2
@@ -421,11 +428,13 @@ def test_keras_fit(out_dir, tf_eager_mode, saveall):
             "No Optimizer Variables Should be Saved in EVAL Mode",
         )
     else:  # save the default losses and metrics
-        assert len(trial.tensor_names()) == (3 if is_tf_2_2() and tf_eager_mode else 4)
+        assert len(trial.tensor_names()) == (4 if is_tf_2_2() and tf_eager_mode else 5)
         assert len(trial.tensor_names(collection=CollectionKeys.LOSSES)) == 1
         assert len(trial.tensor_names(collection=CollectionKeys.METRICS)) == (
             2 if is_tf_2_2() and tf_eager_mode else 3
         )
+    for tname in trial.tensor_names():
+        assert trial.tensor(tname).value(0) is not None
 
 
 @pytest.mark.slow
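
Condensed, the strengthened assertions read the scalar back like this (the tensor name "scalar/foobar" comes from the test's expectation dict; treating step 0 as the scalar's step is an assumption):

    trial = smd.create_trial(path=out_dir)
    assert "scalar/foobar" in trial.tensor_names()
    # Every saved tensor, the custom scalar included, has a value at step 0.
    assert trial.tensor("scalar/foobar").value(0) is not None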
36 changes: 23 additions & 13 deletions tests/tensorflow2/test_keras_mirrored.py
@@ -3,12 +3,14 @@
 
 # Standard Library
 import os
+import time
 
 # Third Party
 import pytest
 import tensorflow.compat.v2 as tf
 import tensorflow_datasets as tfds
 from tensorflow.python.client import device_lib
+from tests.core.utils import verify_files
 from tests.tensorflow2.utils import is_tf_2_2
 from tests.tensorflow.utils import create_trial_fast_refresh

@@ -119,6 +121,10 @@ def scale(image, label):
     )
 
     hooks.append(hook)
+    scalars_to_be_saved = dict()
+    ts = time.time()
+    scalars_to_be_saved["scalar/foobar"] = (ts, steps)
+    hook.save_scalar("foobar", 1, sm_metric=True, timestamp=ts)
 
     if steps is None:
         steps = ["train"]
@@ -131,7 +137,7 @@ def scale(image, label):
         model.predict(train_dataset, steps=4, callbacks=hooks, verbose=0)
 
     smd.get_hook().close()
-    return strategy
+    return strategy, scalars_to_be_saved
 
 
 def exhaustive_check(trial_dir, include_workers="one", eager=True):
@@ -144,7 +150,7 @@ def exhaustive_check(trial_dir, include_workers="one", eager=True):
         CollectionKeys.METRICS,
         CollectionKeys.OPTIMIZER_VARIABLES,
     ]
-    strategy = train_model(
+    strategy, _ = train_model(
         trial_dir,
         include_collections=include_collections,
         steps=["train", "eval", "predict", "train"],
@@ -158,9 +164,11 @@
     if include_workers == "all":
         assert len(tr.workers()) == strategy.num_replicas_in_sync
         if eager:
-            assert len(tr.tensor_names()) == (6 + 1 + 2 + 5 if is_tf_2_2() else 6 + 1 + 3 + 5)
-            # 6 weights, 1 loss, 3 metrics, 5 optimizer variables for Tf 2.1
-            # 6 weights, 1 loss, 2 metrics, 5 optimizer variables for Tf 2.2
+            assert len(tr.tensor_names()) == (
+                6 + 1 + 2 + 5 + 1 if is_tf_2_2() else 6 + 1 + 3 + 5 + 1
+            )
+            # 6 weights, 1 loss, 3 metrics, 5 optimizer variables, 1 scalar for TF 2.1
+            # 6 weights, 1 loss, 2 metrics, 5 optimizer variables, 1 scalar for TF 2.2
         else:
             assert len(tr.tensor_names()) == (6 + 6 + 1 + 3 + strategy.num_replicas_in_sync * 3 + 5)
     else:
@@ -235,20 +243,21 @@ def test_tf_keras(out_dir, tf_eager_mode, include_workers="all"):
 @pytest.mark.slow
 @pytest.mark.parametrize("workers", ["one", "all"])
 def test_save_all(out_dir, tf_eager_mode, workers):
-    strategy = train_model(
+    save_config = SaveConfig(save_steps=[5])
+    strategy, saved_scalars = train_model(
         out_dir,
         include_collections=None,
         save_all=True,
-        save_config=SaveConfig(save_steps=[5]),
+        save_config=save_config,
         steps=["train"],
         eager=tf_eager_mode,
         include_workers=workers,
     )
     tr = create_trial_fast_refresh(out_dir)
     print(tr.tensor_names())
     if tf_eager_mode:
-        assert len(tr.tensor_names()) == (6 + 2 + 1 + 5 if is_tf_2_2() else 6 + 3 + 1 + 5)
-        # weights, metrics, losses, optimizer variables
+        assert len(tr.tensor_names()) == (6 + 2 + 1 + 5 + 1 if is_tf_2_2() else 6 + 3 + 1 + 5 + 1)
+        # weights, metrics, losses, optimizer variables, scalar
     else:
         assert (
             len(tr.tensor_names())
@@ -266,6 +275,7 @@ def test_save_all(out_dir, tf_eager_mode, workers):
         assert len(tr.tensor(tname).workers(0)) == (
             1 if workers == "one" else strategy.num_replicas_in_sync
         )
+    verify_files(out_dir, save_config, saved_scalars)
 
 
 @pytest.mark.slow
@@ -350,7 +360,7 @@ def test_include_regex(out_dir, tf_eager_mode, workers):
         include_workers=workers,
     )
     hook.get_collection("custom_coll").include("dense")
-    strategy = train_model(out_dir, hook=hook, steps=["train"], eager=tf_eager_mode)
+    strategy, _ = train_model(out_dir, hook=hook, steps=["train"], eager=tf_eager_mode)
 
     tr = create_trial_fast_refresh(out_dir)
     tnames = tr.tensor_names(collection="custom_coll")
@@ -378,7 +388,7 @@ def test_include_regex_opt_var(out_dir, tf_eager_mode, workers):
         include_workers=workers,
     )
     hook.get_collection("custom_optimizer_variables").include("Adam")
-    strategy = train_model(out_dir, hook=hook, steps=["train"], eager=tf_eager_mode)
+    strategy, _ = train_model(out_dir, hook=hook, steps=["train"], eager=tf_eager_mode)
 
     tr = create_trial_fast_refresh(out_dir)
     tnames = tr.tensor_names(collection="custom_optimizer_variables")
@@ -411,11 +421,11 @@ def test_clash_with_tb_callback(out_dir):
         add_callbacks=["tensorboard"],
     )
     tr = create_trial_fast_refresh(out_dir)
-    assert len(tr.tensor_names()) == (9 if is_tf_2_2() else 10)
+    assert len(tr.tensor_names()) == (10 if is_tf_2_2() else 11)
 
 
 def test_one_device(out_dir, tf_eager_mode):
-    strategy = train_model(
+    strategy, _ = train_model(
         out_dir,
         include_collections=[
             CollectionKeys.WEIGHTS,
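
With train_model() now returning the expected scalars alongside the strategy, test_save_all closes the loop through verify_files (imported from tests/core/utils.py), which checks that the event files on disk contain the saved scalars. The calling pattern, condensed from that test:

    save_config = SaveConfig(save_steps=[5])
    strategy, saved_scalars = train_model(
        out_dir,
        include_collections=None,
        save_all=True,
        save_config=save_config,
        steps=["train"],
        eager=True,
        include_workers="all",
    )
    verify_files(out_dir, save_config, saved_scalars)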
