feat: add support for distributed custom training (googleapis#66)
sasha-gitg authored and dizcology committed Dec 22, 2020
1 parent 1ee6c92 commit 9399ead
Showing 2 changed files with 579 additions and 39 deletions.
250 changes: 211 additions & 39 deletions google/cloud/aiplatform/training_jobs.py
@@ -24,7 +24,7 @@
import sys
import tempfile
import time
-from typing import Callable, List, Optional, Sequence, Union
+from typing import Callable, Dict, List, Optional, NamedTuple, Sequence, Union


from google.auth import credentials as auth_credentials
@@ -339,6 +339,190 @@ def package_and_copy_to_gcs(
        return self.package_and_copy(copy_method=copy_method)


class _MachineSpec(NamedTuple):
    """Specification container for machine specs used for distributed training.

    Usage:

        spec = _MachineSpec(
            replica_count=10,
            machine_type='n1-standard-4',
            accelerator_count=2,
            accelerator_type='NVIDIA_TESLA_K80')

    Note that container and python package specs are not stored with this spec.
    """

    replica_count: int = 0
    machine_type: str = "n1-standard-4"
    accelerator_count: int = 0
    accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED"

    def _get_accelerator_type(self) -> Optional[str]:
        """Validates accelerator_type and returns the name of the accelerator.

        Returns:
            None if no accelerator is specified, otherwise the validated
            accelerator name.
        Raises:
            ValueError if accelerator type is invalid.
        """

        # validate accelerator type
        if (
            self.accelerator_type
            not in gca_accelerator_type.AcceleratorType._member_names_
        ):
            raise ValueError(
                f"accelerator_type `{self.accelerator_type}` invalid. "
                f"Choose one of {gca_accelerator_type.AcceleratorType._member_names_}"
            )

        accelerator_enum = getattr(
            gca_accelerator_type.AcceleratorType, self.accelerator_type
        )

        if (
            accelerator_enum
            != gca_accelerator_type.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED
        ):
            return self.accelerator_type

    @property
    def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]:
        """Return specification as a Dict."""
        spec = {
            "machineSpec": {"machineType": self.machine_type},
            "replicaCount": self.replica_count,
        }
        accelerator_type = self._get_accelerator_type()
        if accelerator_type and self.accelerator_count:
            spec["machineSpec"]["acceleratorType"] = accelerator_type
            spec["machineSpec"]["acceleratorCount"] = self.accelerator_count

        return spec

    @property
    def is_empty(self) -> bool:
        """Returns True if replica_count <= 0, False otherwise."""
        return self.replica_count <= 0

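For orientation, here is a minimal sketch of how _MachineSpec behaves, assuming the class is imported from google.cloud.aiplatform.training_jobs (it is private API, so this is illustrative rather than supported usage):

from google.cloud.aiplatform.training_jobs import _MachineSpec

spec = _MachineSpec(
    replica_count=10,
    machine_type="n1-standard-4",
    accelerator_count=2,
    accelerator_type="NVIDIA_TESLA_K80",
)

# spec_dict nests the machine fields under "machineSpec" and only attaches
# the accelerator keys when a valid type and a nonzero count are both set.
assert spec.spec_dict == {
    "machineSpec": {
        "machineType": "n1-standard-4",
        "acceleratorType": "NVIDIA_TESLA_K80",
        "acceleratorCount": 2,
    },
    "replicaCount": 10,
}

# is_empty is keyed off replica_count alone.
assert not spec.is_empty
assert _MachineSpec().is_empty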

class _DistributedTrainingSpec(NamedTuple):
    """Configuration for distributed training worker pool specs.

    AI Platform Training expects configuration in this order:

        [
            chief spec,  # can only have one replica
            worker spec,
            parameter server spec,
            evaluator spec
        ]

    Usage:

        dist_training_spec = _DistributedTrainingSpec(
            chief_spec=_MachineSpec(
                replica_count=1,
                machine_type='n1-standard-4',
                accelerator_count=2,
                accelerator_type='NVIDIA_TESLA_K80'
            ),
            worker_spec=_MachineSpec(
                replica_count=10,
                machine_type='n1-standard-4',
                accelerator_count=2,
                accelerator_type='NVIDIA_TESLA_K80'
            )
        )
    """

    chief_spec: _MachineSpec = _MachineSpec()
    worker_spec: _MachineSpec = _MachineSpec()
    parameter_server_spec: _MachineSpec = _MachineSpec()
    evaluator_spec: _MachineSpec = _MachineSpec()

    @property
    def pool_specs(
        self,
    ) -> List[Dict[str, Union[int, str, Dict[str, Union[int, str]]]]]:
        """Return each pool's spec, in the order AI Platform expects, as a list of dicts.

        Trailing empty specs are removed, but intermediate empty specs are kept
        so that unusual configurations (e.g. 0 chief replicas, 10 worker
        replicas, 3 parameter server replicas) do not break the ordering AI
        Platform Training expects.

        Returns:
            Ordered list of worker pool specs suitable for AI Platform Training.
        """
        if self.chief_spec.replica_count > 1:
            raise ValueError("Chief spec replica count cannot be greater than 1.")

        spec_order = [
            self.chief_spec,
            self.worker_spec,
            self.parameter_server_spec,
            self.evaluator_spec,
        ]
        specs = [s.spec_dict for s in spec_order]
        for i in reversed(range(len(spec_order))):
            if spec_order[i].is_empty:
                specs.pop()
            else:
                break
        return specs

    @classmethod
    def chief_worker_pool(
        cls,
        replica_count: int = 0,
        machine_type: str = "n1-standard-4",
        accelerator_count: int = 0,
        accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
    ) -> "_DistributedTrainingSpec":
        """Parameterizes the config to support only a chief with worker replicas.

        One replica is assigned to the chief and the remainder to workers. All
        specs have the same machine type, accelerator count, and accelerator
        type.

        Args:
            replica_count (int):
                The total number of replicas. Assigns 1 chief replica and
                replica_count - 1 worker replicas.
            machine_type (str):
                The type of machine to use for training.
            accelerator_type (str):
                Hardware accelerator type. One of ACCELERATOR_TYPE_UNSPECIFIED,
                NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100,
                NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, TPU_V2, TPU_V3
            accelerator_count (int):
                The number of accelerators to attach to each replica.
        Returns:
            _DistributedTrainingSpec representing one chief and n workers, all
            of the same type. If replica_count <= 0 then an empty spec is
            returned.
        """
        if replica_count <= 0:
            return cls()

        chief_spec = _MachineSpec(
            replica_count=1,
            machine_type=machine_type,
            accelerator_count=accelerator_count,
            accelerator_type=accelerator_type,
        )

        worker_spec = _MachineSpec(
            replica_count=replica_count - 1,
            machine_type=machine_type,
            accelerator_count=accelerator_count,
            accelerator_type=accelerator_type,
        )

        return cls(chief_spec=chief_spec, worker_spec=worker_spec)

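As a sketch of how the two classes combine (same caveat about private API), chief_worker_pool splits a flat replica count into one chief plus workers, and pool_specs drops trailing empty pools before anything is sent to the service:

from google.cloud.aiplatform.training_jobs import _DistributedTrainingSpec

dist_spec = _DistributedTrainingSpec.chief_worker_pool(
    replica_count=10,
    machine_type="n1-standard-4",
    accelerator_count=2,
    accelerator_type="NVIDIA_TESLA_K80",
)

pools = dist_spec.pool_specs
# Only the chief (1 replica) and worker (9 replicas) pools survive; the
# empty parameter server and evaluator specs trail the list and are popped.
assert len(pools) == 2
assert pools[0]["replicaCount"] == 1
assert pools[1]["replicaCount"] == 9

# A single replica yields just the chief pool, and 0 replicas yields nothing.
assert len(_DistributedTrainingSpec.chief_worker_pool(replica_count=1).pool_specs) == 1
assert _DistributedTrainingSpec.chief_worker_pool(replica_count=0).pool_specs == []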

# TODO(b/172368325) add scheduling, custom_job.Scheduling
class CustomTrainingJob(base.AiPlatformResourceNoun):
"""Class to launch a Custom Training Job in AI Platform using a script.
@@ -469,6 +653,12 @@ def run(
    ) -> Optional[models.Model]:
        """Runs the custom training job.

        Distributed Training Support:
            If replica_count = 1 then one chief replica will be provisioned. If
            replica_count > 1 the remainder will be provisioned as a worker
            replica pool. e.g. replica_count = 10 results in 1 chief and 9
            workers. All replicas have the same machine_type, accelerator_type,
            and accelerator_count.

        Data fraction splits:
            Any of ``training_fraction_split``, ``validation_fraction_split`` and
            ``test_fraction_split`` may optionally be provided; they must sum to
            at most 1. If
@@ -498,7 +688,11 @@ def run(
            args (List[Union[str, int, float]]):
                Command line arguments to be passed to the Python script.
            replica_count (int):
-               The number of worker replicas.
+               The number of worker replicas. If replica_count = 1 then one chief
+               replica will be provisioned. If replica_count > 1 the remainder will
+               be provisioned as a worker replica pool.
            machine_type (str):
                The type of machine to use for training.
            accelerator_type (str):
                Hardware accelerator type. One of ACCELERATOR_TYPE_UNSPECIFIED,
                NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100, NVIDIA_TESLA_P4,
@@ -523,23 +717,10 @@
            RuntimeError if Training job has already been run, staging_bucket has not
                been set, or model_display_name was provided but required arguments
                were not provided in constructor.
-           NotImplementedError more then one replica.
            ValueError if accelerator type is not valid.
        """
        if self._has_run:
            raise RuntimeError("Custom Training has already run.")

-       # TODO(b/172369809) Add support for distributed training.
-       if replica_count > 1:
-           raise NotImplementedError("Distributed training not supported.")
-
-       # validate accelerator type
-       if accelerator_type not in gca_accelerator_type.AcceleratorType._member_names_:
-           raise ValueError(
-               f"accelerator_type {accelerator_type} invalid. "
-               f"Choose one of {gca_accelerator_type.AcceleratorType._member_names_}"
-           )

        staging_bucket = (
            self._staging_bucket or initializer.global_config.staging_bucket
        )
Expand All @@ -550,9 +731,7 @@ def run(
"set using aiplatform.init(staging_bucket='gs://my-bucket'"
)

-       # if args need for model is incomplete
-       # TODO (b/162273530) lift requirement for predict/health route when
-       # validation lifted and move these args down
+       # if args needed for model is incomplete
        if model_display_name and not self._model_serving_container_image_uri:
            raise RuntimeError(
                """model_display_name was provided but
@@ -561,6 +740,14 @@
"""
)

        # validates args and will raise on invalid values
        worker_pool_specs = _DistributedTrainingSpec.chief_worker_pool(
            replica_count=replica_count,
            machine_type=machine_type,
            accelerator_count=accelerator_count,
            accelerator_type=accelerator_type,
        ).pool_specs

        # make and copy package
        python_packager = _TrainingScriptPythonPackager(
            script_path=self._script_path, requirements=self._requirements
@@ -577,30 +764,15 @@ def run(
staging_bucket, "aiplatform-custom-training"
)

-       # create worker pool spec
-       worker_pool_spec = {
-           "replicaCount": replica_count,
-           "machineSpec": {"machineType": machine_type},
-           "pythonPackageSpec": {
+       for spec in worker_pool_specs:
+           spec["pythonPackageSpec"] = {
                "executorImageUri": self._container_uri,
                "pythonModule": python_packager.module_name,
                "packageUris": [package_gcs_uri],
-           },
-       }
-
-       accelerator_enum = getattr(
-           gca_accelerator_type.AcceleratorType, accelerator_type
-       )
-
-       if (
-           accelerator_enum
-           != gca_accelerator_type.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED
-       ):
-           worker_pool_spec["machineSpec"]["acceleratorType"] = accelerator_type
-           worker_pool_spec["machineSpec"]["acceleratorCount"] = accelerator_count
+           }

-       if args:
-           worker_pool_spec["pythonPackageSpec"]["args"] = args
+           if args:
+               spec["pythonPackageSpec"]["args"] = args

        managed_model = None
        # create model payload
@@ -640,7 +812,7 @@ def run(
            training_task_definition=schema.training_job.definition.custom_task,
            training_task_inputs=json_format.ParseDict(
                {
-                   "workerPoolSpecs": [worker_pool_spec],
+                   "workerPoolSpecs": worker_pool_specs,
                    "baseOutputDirectory": {"output_uri_prefix": base_output_dir},
                },
                struct_pb2.Value(),

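Taken together, the run() changes mean workerPoolSpecs is now a list built by _DistributedTrainingSpec rather than a single hand-built dict. For example, with replica_count=2 on n1-standard-4 and no accelerators, the payload passed to json_format.ParseDict would look roughly like the sketch below (the image URI and package path are hypothetical placeholders, and the module name is assumed to be what _TrainingScriptPythonPackager produces):

worker_pool_specs = [
    {
        # chief pool: always exactly one replica
        "machineSpec": {"machineType": "n1-standard-4"},
        "replicaCount": 1,
        "pythonPackageSpec": {
            "executorImageUri": "gcr.io/my-project/my-training-container",  # hypothetical
            "pythonModule": "aiplatform_custom_trainer_script.task",  # assumed packager module name
            "packageUris": ["gs://my-bucket/aiplatform-custom-training/package.tar.gz"],  # hypothetical
        },
    },
    {
        # worker pool: the remaining replica_count - 1 replicas
        "machineSpec": {"machineType": "n1-standard-4"},
        "replicaCount": 1,
        "pythonPackageSpec": {
            "executorImageUri": "gcr.io/my-project/my-training-container",  # hypothetical
            "pythonModule": "aiplatform_custom_trainer_script.task",  # assumed packager module name
            "packageUris": ["gs://my-bucket/aiplatform-custom-training/package.tar.gz"],  # hypothetical
        },
    },
]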