Fix logging issues - initial commit
Use asyncio to read stdout and stderr streams in real time
Report the exit code on failures
Convey an informative message to the user if the process gets OOM-killed
Filter stderr for error messages and report them
Prepend tags to the log files to enable easy filtering in CloudWatch
Update Amazon license headers
Update SageMaker documentation URLs
satishpasumarthi committed Sep 1, 2021
1 parent 447e8f3 commit 254bfe8
Showing 20 changed files with 536 additions and 154 deletions.
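
The process-runner changes that implement the realtime stream handling are in files not rendered below, so the following is only a minimal, hypothetical sketch of the approach the commit message describes: read a child process's stdout and stderr concurrently with asyncio, tag each relayed line for easier CloudWatch filtering, and surface the exit code. The helper names (_read_stream, run_and_stream) and the tag strings are illustrative assumptions, not the toolkit's actual API.

# Illustrative sketch only; not the sagemaker-training implementation.
import asyncio
import sys


async def _read_stream(stream, tag, sink):
    # Relay one line at a time so output shows up in real time.
    while True:
        line = await stream.readline()
        if not line:
            break
        sink.write("%s %s" % (tag, line.decode("utf-8", errors="replace")))
        sink.flush()


async def run_and_stream(cmd):
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Read both streams concurrently, prefixing a tag per stream so the
    # lines are easy to filter in CloudWatch-style log searches.
    await asyncio.gather(
        _read_stream(proc.stdout, "[stdout]", sys.stdout),
        _read_stream(proc.stderr, "[stderr]", sys.stderr),
    )
    return await proc.wait()


if __name__ == "__main__":
    exit_code = asyncio.run(run_and_stream([sys.executable, "-c", "print('hello')"]))
    print("Exit code: %s" % exit_code)
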
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
.idea
.vscode
.cache/
build/
dist/
3 changes: 2 additions & 1 deletion setup.py
@@ -1,4 +1,4 @@
# Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Copyright 2018-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License'). You
# may not use this file except in compliance with the License. A copy of
@@ -77,6 +77,7 @@ def read_version():
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
],
install_requires=required_packages,
extras_require={
2 changes: 1 addition & 1 deletion src/sagemaker_training/entry_point.py
@@ -1,4 +1,4 @@
# Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Copyright 2018-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License'). You
# may not use this file except in compliance with the License. A copy of
26 changes: 14 additions & 12 deletions src/sagemaker_training/environment.py
@@ -36,6 +36,10 @@
SAGEMAKER_BASE_PATH = os.path.join("/opt", "ml") # type: str
BASE_PATH_ENV = "SAGEMAKER_BASE_DIR" # type: str

HYPERPARAMETERS_FILE = "hyperparameters.json" # type: str
RESOURCE_CONFIG_FILE = "resourceconfig.json" # type: str
INPUT_DATA_CONFIG_FILE = "inputdataconfig.json" # type: str


def _write_json(obj, path): # type: (object, str) -> None
"""Write a serializeable object as a JSON file."""
@@ -65,10 +69,11 @@ def _is_training_path_configured(): # type: () -> bool

def _set_base_path_env(): # type: () -> None
"""Set the environment variable SAGEMAKER_BASE_DIR as
~/sagemaker_local/{timestamp}/opt/ml
~/sagemaker_local/jobs/{timestamp}/opt/ml
"""
timestamp = str(time.time())
local_config_dir = os.path.join(
os.path.expanduser("~"), "sagemaker_local", "jobs", str(time.time()), "opt", "ml"
os.path.expanduser("~"), "sagemaker_local", "jobs", timestamp, "opt", "ml"
)

logger.info("Setting environment variable SAGEMAKER_BASE_DIR as %s ." % local_config_dir)
@@ -139,10 +144,6 @@ def _set_base_path_env(): # type: () -> None
str: the path to the intermediate output directory, e.g. /opt/ml/output/intermediate.
"""

HYPERPARAMETERS_FILE = "hyperparameters.json" # type: str
RESOURCE_CONFIG_FILE = "resourceconfig.json" # type: str
INPUT_DATA_CONFIG_FILE = "inputdataconfig.json" # type: str

hyperparameters_file_dir = os.path.join(input_config_dir, HYPERPARAMETERS_FILE) # type: str
input_data_config_file_dir = os.path.join(input_config_dir, INPUT_DATA_CONFIG_FILE) # type: str
resource_config_file_dir = os.path.join(input_config_dir, RESOURCE_CONFIG_FILE) # type: str
@@ -196,7 +197,7 @@ def read_hyperparameters(): # type: () -> dict
"""Read the hyperparameters from /opt/ml/input/config/hyperparameters.json.
For more information about hyperparameters.json:
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html#your-algorithms-training-algo-running-container-hyperparameters
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo-running-container.html#your-algorithms-training-algo-running-container-hyperparameters
Returns:
(dict[string, object]): A dictionary containing the hyperparameters.
@@ -225,7 +226,7 @@ def read_resource_config(): # type: () -> dict
"""Read the resource configuration from /opt/ml/input/config/resourceconfig.json.
For more information about resourceconfig.json:
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html#your-algorithms-training-algo-running-container-dist-training
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo-running-container.html#your-algorithms-training-algo-running-container-dist-training
Returns:
resource_config (dict[string, object]): the contents from /opt/ml/input/config/resourceconfig.json.
@@ -264,7 +265,7 @@ def read_input_data_config(): # type: () -> dict
}}
For more information about inputdataconfig.json:
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html#your-algorithms-training-algo-running-container-dist-training
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo-running-container.html#your-algorithms-training-algo-running-container-inputdataconfig
Returns:
input_data_config (dict[string, object]): Contents from /opt/ml/input/config/inputdataconfig.json.
@@ -305,6 +306,7 @@ def num_cpus(): # type: () -> int
Returns:
int: Number of CPUs available in the current container.
"""
# TODO: https://stackoverflow.com/questions/1006289/how-to-find-out-the-number-of-cpus-using-python
return multiprocessing.cpu_count()


@@ -326,7 +328,7 @@ class Environment(mapping.MappingMixin): # pylint:disable=too-many-public-metho
get the path of the channel 'training' from the inputdataconfig.json file
>>>training_dir = environment.channel_input_dirs['training']
get a the hyperparameter 'training_data_file' from hyperparameters.json file
get the hyperparameter 'training_data_file' from hyperparameters.json file
>>>file_name = environment.hyperparameters['training_data_file']
get the folder where the model should be saved
@@ -407,7 +409,7 @@ class Environment(mapping.MappingMixin): # pylint:disable=too-many-public-metho
}}
You can find more information about /opt/ml/input/config/inputdataconfig.json here:
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html#your-algorithms-training-algo-running-container-inputdataconfig
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo-running-container.html#your-algorithms-training-algo-running-container-inputdataconfig
output_data_dir (str): The dir to write non-model training artifacts (e.g. evaluation
results) which will be retained by SageMaker,
@@ -476,7 +478,7 @@ def __init__(self, resource_config=None, input_data_config=None, hyperparameters
}}
You can find more information about /opt/ml/input/config/inputdataconfig.json here:
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html#your-algorithms-training-algo-running-container-inputdataconfig
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo-running-container.html#your-algorithms-training-algo-running-container-inputdataconfig
hyperparameters (dict[string, object]): An instance of `HyperParameters` containing the
training job hyperparameters.
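
Hoisting HYPERPARAMETERS_FILE, RESOURCE_CONFIG_FILE, and INPUT_DATA_CONFIG_FILE to module level (see the environment.py hunks above) means the config-file paths are composed from shared constants instead of names redefined inside the function. A minimal sketch of that pattern, assuming the usual /opt/ml/input/config base directory:

import os

# Module-level constants, as introduced in the diff above.
HYPERPARAMETERS_FILE = "hyperparameters.json"
RESOURCE_CONFIG_FILE = "resourceconfig.json"
INPUT_DATA_CONFIG_FILE = "inputdataconfig.json"

# Assumption: the standard SageMaker input config directory.
input_config_dir = "/opt/ml/input/config"

hyperparameters_file_dir = os.path.join(input_config_dir, HYPERPARAMETERS_FILE)
input_data_config_file_dir = os.path.join(input_config_dir, INPUT_DATA_CONFIG_FILE)
resource_config_file_dir = os.path.join(input_config_dir, RESOURCE_CONFIG_FILE)

print(hyperparameters_file_dir)  # /opt/ml/input/config/hyperparameters.json
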
33 changes: 27 additions & 6 deletions src/sagemaker_training/errors.py
@@ -30,21 +30,42 @@ class _CalledProcessError(ClientError):
cmd, return_code, output
"""

def __init__(self, cmd, return_code=None, output=None):
self.return_code = return_code
def __init__(self, cmd, return_code=None, output=None, info=None):
self.return_code = str(return_code)
self.cmd = cmd
self.output = output
self.extra_info = info
super(_CalledProcessError, self).__init__()

def __str__(self):
if six.PY3 and self.output:
error_msg = "\n%s" % self.output.decode("latin1")
# error_msg = "%s" % self.output.decode("latin1")
if isinstance(self.output, bytes):
error_msg = "%s" % self.output.decode("utf-8")
else:
error_msg = "%s" % self.output
elif self.output:
error_msg = "\n%s" % self.output
error_msg = "%s" % self.output
else:
error_msg = ""

message = '%s:\nCommand "%s"%s' % (type(self).__name__, self.cmd, error_msg)
if self.extra_info is None:
message = '%s:\nExitCode %s\nErrorMessage "%s"\nCommand "%s"' % (
type(self).__name__,
self.return_code,
error_msg,
self.cmd,
)
else:
message = (
'%s:\nExitCode %s\nErrorMessage "%s"\nExtraInfo "%s"\nCommand "%s"'
% (
type(self).__name__,
self.return_code,
error_msg,
self.extra_info,
self.cmd,
)
)
return message.strip()


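
To illustrate what the reworked __str__ above produces, here is a small stand-in class that mirrors the new formatting; the real class is sagemaker_training.errors._CalledProcessError, and the command, messages, and exit code 137 below are made-up values (137 is the code commonly seen when the kernel OOM-kills a process).

# Stand-in that mirrors the new _CalledProcessError.__str__ formatting;
# the values used at the bottom are illustrative only.
class DemoCalledProcessError(Exception):
    def __init__(self, cmd, return_code=None, output=None, info=None):
        self.cmd = cmd
        self.return_code = str(return_code)
        self.output = output
        self.extra_info = info
        super().__init__()

    def __str__(self):
        if isinstance(self.output, bytes):
            error_msg = self.output.decode("utf-8")
        else:
            error_msg = self.output or ""
        if self.extra_info is None:
            message = '%s:\nExitCode %s\nErrorMessage "%s"\nCommand "%s"' % (
                type(self).__name__, self.return_code, error_msg, self.cmd)
        else:
            message = '%s:\nExitCode %s\nErrorMessage "%s"\nExtraInfo "%s"\nCommand "%s"' % (
                type(self).__name__, self.return_code, error_msg, self.extra_info, self.cmd)
        return message.strip()


print(DemoCalledProcessError(
    "python train.py",
    return_code=137,
    output=b"RuntimeError: CUDA out of memory",
    info="Process killed by SIGKILL; possible out-of-memory condition",
))
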
20 changes: 16 additions & 4 deletions src/sagemaker_training/modules.py
@@ -80,7 +80,9 @@ def prepare(path, name): # type: (str, str) -> None
% name
)

logger.info("Module %s does not provide a setup.py. \nGenerating setup.py" % name)
logger.info(
"Module %s does not provide a setup.py. \nGenerating setup.py" % name
)

files.write_file(setup_path, data)

@@ -125,7 +127,11 @@ def install(path, capture_error=False): # type: (str, bool) -> None
logger.info("Installing module with the following command:\n%s", cmd)

process.check_error(
shlex.split(cmd), errors.InstallModuleError, cwd=path, capture_error=capture_error
shlex.split(cmd),
errors.InstallModuleError,
1,
cwd=path,
capture_error=capture_error,
)


@@ -142,7 +148,11 @@ def install_requirements(path, capture_error=False): # type: (str, bool) -> Non
logger.info("Installing dependencies from requirements.txt:\n{}".format(cmd))

process.check_error(
shlex.split(cmd), errors.InstallRequirementsError, cwd=path, capture_error=capture_error
shlex.split(cmd),
errors.InstallRequirementsError,
1,
cwd=path,
capture_error=capture_error,
)


@@ -171,4 +181,6 @@ def import_module(uri, name=DEFAULT_MODULE_NAME): # type: (str, str) -> module

return module
except Exception as e: # pylint: disable=broad-except
six.reraise(errors.ImportModuleError, errors.ImportModuleError(e), sys.exc_info()[2])
six.reraise(
errors.ImportModuleError, errors.ImportModuleError(e), sys.exc_info()[2]
)
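
The install helpers above now pass an extra positional argument (1) to process.check_error, between the error class and the keyword arguments. process.py is not part of the rendered diff, so the following is only a guess at the shape of that signature, assuming the new argument is a processes_per_host value threaded through the runner API:

# Hypothetical sketch of the assumed check_error signature; the real
# implementation lives in sagemaker_training/process.py, which is not
# shown in this truncated diff.
import subprocess


def check_error(cmd, error_class, processes_per_host, cwd=None, capture_error=False):
    # processes_per_host is accepted for parity with the distributed runners;
    # a plain pip install runs a single local process.
    result = subprocess.run(cmd, cwd=cwd, capture_output=capture_error, check=False)
    if result.returncode != 0:
        # Wrap the failure so callers see the exit code and any captured output.
        raise error_class(
            cmd=" ".join(cmd),
            return_code=result.returncode,
            output=result.stdout if capture_error else None,
        )
    return result
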
55 changes: 36 additions & 19 deletions src/sagemaker_training/mpi.py
@@ -1,4 +1,4 @@
# Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Copyright 2018-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License'). You
# may not use this file except in compliance with the License. A copy of
@@ -31,10 +31,12 @@

class WorkerRunner(process.ProcessRunner):
"""Runner responsible for preparing MPI distributed training and waiting for MPI
master execution to finish.
master execution to finish.
"""

def __init__(self, user_entry_point, args, env_vars, master_hostname):
def __init__(
self, user_entry_point, args, env_vars, processes_per_host, master_hostname
):
"""Initialize a WorkerRunner, which is responsible for preparing distributed
training with MPI and waiting for MPI master execution to finish.
@@ -44,7 +46,9 @@ def __init__(self, user_entry_point, args, env_vars, master_hostname):
env_vars (dict(str,str)): A dictionary of environment variables.
master_hostname (str): The master hostname.
"""
super(WorkerRunner, self).__init__(user_entry_point, args, env_vars)
super(WorkerRunner, self).__init__(
user_entry_point, args, env_vars, processes_per_host
)
self._master_hostname = str(master_hostname)

def run(
@@ -62,7 +66,9 @@ def run(
self._wait_master_to_start()
logger.info("MPI Master online, creating SSH daemon.")

logger.info("Writing environment variables to /etc/environment for the MPI process.")
logger.info(
"Writing environment variables to /etc/environment for the MPI process."
)
_write_env_vars_to_file()

_start_sshd_daemon()
@@ -99,7 +105,9 @@ def _wait_orted_process_to_finish(): # type: () -> None
def _orted_process(): # pylint: disable=inconsistent-return-statements
"""Wait a maximum of 5 minutes for orted process to start."""
for _ in range(5 * 60):
procs = [p for p in psutil.process_iter(attrs=["name"]) if p.info["name"] == "orted"]
procs = [
p for p in psutil.process_iter(attrs=["name"]) if p.info["name"] == "orted"
]
if procs:
logger.info("Process[es]: %s", procs)
return procs
@@ -116,14 +124,14 @@ def __init__(
user_entry_point,
args,
env_vars,
processes_per_host,
master_hostname,
hosts,
process_per_host,
custom_mpi_options,
network_interface_name,
interval=1,
timeout_in_seconds=60 * 60,
num_processes=None,
num_processes=1,
):
"""Initialize a MasterRunner, which is responsible for preparing distributed
training with MPI and synchronizing work among the Workers.
@@ -134,7 +142,7 @@ def __init__(
env_vars (dict(str,str)): A dictionary of environment variables.
master_hostname (str): The master hostname.
hosts ([str]): A list of hosts.
process_per_host (int): Number of processes per host.
processes_per_host (int): Number of processes per host.
custom_mpi_options (str): A string of custom MPI options to be parsed.
network_interface_name (str): The network interface name.
interval (int or float): The interval at which to check the connection in seconds.
Expand All @@ -144,11 +152,12 @@ def __init__(
num_processes (int): The total number of processes.
"""

super(MasterRunner, self).__init__(user_entry_point, args, env_vars)
super(MasterRunner, self).__init__(
user_entry_point, args, env_vars, processes_per_host
)

self._master_hostname = master_hostname
self._hosts = hosts
self._process_per_host = process_per_host
self._num_processes = num_processes
self._custom_mpi_options = custom_mpi_options
self._network_interface_name = network_interface_name
@@ -174,16 +183,20 @@ def _wait_for_workers(self): # type: () -> None

def _create_command(self):
num_hosts = len(self._hosts)
num_processes = self._num_processes or self._process_per_host * num_hosts
num_processes = self._num_processes or self._processes_per_host * num_hosts

# By default, use one process per GPU, or one process per node (if training with CPU).
if self._process_per_host == 1:
if self._processes_per_host == 1:
host_list = self._hosts
else:
host_list = ["%s:%s" % (host, self._process_per_host) for host in self._hosts]
host_list = [
"%s:%s" % (host, self._processes_per_host) for host in self._hosts
]

msg = "Env Hosts: %s Hosts: %s process_per_hosts: %s num_processes: %s"
logger.info(msg, self._hosts, host_list, self._process_per_host, num_processes)
logger.info(
msg, self._hosts, host_list, self._processes_per_host, num_processes
)

overridden_known_options, additional_options = _parse_custom_mpi_options(
self._custom_mpi_options
@@ -241,7 +254,11 @@ def _create_command(self):

command.extend(additional_options)

for credential in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN"]:
for credential in [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_SESSION_TOKEN",
]:
if credential in os.environ:
command.extend(["-x", credential])

@@ -291,9 +308,9 @@ def _start_sshd_daemon(): # type: () -> None
def _can_connect(host, port=22): # type: (str, int) -> bool
"""Check if the connection to provided ``host`` and ``port`` is possible.
Args:
host (str): Hostname for the host to check connection.
port (int): Port name of the host to check connection on.
Args:
host (str): Hostname for the host to check connection.
port (int): Port name of the host to check connection on.
"""
try:
logger.debug("Testing connection to host %s", host)
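
The renamed processes_per_host attribute drives the MPI host list that MasterRunner._create_command builds above. A self-contained sketch of that calculation, using made-up host names:

# Mirrors the host-list logic from MasterRunner._create_command above;
# the host names are made up.
hosts = ["algo-1", "algo-2"]
processes_per_host = 4
num_processes = processes_per_host * len(hosts)  # default when not set explicitly

# One process per GPU (or per node for CPU training): each host advertises
# processes_per_host MPI slots unless only one process per host is wanted.
if processes_per_host == 1:
    host_list = hosts
else:
    host_list = ["%s:%s" % (host, processes_per_host) for host in hosts]

print(host_list)      # ['algo-1:4', 'algo-2:4']
print(num_processes)  # 8
# These values are then handed to the mpirun command line built by the runner.
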