diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py
index 3ac94dfb65..7345416ceb 100644
--- a/google/cloud/aiplatform/training_jobs.py
+++ b/google/cloud/aiplatform/training_jobs.py
@@ -24,7 +24,7 @@
 import sys
 import tempfile
 import time
-from typing import Callable, List, Optional, Sequence, Union
+from typing import Callable, Dict, List, Optional, NamedTuple, Sequence, Union
 
 from google.auth import credentials as auth_credentials
 
@@ -339,6 +339,190 @@ def package_and_copy_to_gcs(
         return self.package_and_copy(copy_method=copy_method)
 
 
+class _MachineSpec(NamedTuple):
+    """Specification container for Machine specs used for distributed training.
+
+    Usage:
+
+    spec = _MachineSpec(
+        replica_count=10,
+        machine_type='n1-standard-4',
+        accelerator_count=2,
+        accelerator_type='NVIDIA_TESLA_K80')
+
+    Note that container and python package specs are not stored with this spec.
+    """
+
+    replica_count: int = 0
+    machine_type: str = "n1-standard-4"
+    accelerator_count: int = 0
+    accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED"
+
+    def _get_accelerator_type(self) -> Optional[str]:
+        """Validates accelerator_type and returns the name of the accelerator.
+
+        Returns:
+            The validated accelerator name, or None if no accelerator was specified.
+
+        Raises:
+            ValueError if accelerator type is invalid.
+        """
+
+        # validate accelerator type
+        if (
+            self.accelerator_type
+            not in gca_accelerator_type.AcceleratorType._member_names_
+        ):
+            raise ValueError(
+                f"accelerator_type `{self.accelerator_type}` invalid. "
+                f"Choose one of {gca_accelerator_type.AcceleratorType._member_names_}"
+            )
+
+        accelerator_enum = getattr(
+            gca_accelerator_type.AcceleratorType, self.accelerator_type
+        )
+
+        if (
+            accelerator_enum
+            != gca_accelerator_type.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED
+        ):
+            return self.accelerator_type
+
+    @property
+    def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]:
+        """Return specification as a Dict."""
+        spec = {
+            "machineSpec": {"machineType": self.machine_type},
+            "replicaCount": self.replica_count,
+        }
+        accelerator_type = self._get_accelerator_type()
+        if accelerator_type and self.accelerator_count:
+            spec["machineSpec"]["acceleratorType"] = accelerator_type
+            spec["machineSpec"]["acceleratorCount"] = self.accelerator_count
+
+        return spec
+
+    @property
+    def is_empty(self) -> bool:
+        """Returns True if replica_count <= 0, False otherwise."""
+        return self.replica_count <= 0
+
+
+class _DistributedTrainingSpec(NamedTuple):
+    """Configuration for distributed training worker pool specs.
+
+    AI Platform Training expects configuration in this order:
+    [
+        chief spec, # can only have one replica
+        worker spec,
+        parameter server spec,
+        evaluator spec
+    ]
+
+    Usage:
+
+    dist_training_spec = _DistributedTrainingSpec(
+        chief_spec = _MachineSpec(
+            replica_count=1,
+            machine_type='n1-standard-4',
+            accelerator_count=2,
+            accelerator_type='NVIDIA_TESLA_K80'
+        ),
+        worker_spec = _MachineSpec(
+            replica_count=10,
+            machine_type='n1-standard-4',
+            accelerator_count=2,
+            accelerator_type='NVIDIA_TESLA_K80'
+        )
+    )
+
+    """
+
+    chief_spec: _MachineSpec = _MachineSpec()
+    worker_spec: _MachineSpec = _MachineSpec()
+    parameter_server_spec: _MachineSpec = _MachineSpec()
+    evaluator_spec: _MachineSpec = _MachineSpec()
+
+    @property
+    def pool_specs(
+        self,
+    ) -> List[Dict[str, Union[int, str, Dict[str, Union[int, str]]]]]:
+        """Return each pool's spec in the order expected by AI Platform as a list of dicts.
+
+        Also removes trailing empty specs, but keeps empty specs that precede
+        non-empty ones so unusual configurations (e.g. 0 chief, 10 worker, and 3
+        parameter server replicas) do not break the ordering AI Platform expects.
+
+        Returns:
+            Ordered list of worker pool specs suitable for AI Platform Training.
+        """
+        if self.chief_spec.replica_count > 1:
+            raise ValueError("Chief spec replica count cannot be greater than 1.")
+
+        spec_order = [
+            self.chief_spec,
+            self.worker_spec,
+            self.parameter_server_spec,
+            self.evaluator_spec,
+        ]
+        specs = [s.spec_dict for s in spec_order]
+        for i in reversed(range(len(spec_order))):
+            if spec_order[i].is_empty:
+                specs.pop()
+            else:
+                break
+        return specs
+
+    @classmethod
+    def chief_worker_pool(
+        cls,
+        replica_count: int = 0,
+        machine_type: str = "n1-standard-4",
+        accelerator_count: int = 0,
+        accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED",
+    ) -> "_DistributedTrainingSpec":
+        """Parameterizes the config to support only chief and worker replica pools.
+
+        One replica is assigned to the chief and the remainder to the workers. All
+        specs have the same machine type, accelerator count, and accelerator type.
+
+        Args:
+            replica_count (int):
+                The total number of replicas. Assigns 1 chief replica and
+                replica_count - 1 worker replicas.
+            machine_type (str):
+                The type of machine to use for training.
+            accelerator_type (str):
+                Hardware accelerator type. One of ACCELERATOR_TYPE_UNSPECIFIED,
+                NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100, NVIDIA_TESLA_P4,
+                NVIDIA_TESLA_T4, TPU_V2, TPU_V3
+            accelerator_count (int):
+                The number of accelerators to attach to a worker replica.
+
+        Returns:
+            A _DistributedTrainingSpec representing one chief and n workers, all of the
+            same type. If replica_count <= 0 then an empty spec is returned.
+        """
+        if replica_count <= 0:
+            return cls()
+
+        chief_spec = _MachineSpec(
+            replica_count=1,
+            machine_type=machine_type,
+            accelerator_count=accelerator_count,
+            accelerator_type=accelerator_type,
+        )
+
+        worker_spec = _MachineSpec(
+            replica_count=replica_count - 1,
+            machine_type=machine_type,
+            accelerator_count=accelerator_count,
+            accelerator_type=accelerator_type,
+        )
+
+        return cls(chief_spec=chief_spec, worker_spec=worker_spec)
+
+
 # TODO(b/172368325) add scheduling, custom_job.Scheduling
 class CustomTrainingJob(base.AiPlatformResourceNoun):
     """Class to launch a Custom Training Job in AI Platform using a script.
@@ -469,6 +653,12 @@ def run(
     ) -> Optional[models.Model]:
         """Runs the custom training job.
 
+        Distributed Training Support:
+        If replica_count = 1 then one chief replica will be provisioned. If
+        replica_count > 1 the remainder will be provisioned as a worker replica pool,
+        i.e. replica_count = 10 will result in 1 chief and 9 workers. All replicas
+        have the same machine_type, accelerator_type, and accelerator_count.
+
         Data fraction splits:
         Any of ``training_fraction_split``, ``validation_fraction_split`` and
         ``test_fraction_split`` may optionally be provided, they must sum to up to 1. If
@@ -498,7 +688,11 @@ def run(
             args (List[Unions[str, int, float]]):
                 Command line arguments to be passed to the Python script.
             replica_count (int):
-                The number of worker replicas.
+                The number of worker replicas. If replica_count = 1 then one chief
+                replica will be provisioned. If replica_count > 1 the remainder will be
+                provisioned as a worker replica pool.
+            machine_type (str):
+                The type of machine to use for training.
             accelerator_type (str):
                 Hardware accelerator type. One of ACCELERATOR_TYPE_UNSPECIFIED,
                 NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100, NVIDIA_TESLA_P4,
@@ -523,23 +717,10 @@ def run(
             RuntimeError if Training job has already been run, staging_bucket has not
                 been set, or model_display_name was provided but required arguments
                 were not provided in constructor.
-            NotImplementedError more then one replica.
-            ValueError if accelerator type is not valid.
         """
         if self._has_run:
             raise RuntimeError("Custom Training has already run.")
 
-        # TODO(b/172369809) Add support for distributed training.
-        if replica_count > 1:
-            raise NotImplementedError("Distributed training not supported.")
-
-        # validate accelerator type
-        if accelerator_type not in gca_accelerator_type.AcceleratorType._member_names_:
-            raise ValueError(
-                f"accelerator_type {accelerator_type} invalid. "
-                f"Choose one of {gca_accelerator_type.AcceleratorType._member_names_}"
-            )
-
         staging_bucket = (
             self._staging_bucket or initializer.global_config.staging_bucket
         )
@@ -550,9 +731,7 @@ def run(
                 "set using aiplatform.init(staging_bucket='gs://my-bucket'"
            )
 
-        # if args need for model is incomplete
-        # TODO (b/162273530) lift requirement for predict/health route when
-        # validation lifted and move these args down
+        # if args needed for the model are incomplete
         if model_display_name and not self._model_serving_container_image_uri:
             raise RuntimeError(
                 """model_display_name was provided but
@@ -561,6 +740,14 @@ def run(
                 """
             )
 
+        # validates args and will raise ValueError if accelerator_type is invalid
+        worker_pool_specs = _DistributedTrainingSpec.chief_worker_pool(
+            replica_count=replica_count,
+            machine_type=machine_type,
+            accelerator_count=accelerator_count,
+            accelerator_type=accelerator_type,
+        ).pool_specs
+
         # make and copy package
         python_packager = _TrainingScriptPythonPackager(
             script_path=self._script_path, requirements=self._requirements
@@ -577,30 +764,15 @@ def run(
             staging_bucket, "aiplatform-custom-training"
         )
 
-        # create worker pool spec
-        worker_pool_spec = {
-            "replicaCount": replica_count,
-            "machineSpec": {"machineType": machine_type},
-            "pythonPackageSpec": {
+        for spec in worker_pool_specs:
+            spec["pythonPackageSpec"] = {
                 "executorImageUri": self._container_uri,
                 "pythonModule": python_packager.module_name,
                 "packageUris": [package_gcs_uri],
-            },
-        }
-
-        accelerator_enum = getattr(
-            gca_accelerator_type.AcceleratorType, accelerator_type
-        )
-
-        if (
-            accelerator_enum
-            != gca_accelerator_type.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED
-        ):
-            worker_pool_spec["machineSpec"]["acceleratorType"] = accelerator_type
-            worker_pool_spec["machineSpec"]["acceleratorCount"] = accelerator_count
+            }
 
-        if args:
-            worker_pool_spec["pythonPackageSpec"]["args"] = args
+            if args:
+                spec["pythonPackageSpec"]["args"] = args
 
         managed_model = None
         # create model payload
@@ -640,7 +812,7 @@ def run(
             training_task_definition=schema.training_job.definition.custom_task,
             training_task_inputs=json_format.ParseDict(
                 {
-                    "workerPoolSpecs": [worker_pool_spec],
+                    "workerPoolSpecs": worker_pool_specs,
                     "baseOutputDirectory": {"output_uri_prefix": base_output_dir},
                 },
                 struct_pb2.Value(),
diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py
index 7c48db4cbe..e1762283dc 100644
--- a/tests/unit/aiplatform/test_training_jobs.py
+++ b/tests/unit/aiplatform/test_training_jobs.py
@@ -831,3 +831,371 @@ def test_run_raises_if_no_staging_bucket(self):
             validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT,
             test_fraction_split=_TEST_TEST_FRACTION_SPLIT,
         )
+
+    def 
test_run_call_pipeline_service_create_distributed_training( + self, + mock_pipeline_service_create, + mock_python_package_to_gcs, + mock_dataset, + mock_model_service_get, + ): + aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME) + + job = training_jobs.CustomTrainingJob( + display_name=_TEST_DISPLAY_NAME, + script_path=_TEST_LOCAL_SCRIPT_FILE_NAME, + container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + model_serving_container_image_uri=_TEST_SERVING_CONTAINER_IMAGE, + model_serving_container_predict_route=_TEST_SERVING_CONTAINER_PREDICTION_ROUTE, + model_serving_container_health_route=_TEST_SERVING_CONTAINER_HEALTH_ROUTE, + ) + + model_from_job = job.run( + dataset=mock_dataset, + base_output_dir=_TEST_BASE_OUTPUT_DIR, + args=_TEST_RUN_ARGS, + replica_count=10, + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + model_display_name=_TEST_MODEL_DISPLAY_NAME, + training_fraction_split=_TEST_TRAINING_FRACTION_SPLIT, + validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT, + test_fraction_split=_TEST_TEST_FRACTION_SPLIT, + ) + + mock_python_package_to_gcs.assert_called_once_with( + gcs_staging_dir=_TEST_BUCKET_NAME, + project=_TEST_PROJECT, + credentials=initializer.global_config.credentials, + ) + + true_args = _TEST_RUN_ARGS + + true_worker_pool_spec = [ + { + "replicaCount": 1, + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "pythonPackageSpec": { + "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, + "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name, + "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "args": true_args, + }, + }, + { + "replicaCount": 9, + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "pythonPackageSpec": { + "executorImageUri": _TEST_TRAINING_CONTAINER_IMAGE, + "pythonModule": training_jobs._TrainingScriptPythonPackager.module_name, + "packageUris": [_TEST_OUTPUT_PYTHON_PACKAGE_PATH], + "args": true_args, + }, + }, + ] + + true_fraction_split = gca_training_pipeline.FractionSplit( + training_fraction=_TEST_TRAINING_FRACTION_SPLIT, + validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT, + test_fraction=_TEST_TEST_FRACTION_SPLIT, + ) + + true_container_spec = gca_model.ModelContainerSpec( + image_uri=_TEST_SERVING_CONTAINER_IMAGE, + predict_route=_TEST_SERVING_CONTAINER_PREDICTION_ROUTE, + health_route=_TEST_SERVING_CONTAINER_HEALTH_ROUTE, + ) + + true_managed_model = gca_model.Model( + display_name=_TEST_MODEL_DISPLAY_NAME, container_spec=true_container_spec + ) + + true_input_data_config = gca_training_pipeline.InputDataConfig( + fraction_split=true_fraction_split, + dataset_id=mock_dataset.name, + gcs_destination=gca_io.GcsDestination( + output_uri_prefix=_TEST_BASE_OUTPUT_DIR + ), + ) + + true_training_pipeline = gca_training_pipeline.TrainingPipeline( + display_name=_TEST_DISPLAY_NAME, + training_task_definition=schema.training_job.definition.custom_task, + training_task_inputs=json_format.ParseDict( + { + "workerPoolSpecs": true_worker_pool_spec, + "baseOutputDirectory": {"output_uri_prefix": _TEST_BASE_OUTPUT_DIR}, + }, + struct_pb2.Value(), + ), + model_to_upload=true_managed_model, + input_data_config=true_input_data_config, + ) + + mock_pipeline_service_create.assert_called_once_with( + 
parent=initializer.global_config.common_location_path(), + training_pipeline=true_training_pipeline, + ) + + assert job._gca_resource is mock_pipeline_service_create.return_value + + mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME) + + assert model_from_job._gca_resource is mock_model_service_get.return_value + + assert job.get_model()._gca_resource is mock_model_service_get.return_value + + assert not job.has_failed + + assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + + +class Test_MachineSpec: + def test_machine_spec_return_spec_dict(self): + test_spec = training_jobs._MachineSpec( + replica_count=_TEST_REPLICA_COUNT, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_ACCELERATOR_TYPE, + ) + + true_spec_dict = { + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "replicaCount": _TEST_REPLICA_COUNT, + } + + assert test_spec.spec_dict == true_spec_dict + + def test_machine_spec_return_spec_dict_with_no_accelerator(self): + test_spec = training_jobs._MachineSpec( + replica_count=_TEST_REPLICA_COUNT, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=0, + accelerator_type="ACCELERATOR_TYPE_UNSPECIFIED", + ) + + true_spec_dict = { + "machineSpec": {"machineType": _TEST_MACHINE_TYPE}, + "replicaCount": _TEST_REPLICA_COUNT, + } + + assert test_spec.spec_dict == true_spec_dict + + def test_machine_spec_spec_dict_raises_invalid_accelerator(self): + test_spec = training_jobs._MachineSpec( + replica_count=_TEST_REPLICA_COUNT, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_INVALID_ACCELERATOR_TYPE, + ) + + with pytest.raises(ValueError): + test_spec.spec_dict + + def test_machine_spec_spec_dict_is_empty(self): + test_spec = training_jobs._MachineSpec( + replica_count=0, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_INVALID_ACCELERATOR_TYPE, + ) + + assert test_spec.is_empty + + def test_machine_spec_spec_dict_is_not_empty(self): + test_spec = training_jobs._MachineSpec( + replica_count=_TEST_REPLICA_COUNT, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_INVALID_ACCELERATOR_TYPE, + ) + + assert not test_spec.is_empty + + +class Test_DistributedTrainingSpec: + def test_machine_spec_returns_pool_spec(self): + + spec = training_jobs._DistributedTrainingSpec( + chief_spec=training_jobs._MachineSpec( + replica_count=1, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_ACCELERATOR_TYPE, + ), + worker_spec=training_jobs._MachineSpec( + replica_count=10, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_ACCELERATOR_TYPE, + ), + parameter_server_spec=training_jobs._MachineSpec( + replica_count=3, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_ACCELERATOR_TYPE, + ), + evaluator_spec=training_jobs._MachineSpec( + replica_count=1, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_ACCELERATOR_TYPE, + ), + ) + + true_pool_spec = [ + { + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "replicaCount": 1, + }, + { 
+ "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "replicaCount": 10, + }, + { + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "replicaCount": 3, + }, + { + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "replicaCount": 1, + }, + ] + + assert spec.pool_specs == true_pool_spec + + def test_chief_worker_pool_returns_spec(self): + + chief_worker_spec = training_jobs._DistributedTrainingSpec.chief_worker_pool( + replica_count=10, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_ACCELERATOR_TYPE, + ) + + true_pool_spec = [ + { + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "replicaCount": 1, + }, + { + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "replicaCount": 9, + }, + ] + + assert chief_worker_spec.pool_specs == true_pool_spec + + def test_chief_worker_pool_returns_just_chief(self): + + chief_worker_spec = training_jobs._DistributedTrainingSpec.chief_worker_pool( + replica_count=1, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_ACCELERATOR_TYPE, + ) + + true_pool_spec = [ + { + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "replicaCount": 1, + } + ] + + assert chief_worker_spec.pool_specs == true_pool_spec + + def test_machine_spec_raise_with_more_than_one_chief_replica(self): + + spec = training_jobs._DistributedTrainingSpec( + chief_spec=training_jobs._MachineSpec( + replica_count=2, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_ACCELERATOR_TYPE, + ), + ) + + with pytest.raises(ValueError): + spec.pool_specs + + def test_machine_spec_handles_missing_pools(self): + + spec = training_jobs._DistributedTrainingSpec( + chief_spec=training_jobs._MachineSpec( + replica_count=1, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_ACCELERATOR_TYPE, + ), + worker_spec=training_jobs._MachineSpec(replica_count=0), + parameter_server_spec=training_jobs._MachineSpec( + replica_count=3, + machine_type=_TEST_MACHINE_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + accelerator_type=_TEST_ACCELERATOR_TYPE, + ), + evaluator_spec=training_jobs._MachineSpec(replica_count=0), + ) + + true_pool_spec = [ + { + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "replicaCount": 1, + }, + {"machineSpec": {"machineType": "n1-standard-4"}, "replicaCount": 0}, + { + "machineSpec": { + "machineType": _TEST_MACHINE_TYPE, + "acceleratorType": _TEST_ACCELERATOR_TYPE, + "acceleratorCount": _TEST_ACCELERATOR_COUNT, + }, + "replicaCount": 3, + }, + ] + + assert spec.pool_specs == true_pool_spec
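
Note (illustration, not part of the patch): a minimal usage sketch of the chief/worker split introduced above, assuming this change is applied. It only uses names defined in the diff (_DistributedTrainingSpec.chief_worker_pool and the pool_specs property); the private import path mirrors the patched module and is shown purely for illustration.

# Minimal sketch, assuming this patch is applied; mirrors the docstring examples
# and the distributed-training test expectations above.
from google.cloud.aiplatform.training_jobs import _DistributedTrainingSpec

# replica_count=10 is split into 1 chief replica and 9 worker replicas, all
# with the same machine type and accelerator configuration.
spec = _DistributedTrainingSpec.chief_worker_pool(
    replica_count=10,
    machine_type="n1-standard-4",
    accelerator_count=2,
    accelerator_type="NVIDIA_TESLA_K80",
)

# pool_specs keeps the chief-first ordering AI Platform Training expects and
# drops the trailing empty parameter server and evaluator pools.
for pool in spec.pool_specs:
    print(pool["replicaCount"], pool["machineSpec"])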