diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 0f10a52311..0cfb28c462 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -591,6 +591,7 @@ def _run_job( gcs_destination_uri_prefix: Optional[str] = None, bigquery_destination: Optional[str] = None, create_request_timeout: Optional[float] = None, + block: Optional[bool] = True, ) -> Optional[models.Model]: """Runs the training job. @@ -769,6 +770,8 @@ def _run_job( - AIP_TEST_DATA_URI = "bigquery_destination.dataset_*.test" create_request_timeout (float): Optional. The timeout for the create request in seconds. + block (bool): + Optional. If True, block until complete. """ input_data_config = self._create_input_data_config( @@ -824,7 +827,7 @@ def _run_job( _LOGGER.info("View Training:\n%s" % self._dashboard_uri()) - model = self._get_model() + model = self._get_model(block=block) if model is None: _LOGGER.warning( @@ -901,7 +904,7 @@ def _force_get_model(self, sync: bool = True) -> models.Model: return model - def _get_model(self) -> Optional[models.Model]: + def _get_model(self, block: bool = True) -> Optional[models.Model]: """Helper method to get and instantiate the Model to Upload. Returns: @@ -911,7 +914,8 @@ def _get_model(self) -> Optional[models.Model]: Raises: RuntimeError: If Training failed. """ - self._block_until_complete() + if block: + self._block_until_complete() if self.has_failed: raise RuntimeError( @@ -3264,10 +3268,8 @@ def run( create_request_timeout=create_request_timeout, ) - @base.optional_sync(construct_object_on_arg="managed_model") - def _run( + def submit( self, - python_packager: source_utils._TrainingScriptPythonPackager, dataset: Optional[ Union[ datasets.ImageDataset, @@ -3275,21 +3277,30 @@ def _run( datasets.TextDataset, datasets.VideoDataset, ] - ], - annotation_schema_uri: Optional[str], - worker_pool_specs: worker_spec_utils._DistributedTrainingSpec, - managed_model: Optional[gca_model.Model] = None, + ] = None, + annotation_schema_uri: Optional[str] = None, + model_display_name: Optional[str] = None, + model_labels: Optional[Dict[str, str]] = None, model_id: Optional[str] = None, parent_model: Optional[str] = None, is_default_version: Optional[bool] = True, model_version_aliases: Optional[Sequence[str]] = None, model_version_description: Optional[str] = None, - args: Optional[List[Union[str, float, int]]] = None, - environment_variables: Optional[Dict[str, str]] = None, base_output_dir: Optional[str] = None, service_account: Optional[str] = None, network: Optional[str] = None, bigquery_destination: Optional[str] = None, + args: Optional[List[Union[str, float, int]]] = None, + environment_variables: Optional[Dict[str, str]] = None, + replica_count: int = 1, + machine_type: str = "n1-standard-4", + accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED", + accelerator_count: int = 0, + boot_disk_type: str = "pd-ssd", + boot_disk_size_gb: int = 100, + reduction_server_replica_count: int = 0, + reduction_server_machine_type: Optional[str] = None, + reduction_server_container_uri: Optional[str] = None, training_fraction_split: Optional[float] = None, validation_fraction_split: Optional[float] = None, test_fraction_split: Optional[float] = None, @@ -3303,15 +3314,49 @@ def _run( enable_web_access: bool = False, enable_dashboard_access: bool = False, tensorboard: Optional[str] = None, - reduction_server_container_uri: Optional[str] = None, sync=True, create_request_timeout: Optional[float] = None, ) -> 
Optional[models.Model]: - """Packages local script and launches training_job. + """Submits the custom training job without blocking until completion. + + Distributed Training Support: + If replica count = 1 then one chief replica will be provisioned. If + replica_count > 1 the remainder will be provisioned as a worker replica pool. + ie: replica_count = 10 will result in 1 chief and 9 workers + All replicas have same machine_type, accelerator_type, and accelerator_count + + If training on a Vertex AI dataset, you can use one of the following split configurations: + Data fraction splits: + Any of ``training_fraction_split``, ``validation_fraction_split`` and + ``test_fraction_split`` may optionally be provided, they must sum to up to 1. If + the provided ones sum to less than 1, the remainder is assigned to sets as + decided by Vertex AI. If none of the fractions are set, by default roughly 80% + of data will be used for training, 10% for validation, and 10% for test. + + Data filter splits: + Assigns input data to training, validation, and test sets + based on the given filters, data pieces not matched by any + filter are ignored. Currently only supported for Datasets + containing DataItems. + If any of the filters in this message are to match nothing, then + they can be set as '-' (the minus sign). + If using filter splits, all of ``training_filter_split``, ``validation_filter_split`` and + ``test_filter_split`` must be provided. + Supported only for unstructured Datasets. + + Predefined splits: + Assigns input data to training, validation, and test sets based on the value of a provided key. + If using predefined splits, ``predefined_split_column_name`` must be provided. + Supported only for tabular Datasets. + + Timestamp splits: + Assigns input data to training, validation, and test sets + based on a provided timestamps. The youngest data pieces are + assigned to training set, next to validation set, and the oldest + to the test set. + Supported only for tabular Datasets. Args: - python_packager (source_utils._TrainingScriptPythonPackager): - Required. Python Packager pointing to training script locally. dataset ( Union[ datasets.ImageDataset, @@ -3320,14 +3365,54 @@ def _run( datasets.VideoDataset, ] ): - Vertex AI to fit this training against. + Vertex AI to fit this training against. Custom training script should + retrieve datasets through passed in environment variables uris: + + os.environ["AIP_TRAINING_DATA_URI"] + os.environ["AIP_VALIDATION_DATA_URI"] + os.environ["AIP_TEST_DATA_URI"] + + Additionally the dataset format is passed in as: + + os.environ["AIP_DATA_FORMAT"] annotation_schema_uri (str): Google Cloud Storage URI points to a YAML file describing - annotation schema. - worker_pools_spec (worker_spec_utils._DistributedTrainingSpec): - Worker pools pecs required to run job. - managed_model (gca_model.Model): - Model proto if this script produces a Managed Model. + annotation schema. The schema is defined as an OpenAPI 3.0.2 + [Schema Object](https://github.com/OAI/OpenAPI-Specification/blob/main/versions/3.0.2.md#schema-object) The schema files + that can be used here are found in + gs://google-cloud-aiplatform/schema/dataset/annotation/, + note that the chosen schema must be consistent with + ``metadata`` + of the Dataset specified by + ``dataset_id``. 
+ + Only Annotations that both match this schema and belong to + DataItems not ignored by the split method are used in + respectively training, validation or test role, depending on + the role of the DataItem they are on. + + When used in conjunction with + ``annotations_filter``, + the Annotations used for training are filtered by both + ``annotations_filter`` + and + ``annotation_schema_uri``. + model_display_name (str): + If the script produces a managed Vertex AI Model. The display name of + the Model. The name can be up to 128 characters long and can be consist + of any UTF-8 characters. + + If not provided upon creation, the job's display_name is used. + model_labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize your Models. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + and examples of labels. model_id (str): Optional. The ID to use for the Model produced by this job, which will become the final component of the model resource name. @@ -3358,18 +3443,6 @@ def _run( The format is [a-z][a-zA-Z0-9-]{0,126}[a-z0-9] model_version_description (str): Optional. The description of the model version being uploaded by this job. - args (List[Unions[str, int, float]]): - Command line arguments to be passed to the Python script. - environment_variables (Dict[str, str]): - Environment variables to be passed to the container. - Should be a dictionary where keys are environment variable names - and values are environment variable values for those names. - At most 10 environment variables can be specified. - The Name of the environment variable must be unique. - - environment_variables = { - 'MY_KEY': 'MY_VALUE' - } base_output_dir (str): GCS output directory of job. If not provided a timestamped directory in the staging directory will be used. @@ -3387,7 +3460,8 @@ def _run( The full name of the Compute Engine network to which the job should be peered. For example, projects/12345/global/networks/myVPC. Private services access must already be configured for the network. - If left unspecified, the job is not peered with any network. + If left unspecified, the network set in aiplatform.init will be used. + Otherwise, the job is not peered with any network. bigquery_destination (str): Provide this field if `dataset` is a BigQuery dataset. The BigQuery project location where the training data is to @@ -3403,6 +3477,44 @@ def _run( - AIP_TRAINING_DATA_URI ="bigquery_destination.dataset_*.training" - AIP_VALIDATION_DATA_URI = "bigquery_destination.dataset_*.validation" - AIP_TEST_DATA_URI = "bigquery_destination.dataset_*.test" + args (List[Unions[str, int, float]]): + Command line arguments to be passed to the Python script. + environment_variables (Dict[str, str]): + Environment variables to be passed to the container. + Should be a dictionary where keys are environment variable names + and values are environment variable values for those names. + At most 10 environment variables can be specified. + The Name of the environment variable must be unique. + + environment_variables = { + 'MY_KEY': 'MY_VALUE' + } + replica_count (int): + The number of worker replicas. If replica count = 1 then one chief + replica will be provisioned. If replica_count > 1 the remainder will be + provisioned as a worker replica pool. 
+ machine_type (str): + The type of machine to use for training. + accelerator_type (str): + Hardware accelerator type. One of ACCELERATOR_TYPE_UNSPECIFIED, + NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, + NVIDIA_TESLA_T4 + accelerator_count (int): + The number of accelerators to attach to a worker replica. + boot_disk_type (str): + Type of the boot disk, default is `pd-ssd`. + Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or + `pd-standard` (Persistent Disk Hard Disk Drive). + boot_disk_size_gb (int): + Size in GB of the boot disk, default is 100GB. + boot disk size must be within the range of [100, 64000]. + reduction_server_replica_count (int): + The number of reduction server replicas, default is 0. + reduction_server_machine_type (str): + Optional. The type of machine to use for reduction server. + reduction_server_container_uri (str): + Optional. The Uri of the reduction server container image. + See details: https://cloud.google.com/vertex-ai/docs/training/distributed-training#reduce_training_time_with_reduction_server training_fraction_split (float): Optional. The fraction of the input data that is to be used to train the Model. This is ignored if Dataset is not provided. @@ -3481,73 +3593,54 @@ def _run( `service_account` is required with provided `tensorboard`. For more information on configuring your service account please visit: https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training - reduction_server_container_uri (str): - Optional. The Uri of the reduction server container image. + create_request_timeout (float): + Optional. The timeout for the create request in seconds. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will be immediately returned and synced when the Future has completed. - create_request_timeout (float) - Optional. The timeout for the create request in seconds Returns: model: The trained Vertex AI Model resource or None if training did not produce a Vertex AI Model. 
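+
+        Example:
+            A minimal sketch of the non-blocking flow this method introduces.
+            The display name, script path, container image, bucket, and
+            machine shape below are illustrative placeholders only:
+
+                job = aiplatform.CustomTrainingJob(
+                    display_name="my-training-job",
+                    script_path="task.py",
+                    container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-8:latest",
+                    staging_bucket="gs://my-staging-bucket",
+                )
+
+                # Unlike run(), submit() forwards block=False through _run()
+                # and _run_job() to _get_model(), so _block_until_complete()
+                # is skipped and this call returns before training finishes.
+                # The trained Model is therefore usually not yet available here.
+                job.submit(replica_count=1, machine_type="n1-standard-4")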
""" - package_gcs_uri = python_packager.package_and_copy_to_gcs( - gcs_staging_dir=self._staging_bucket, - project=self.project, - credentials=self.credentials, - ) - - for spec_order, spec in enumerate(worker_pool_specs): - - if not spec: - continue - - if ( - spec_order == worker_spec_utils._SPEC_ORDERS["server_spec"] - and reduction_server_container_uri - ): - spec["container_spec"] = { - "image_uri": reduction_server_container_uri, - } - else: - spec["python_package_spec"] = { - "executor_image_uri": self._container_uri, - "python_module": python_packager.module_name, - "package_uris": [package_gcs_uri], - } + network = network or initializer.global_config.network - if args: - spec["python_package_spec"]["args"] = args + worker_pool_specs, managed_model = self._prepare_and_validate_run( + model_display_name=model_display_name, + model_labels=model_labels, + replica_count=replica_count, + machine_type=machine_type, + accelerator_count=accelerator_count, + accelerator_type=accelerator_type, + boot_disk_type=boot_disk_type, + boot_disk_size_gb=boot_disk_size_gb, + reduction_server_replica_count=reduction_server_replica_count, + reduction_server_machine_type=reduction_server_machine_type, + ) - if environment_variables: - spec["python_package_spec"]["env"] = [ - {"name": key, "value": value} - for key, value in environment_variables.items() - ] + # make and copy package + python_packager = source_utils._TrainingScriptPythonPackager( + script_path=self._script_path, requirements=self._requirements + ) - ( - training_task_inputs, - base_output_dir, - ) = self._prepare_training_task_inputs_and_output_dir( + return self._run( + python_packager=python_packager, + dataset=dataset, + annotation_schema_uri=annotation_schema_uri, worker_pool_specs=worker_pool_specs, + managed_model=managed_model, + model_id=model_id, + parent_model=parent_model, + is_default_version=is_default_version, + model_version_aliases=model_version_aliases, + model_version_description=model_version_description, + args=args, + environment_variables=environment_variables, base_output_dir=base_output_dir, service_account=service_account, network=network, - timeout=timeout, - restart_job_on_worker_restart=restart_job_on_worker_restart, - enable_web_access=enable_web_access, - enable_dashboard_access=enable_dashboard_access, - tensorboard=tensorboard, - ) - - model = self._run_job( - training_task_definition=schema.training_job.definition.custom_task, - training_task_inputs=training_task_inputs, - dataset=dataset, - annotation_schema_uri=annotation_schema_uri, + bigquery_destination=bigquery_destination, training_fraction_split=training_fraction_split, validation_fraction_split=validation_fraction_split, test_fraction_split=test_fraction_split, @@ -3556,8 +3649,316 @@ def _run( test_filter_split=test_filter_split, predefined_split_column_name=predefined_split_column_name, timestamp_split_column_name=timestamp_split_column_name, - model=managed_model, - model_id=model_id, + timeout=timeout, + restart_job_on_worker_restart=restart_job_on_worker_restart, + enable_web_access=enable_web_access, + enable_dashboard_access=enable_dashboard_access, + tensorboard=tensorboard, + reduction_server_container_uri=reduction_server_container_uri + if reduction_server_replica_count > 0 + else None, + sync=sync, + create_request_timeout=create_request_timeout, + block=False, + ) + + @base.optional_sync(construct_object_on_arg="managed_model") + def _run( + self, + python_packager: source_utils._TrainingScriptPythonPackager, + dataset: Optional[ + 
Union[ + datasets.ImageDataset, + datasets.TabularDataset, + datasets.TextDataset, + datasets.VideoDataset, + ] + ], + annotation_schema_uri: Optional[str], + worker_pool_specs: worker_spec_utils._DistributedTrainingSpec, + managed_model: Optional[gca_model.Model] = None, + model_id: Optional[str] = None, + parent_model: Optional[str] = None, + is_default_version: Optional[bool] = True, + model_version_aliases: Optional[Sequence[str]] = None, + model_version_description: Optional[str] = None, + args: Optional[List[Union[str, float, int]]] = None, + environment_variables: Optional[Dict[str, str]] = None, + base_output_dir: Optional[str] = None, + service_account: Optional[str] = None, + network: Optional[str] = None, + bigquery_destination: Optional[str] = None, + training_fraction_split: Optional[float] = None, + validation_fraction_split: Optional[float] = None, + test_fraction_split: Optional[float] = None, + training_filter_split: Optional[str] = None, + validation_filter_split: Optional[str] = None, + test_filter_split: Optional[str] = None, + predefined_split_column_name: Optional[str] = None, + timestamp_split_column_name: Optional[str] = None, + timeout: Optional[int] = None, + restart_job_on_worker_restart: bool = False, + enable_web_access: bool = False, + enable_dashboard_access: bool = False, + tensorboard: Optional[str] = None, + reduction_server_container_uri: Optional[str] = None, + sync=True, + create_request_timeout: Optional[float] = None, + block: Optional[bool] = True, + ) -> Optional[models.Model]: + """Packages local script and launches training_job. + + Args: + python_packager (source_utils._TrainingScriptPythonPackager): + Required. Python Packager pointing to training script locally. + dataset ( + Union[ + datasets.ImageDataset, + datasets.TabularDataset, + datasets.TextDataset, + datasets.VideoDataset, + ] + ): + Vertex AI to fit this training against. + annotation_schema_uri (str): + Google Cloud Storage URI points to a YAML file describing + annotation schema. + worker_pools_spec (worker_spec_utils._DistributedTrainingSpec): + Worker pools pecs required to run job. + managed_model (gca_model.Model): + Model proto if this script produces a Managed Model. + model_id (str): + Optional. The ID to use for the Model produced by this job, + which will become the final component of the model resource name. + This value may be up to 63 characters, and valid characters + are `[a-z0-9_-]`. The first character cannot be a number or hyphen. + parent_model (str): + Optional. The resource name or model ID of an existing model. + The new model uploaded by this job will be a version of `parent_model`. + + Only set this field when training a new version of an existing model. + is_default_version (bool): + Optional. When set to True, the newly uploaded model version will + automatically have alias "default" included. Subsequent uses of + the model produced by this job without a version specified will + use this "default" version. + + When set to False, the "default" alias will not be moved. + Actions targeting the model version produced by this job will need + to specifically reference this version by ID or alias. + + New model uploads, i.e. version 1, will always be "default" aliased. + model_version_aliases (Sequence[str]): + Optional. User provided version aliases so that the model version + uploaded by this job can be referenced via alias instead of + auto-generated version ID. A default version alias will be created + for the first version of the model. 
+ + The format is [a-z][a-zA-Z0-9-]{0,126}[a-z0-9] + model_version_description (str): + Optional. The description of the model version being uploaded by this job. + args (List[Unions[str, int, float]]): + Command line arguments to be passed to the Python script. + environment_variables (Dict[str, str]): + Environment variables to be passed to the container. + Should be a dictionary where keys are environment variable names + and values are environment variable values for those names. + At most 10 environment variables can be specified. + The Name of the environment variable must be unique. + + environment_variables = { + 'MY_KEY': 'MY_VALUE' + } + base_output_dir (str): + GCS output directory of job. If not provided a + timestamped directory in the staging directory will be used. + + Vertex AI sets the following environment variables when it runs your training code: + + - AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. /model/ + - AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. /checkpoints/ + - AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. /logs/ + + service_account (str): + Specifies the service account for workload run-as account. + Users submitting jobs must have act-as permission on this run-as account. + network (str): + The full name of the Compute Engine network to which the job + should be peered. For example, projects/12345/global/networks/myVPC. + Private services access must already be configured for the network. + If left unspecified, the job is not peered with any network. + bigquery_destination (str): + Provide this field if `dataset` is a BigQuery dataset. + The BigQuery project location where the training data is to + be written to. In the given project a new dataset is created + with name + ``dataset___`` + where timestamp is in YYYY_MM_DDThh_mm_ss_sssZ format. All + training input data will be written into that dataset. In + the dataset three tables will be created, ``training``, + ``validation`` and ``test``. + + - AIP_DATA_FORMAT = "bigquery". + - AIP_TRAINING_DATA_URI ="bigquery_destination.dataset_*.training" + - AIP_VALIDATION_DATA_URI = "bigquery_destination.dataset_*.validation" + - AIP_TEST_DATA_URI = "bigquery_destination.dataset_*.test" + training_fraction_split (float): + Optional. The fraction of the input data that is to be used to train + the Model. This is ignored if Dataset is not provided. + validation_fraction_split (float): + Optional. The fraction of the input data that is to be used to validate + the Model. This is ignored if Dataset is not provided. + test_fraction_split (float): + Optional. The fraction of the input data that is to be used to evaluate + the Model. This is ignored if Dataset is not provided. + training_filter_split (str): + Optional. A filter on DataItems of the Dataset. DataItems that match + this filter are used to train the Model. A filter with same syntax + as the one used in DatasetService.ListDataItems may be used. If a + single DataItem is matched by more than one of the FilterSplit filters, + then it is assigned to the first set that applies to it in the training, + validation, test order. This is ignored if Dataset is not provided. + validation_filter_split (str): + Optional. A filter on DataItems of the Dataset. DataItems that match + this filter are used to validate the Model. A filter with same syntax + as the one used in DatasetService.ListDataItems may be used. 
If a + single DataItem is matched by more than one of the FilterSplit filters, + then it is assigned to the first set that applies to it in the training, + validation, test order. This is ignored if Dataset is not provided. + test_filter_split (str): + Optional. A filter on DataItems of the Dataset. DataItems that match + this filter are used to test the Model. A filter with same syntax + as the one used in DatasetService.ListDataItems may be used. If a + single DataItem is matched by more than one of the FilterSplit filters, + then it is assigned to the first set that applies to it in the training, + validation, test order. This is ignored if Dataset is not provided. + predefined_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. The value of the key (either the label's value or + value in the column) must be one of {``training``, + ``validation``, ``test``}, and it defines to which set the + given piece of data is assigned. If for a piece of data the + key is not present or has an invalid value, that piece is + ignored by the pipeline. + + Supported only for tabular and time series Datasets. + timestamp_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. The value of the key values of the key (the values in + the column) must be in RFC 3339 `date-time` format, where + `time-offset` = `"Z"` (e.g. 1985-04-12T23:20:50.52Z). If for a + piece of data the key is not present or has an invalid value, + that piece is ignored by the pipeline. + + Supported only for tabular and time series Datasets. + timeout (int): + The maximum job running time in seconds. The default is 7 days. + restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. + enable_web_access (bool): + Whether you want Vertex AI to enable interactive shell access + to training containers. + https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell + enable_dashboard_access (bool): + Whether you want Vertex AI to enable access to the customized dashboard + to training containers. + tensorboard (str): + Optional. The name of a Vertex AI + [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] + resource to which this CustomJob will upload Tensorboard + logs. Format: + ``projects/{project}/locations/{location}/tensorboards/{tensorboard}`` + + The training script should write Tensorboard to following Vertex AI environment + variable: + + AIP_TENSORBOARD_LOG_DIR + + `service_account` is required with provided `tensorboard`. + For more information on configuring your service account please visit: + https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training + reduction_server_container_uri (str): + Optional. The Uri of the reduction server container image. + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + create_request_timeout (float) + Optional. The timeout for the create request in seconds + block (bool): + Optional. If True, block until complete. + + Returns: + model: The trained Vertex AI Model resource or None if training did not + produce a Vertex AI Model. 
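+
+        Example:
+            How the two public entry points in this change call this method
+            (sketch only; most arguments are omitted for brevity):
+
+                # run() leaves block at its default of True, so the trained
+                # Model is fetched only after _block_until_complete() returns.
+                self._run(python_packager=python_packager, ..., sync=sync)
+
+                # submit() passes block=False, which _run_job() forwards to
+                # _get_model(), so no blocking wait is performed.
+                self._run(python_packager=python_packager, ..., sync=sync, block=False)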
+ """ + package_gcs_uri = python_packager.package_and_copy_to_gcs( + gcs_staging_dir=self._staging_bucket, + project=self.project, + credentials=self.credentials, + ) + + for spec_order, spec in enumerate(worker_pool_specs): + + if not spec: + continue + + if ( + spec_order == worker_spec_utils._SPEC_ORDERS["server_spec"] + and reduction_server_container_uri + ): + spec["container_spec"] = { + "image_uri": reduction_server_container_uri, + } + else: + spec["python_package_spec"] = { + "executor_image_uri": self._container_uri, + "python_module": python_packager.module_name, + "package_uris": [package_gcs_uri], + } + + if args: + spec["python_package_spec"]["args"] = args + + if environment_variables: + spec["python_package_spec"]["env"] = [ + {"name": key, "value": value} + for key, value in environment_variables.items() + ] + + ( + training_task_inputs, + base_output_dir, + ) = self._prepare_training_task_inputs_and_output_dir( + worker_pool_specs=worker_pool_specs, + base_output_dir=base_output_dir, + service_account=service_account, + network=network, + timeout=timeout, + restart_job_on_worker_restart=restart_job_on_worker_restart, + enable_web_access=enable_web_access, + enable_dashboard_access=enable_dashboard_access, + tensorboard=tensorboard, + ) + + model = self._run_job( + training_task_definition=schema.training_job.definition.custom_task, + training_task_inputs=training_task_inputs, + dataset=dataset, + annotation_schema_uri=annotation_schema_uri, + training_fraction_split=training_fraction_split, + validation_fraction_split=validation_fraction_split, + test_fraction_split=test_fraction_split, + training_filter_split=training_filter_split, + validation_filter_split=validation_filter_split, + test_filter_split=test_filter_split, + predefined_split_column_name=predefined_split_column_name, + timestamp_split_column_name=timestamp_split_column_name, + model=managed_model, + model_id=model_id, parent_model=parent_model, is_default_version=is_default_version, model_version_aliases=model_version_aliases, @@ -3565,6 +3966,7 @@ def _run( gcs_destination_uri_prefix=base_output_dir, bigquery_destination=bigquery_destination, create_request_timeout=create_request_timeout, + block=block, ) return model @@ -3763,57 +4165,442 @@ def __init__( The key needs to be in the same region as where the compute resource is created. - If set, this TrainingPipeline will be secured by this key. + If set, this TrainingPipeline will be secured by this key. + + Note: Model trained by this TrainingPipeline is also secured + by this key if ``model_to_upload`` is not set separately. + + Overrides encryption_spec_key_name set in aiplatform.init. + model_encryption_spec_key_name (Optional[str]): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the model. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. + The key needs to be in the same region as where the compute + resource is created. + + If set, the trained Model will be secured by this key. + + Overrides encryption_spec_key_name set in aiplatform.init. + staging_bucket (str): + Bucket used to stage source and training artifacts. Overrides + staging_bucket set in aiplatform.init. 
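+
+            Example:
+                A construction sketch for this class; every value below is an
+                illustrative placeholder:
+
+                    job = aiplatform.CustomContainerTrainingJob(
+                        display_name="my-container-training-job",
+                        container_uri="us-docker.pkg.dev/my-project/my-repo/trainer:latest",
+                        command=["python3", "-m", "trainer.task"],
+                        staging_bucket="gs://my-staging-bucket",
+                    )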
+ """ + if not display_name: + display_name = self.__class__._generate_display_name() + super().__init__( + display_name=display_name, + project=project, + location=location, + credentials=credentials, + labels=labels, + training_encryption_spec_key_name=training_encryption_spec_key_name, + model_encryption_spec_key_name=model_encryption_spec_key_name, + container_uri=container_uri, + model_instance_schema_uri=model_instance_schema_uri, + model_parameters_schema_uri=model_parameters_schema_uri, + model_prediction_schema_uri=model_prediction_schema_uri, + model_serving_container_environment_variables=model_serving_container_environment_variables, + model_serving_container_ports=model_serving_container_ports, + model_serving_container_image_uri=model_serving_container_image_uri, + model_serving_container_command=model_serving_container_command, + model_serving_container_args=model_serving_container_args, + model_serving_container_predict_route=model_serving_container_predict_route, + model_serving_container_health_route=model_serving_container_health_route, + model_description=model_description, + explanation_metadata=explanation_metadata, + explanation_parameters=explanation_parameters, + staging_bucket=staging_bucket, + ) + + self._command = command + + def run( + self, + dataset: Optional[ + Union[ + datasets.ImageDataset, + datasets.TabularDataset, + datasets.TextDataset, + datasets.VideoDataset, + ] + ] = None, + annotation_schema_uri: Optional[str] = None, + model_display_name: Optional[str] = None, + model_labels: Optional[Dict[str, str]] = None, + model_id: Optional[str] = None, + parent_model: Optional[str] = None, + is_default_version: Optional[bool] = True, + model_version_aliases: Optional[Sequence[str]] = None, + model_version_description: Optional[str] = None, + base_output_dir: Optional[str] = None, + service_account: Optional[str] = None, + network: Optional[str] = None, + bigquery_destination: Optional[str] = None, + args: Optional[List[Union[str, float, int]]] = None, + environment_variables: Optional[Dict[str, str]] = None, + replica_count: int = 1, + machine_type: str = "n1-standard-4", + accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED", + accelerator_count: int = 0, + boot_disk_type: str = "pd-ssd", + boot_disk_size_gb: int = 100, + reduction_server_replica_count: int = 0, + reduction_server_machine_type: Optional[str] = None, + reduction_server_container_uri: Optional[str] = None, + training_fraction_split: Optional[float] = None, + validation_fraction_split: Optional[float] = None, + test_fraction_split: Optional[float] = None, + training_filter_split: Optional[str] = None, + validation_filter_split: Optional[str] = None, + test_filter_split: Optional[str] = None, + predefined_split_column_name: Optional[str] = None, + timestamp_split_column_name: Optional[str] = None, + timeout: Optional[int] = None, + restart_job_on_worker_restart: bool = False, + enable_web_access: bool = False, + enable_dashboard_access: bool = False, + tensorboard: Optional[str] = None, + sync=True, + create_request_timeout: Optional[float] = None, + ) -> Optional[models.Model]: + """Runs the custom training job. + + Distributed Training Support: + If replica count = 1 then one chief replica will be provisioned. If + replica_count > 1 the remainder will be provisioned as a worker replica pool. 
+ ie: replica_count = 10 will result in 1 chief and 9 workers + All replicas have same machine_type, accelerator_type, and accelerator_count + + If training on a Vertex AI dataset, you can use one of the following split configurations: + Data fraction splits: + Any of ``training_fraction_split``, ``validation_fraction_split`` and + ``test_fraction_split`` may optionally be provided, they must sum to up to 1. If + the provided ones sum to less than 1, the remainder is assigned to sets as + decided by Vertex AI. If none of the fractions are set, by default roughly 80% + of data will be used for training, 10% for validation, and 10% for test. + + Data filter splits: + Assigns input data to training, validation, and test sets + based on the given filters, data pieces not matched by any + filter are ignored. Currently only supported for Datasets + containing DataItems. + If any of the filters in this message are to match nothing, then + they can be set as '-' (the minus sign). + If using filter splits, all of ``training_filter_split``, ``validation_filter_split`` and + ``test_filter_split`` must be provided. + Supported only for unstructured Datasets. + + Predefined splits: + Assigns input data to training, validation, and test sets based on the value of a provided key. + If using predefined splits, ``predefined_split_column_name`` must be provided. + Supported only for tabular Datasets. + + Timestamp splits: + Assigns input data to training, validation, and test sets + based on a provided timestamps. The youngest data pieces are + assigned to training set, next to validation set, and the oldest + to the test set. + Supported only for tabular Datasets. + + Args: + dataset (Union[datasets.ImageDataset,datasets.TabularDataset,datasets.TextDataset,datasets.VideoDataset]): + Vertex AI to fit this training against. Custom training script should + retrieve datasets through passed in environment variables uris: + + os.environ["AIP_TRAINING_DATA_URI"] + os.environ["AIP_VALIDATION_DATA_URI"] + os.environ["AIP_TEST_DATA_URI"] + + Additionally the dataset format is passed in as: + + os.environ["AIP_DATA_FORMAT"] + annotation_schema_uri (str): + Google Cloud Storage URI points to a YAML file describing + annotation schema. The schema is defined as an OpenAPI 3.0.2 + [Schema Object](https://github.com/OAI/OpenAPI-Specification/blob/main/versions/3.0.2.md#schema-object) The schema files + that can be used here are found in + gs://google-cloud-aiplatform/schema/dataset/annotation/, + note that the chosen schema must be consistent with + ``metadata`` + of the Dataset specified by + ``dataset_id``. + + Only Annotations that both match this schema and belong to + DataItems not ignored by the split method are used in + respectively training, validation or test role, depending on + the role of the DataItem they are on. + + When used in conjunction with + ``annotations_filter``, + the Annotations used for training are filtered by both + ``annotations_filter`` + and + ``annotation_schema_uri``. + model_display_name (str): + If the script produces a managed Vertex AI Model. The display name of + the Model. The name can be up to 128 characters long and can be consist + of any UTF-8 characters. + + If not provided upon creation, the job's display_name is used. + model_labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize your Models. 
+ Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + and examples of labels. + model_id (str): + Optional. The ID to use for the Model produced by this job, + which will become the final component of the model resource name. + This value may be up to 63 characters, and valid characters + are `[a-z0-9_-]`. The first character cannot be a number or hyphen. + parent_model (str): + Optional. The resource name or model ID of an existing model. + The new model uploaded by this job will be a version of `parent_model`. + + Only set this field when training a new version of an existing model. + is_default_version (bool): + Optional. When set to True, the newly uploaded model version will + automatically have alias "default" included. Subsequent uses of + the model produced by this job without a version specified will + use this "default" version. + + When set to False, the "default" alias will not be moved. + Actions targeting the model version produced by this job will need + to specifically reference this version by ID or alias. + + New model uploads, i.e. version 1, will always be "default" aliased. + model_version_aliases (Sequence[str]): + Optional. User provided version aliases so that the model version + uploaded by this job can be referenced via alias instead of + auto-generated version ID. A default version alias will be created + for the first version of the model. + + The format is [a-z][a-zA-Z0-9-]{0,126}[a-z0-9] + model_version_description (str): + Optional. The description of the model version being uploaded by this job. + base_output_dir (str): + GCS output directory of job. If not provided a + timestamped directory in the staging directory will be used. - Note: Model trained by this TrainingPipeline is also secured - by this key if ``model_to_upload`` is not set separately. + Vertex AI sets the following environment variables when it runs your training code: - Overrides encryption_spec_key_name set in aiplatform.init. - model_encryption_spec_key_name (Optional[str]): - Optional. The Cloud KMS resource identifier of the customer - managed encryption key used to protect the model. Has the - form: - ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. - The key needs to be in the same region as where the compute - resource is created. + - AIP_MODEL_DIR: a Cloud Storage URI of a directory intended for saving model artifacts, i.e. /model/ + - AIP_CHECKPOINT_DIR: a Cloud Storage URI of a directory intended for saving checkpoints, i.e. /checkpoints/ + - AIP_TENSORBOARD_LOG_DIR: a Cloud Storage URI of a directory intended for saving TensorBoard logs, i.e. /logs/ - If set, the trained Model will be secured by this key. + service_account (str): + Specifies the service account for workload run-as account. + Users submitting jobs must have act-as permission on this run-as account. + network (str): + The full name of the Compute Engine network to which the job + should be peered. For example, projects/12345/global/networks/myVPC. + Private services access must already be configured for the network. + If left unspecified, the network set in aiplatform.init will be used. + Otherwise, the job is not peered with any network. + bigquery_destination (str): + Provide this field if `dataset` is a BigQuery dataset. 
+ The BigQuery project location where the training data is to + be written to. In the given project a new dataset is created + with name + ``dataset___`` + where timestamp is in YYYY_MM_DDThh_mm_ss_sssZ format. All + training input data will be written into that dataset. In + the dataset three tables will be created, ``training``, + ``validation`` and ``test``. - Overrides encryption_spec_key_name set in aiplatform.init. - staging_bucket (str): - Bucket used to stage source and training artifacts. Overrides - staging_bucket set in aiplatform.init. + - AIP_DATA_FORMAT = "bigquery". + - AIP_TRAINING_DATA_URI ="bigquery_destination.dataset_*.training" + - AIP_VALIDATION_DATA_URI = "bigquery_destination.dataset_*.validation" + - AIP_TEST_DATA_URI = "bigquery_destination.dataset_*.test" + args (List[Unions[str, int, float]]): + Command line arguments to be passed to the Python script. + environment_variables (Dict[str, str]): + Environment variables to be passed to the container. + Should be a dictionary where keys are environment variable names + and values are environment variable values for those names. + At most 10 environment variables can be specified. + The Name of the environment variable must be unique. + + environment_variables = { + 'MY_KEY': 'MY_VALUE' + } + replica_count (int): + The number of worker replicas. If replica count = 1 then one chief + replica will be provisioned. If replica_count > 1 the remainder will be + provisioned as a worker replica pool. + machine_type (str): + The type of machine to use for training. + accelerator_type (str): + Hardware accelerator type. One of ACCELERATOR_TYPE_UNSPECIFIED, + NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, + NVIDIA_TESLA_T4 + accelerator_count (int): + The number of accelerators to attach to a worker replica. + boot_disk_type (str): + Type of the boot disk, default is `pd-ssd`. + Valid values: `pd-ssd` (Persistent Disk Solid State Drive) or + `pd-standard` (Persistent Disk Hard Disk Drive). + boot_disk_size_gb (int): + Size in GB of the boot disk, default is 100GB. + boot disk size must be within the range of [100, 64000]. + reduction_server_replica_count (int): + The number of reduction server replicas, default is 0. + reduction_server_machine_type (str): + Optional. The type of machine to use for reduction server. + reduction_server_container_uri (str): + Optional. The Uri of the reduction server container image. + See details: https://cloud.google.com/vertex-ai/docs/training/distributed-training#reduce_training_time_with_reduction_server + training_fraction_split (float): + Optional. The fraction of the input data that is to be used to train + the Model. This is ignored if Dataset is not provided. + validation_fraction_split (float): + Optional. The fraction of the input data that is to be used to validate + the Model. This is ignored if Dataset is not provided. + test_fraction_split (float): + Optional. The fraction of the input data that is to be used to evaluate + the Model. This is ignored if Dataset is not provided. + training_filter_split (str): + Optional. A filter on DataItems of the Dataset. DataItems that match + this filter are used to train the Model. A filter with same syntax + as the one used in DatasetService.ListDataItems may be used. If a + single DataItem is matched by more than one of the FilterSplit filters, + then it is assigned to the first set that applies to it in the training, + validation, test order. This is ignored if Dataset is not provided. 
+ validation_filter_split (str): + Optional. A filter on DataItems of the Dataset. DataItems that match + this filter are used to validate the Model. A filter with same syntax + as the one used in DatasetService.ListDataItems may be used. If a + single DataItem is matched by more than one of the FilterSplit filters, + then it is assigned to the first set that applies to it in the training, + validation, test order. This is ignored if Dataset is not provided. + test_filter_split (str): + Optional. A filter on DataItems of the Dataset. DataItems that match + this filter are used to test the Model. A filter with same syntax + as the one used in DatasetService.ListDataItems may be used. If a + single DataItem is matched by more than one of the FilterSplit filters, + then it is assigned to the first set that applies to it in the training, + validation, test order. This is ignored if Dataset is not provided. + predefined_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. The value of the key (either the label's value or + value in the column) must be one of {``training``, + ``validation``, ``test``}, and it defines to which set the + given piece of data is assigned. If for a piece of data the + key is not present or has an invalid value, that piece is + ignored by the pipeline. + + Supported only for tabular and time series Datasets. + timestamp_split_column_name (str): + Optional. The key is a name of one of the Dataset's data + columns. The value of the key values of the key (the values in + the column) must be in RFC 3339 `date-time` format, where + `time-offset` = `"Z"` (e.g. 1985-04-12T23:20:50.52Z). If for a + piece of data the key is not present or has an invalid value, + that piece is ignored by the pipeline. + + Supported only for tabular and time series Datasets. + timeout (int): + The maximum job running time in seconds. The default is 7 days. + restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. + enable_web_access (bool): + Whether you want Vertex AI to enable interactive shell access + to training containers. + https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell + enable_dashboard_access (bool): + Whether you want Vertex AI to enable access to the customized dashboard + to training containers. + tensorboard (str): + Optional. The name of a Vertex AI + [Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard] + resource to which this CustomJob will upload Tensorboard + logs. Format: + ``projects/{project}/locations/{location}/tensorboards/{tensorboard}`` + + The training script should write Tensorboard to following Vertex AI environment + variable: + + AIP_TENSORBOARD_LOG_DIR + + `service_account` is required with provided `tensorboard`. + For more information on configuring your service account please visit: + https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + create_request_timeout (float): + Optional. The timeout for the create request in seconds. + + Returns: + model: The trained Vertex AI Model resource or None if training did not + produce a Vertex AI Model. 
+ + Raises: + RuntimeError: If Training job has already been run, staging_bucket has not + been set, or model_display_name was provided but required arguments + were not provided in constructor. """ - if not display_name: - display_name = self.__class__._generate_display_name() - super().__init__( - display_name=display_name, - project=project, - location=location, - credentials=credentials, - labels=labels, - training_encryption_spec_key_name=training_encryption_spec_key_name, - model_encryption_spec_key_name=model_encryption_spec_key_name, - container_uri=container_uri, - model_instance_schema_uri=model_instance_schema_uri, - model_parameters_schema_uri=model_parameters_schema_uri, - model_prediction_schema_uri=model_prediction_schema_uri, - model_serving_container_environment_variables=model_serving_container_environment_variables, - model_serving_container_ports=model_serving_container_ports, - model_serving_container_image_uri=model_serving_container_image_uri, - model_serving_container_command=model_serving_container_command, - model_serving_container_args=model_serving_container_args, - model_serving_container_predict_route=model_serving_container_predict_route, - model_serving_container_health_route=model_serving_container_health_route, - model_description=model_description, - explanation_metadata=explanation_metadata, - explanation_parameters=explanation_parameters, - staging_bucket=staging_bucket, + network = network or initializer.global_config.network + + worker_pool_specs, managed_model = self._prepare_and_validate_run( + model_display_name=model_display_name, + model_labels=model_labels, + replica_count=replica_count, + machine_type=machine_type, + accelerator_count=accelerator_count, + accelerator_type=accelerator_type, + boot_disk_type=boot_disk_type, + boot_disk_size_gb=boot_disk_size_gb, + reduction_server_replica_count=reduction_server_replica_count, + reduction_server_machine_type=reduction_server_machine_type, ) - self._command = command + return self._run( + dataset=dataset, + annotation_schema_uri=annotation_schema_uri, + worker_pool_specs=worker_pool_specs, + managed_model=managed_model, + model_id=model_id, + parent_model=parent_model, + is_default_version=is_default_version, + model_version_aliases=model_version_aliases, + model_version_description=model_version_description, + args=args, + environment_variables=environment_variables, + base_output_dir=base_output_dir, + service_account=service_account, + network=network, + bigquery_destination=bigquery_destination, + training_fraction_split=training_fraction_split, + validation_fraction_split=validation_fraction_split, + test_fraction_split=test_fraction_split, + training_filter_split=training_filter_split, + validation_filter_split=validation_filter_split, + test_filter_split=test_filter_split, + predefined_split_column_name=predefined_split_column_name, + timestamp_split_column_name=timestamp_split_column_name, + timeout=timeout, + restart_job_on_worker_restart=restart_job_on_worker_restart, + enable_web_access=enable_web_access, + enable_dashboard_access=enable_dashboard_access, + tensorboard=tensorboard, + reduction_server_container_uri=reduction_server_container_uri + if reduction_server_replica_count > 0 + else None, + sync=sync, + create_request_timeout=create_request_timeout, + ) - def run( + def submit( self, dataset: Optional[ Union[ @@ -3862,7 +4649,7 @@ def run( sync=True, create_request_timeout: Optional[float] = None, ) -> Optional[models.Model]: - """Runs the custom training job. 
+ """Submits the custom training job without blocking until completion. Distributed Training Support: If replica count = 1 then one chief replica will be provisioned. If @@ -4196,6 +4983,7 @@ def run( else None, sync=sync, create_request_timeout=create_request_timeout, + block=False, ) @base.optional_sync(construct_object_on_arg="managed_model") @@ -4239,6 +5027,7 @@ def _run( reduction_server_container_uri: Optional[str] = None, sync=True, create_request_timeout: Optional[float] = None, + block: Optional[bool] = True, ) -> Optional[models.Model]: """Packages local script and launches training_job. Args: @@ -4418,6 +5207,8 @@ def _run( be immediately returned and synced when the Future has completed. create_request_timeout (float): Optional. The timeout for the create request in seconds. + block (bool): + Optional. If True, block until complete. Returns: model: The trained Vertex AI Model resource or None if training did not @@ -4488,6 +5279,7 @@ def _run( gcs_destination_uri_prefix=base_output_dir, bigquery_destination=bigquery_destination, create_request_timeout=create_request_timeout, + block=block, ) return model
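Usage sketch for the non-blocking path added above: both CustomTrainingJob and
CustomContainerTrainingJob gain a submit() method that calls _run() with
block=False, so _run_job() invokes _get_model(block=False) and skips
_block_until_complete(). The example below illustrates the container-based
variant; the project, location, display name, image URI, and bucket are
placeholders, not values taken from this changeset:

    from google.cloud import aiplatform

    aiplatform.init(
        project="my-project",
        location="us-central1",
        staging_bucket="gs://my-staging-bucket",
    )

    job = aiplatform.CustomContainerTrainingJob(
        display_name="my-container-training-job",
        container_uri="us-docker.pkg.dev/my-project/my-repo/trainer:latest",
        command=["python3", "-m", "trainer.task"],
    )

    # Non-blocking: the training pipeline is created and the call returns
    # without waiting for completion, so the trained Model is generally not
    # available yet at this point.
    job.submit(replica_count=1, machine_type="n1-standard-4")

    # Blocking: identical arguments, but run() keeps block=True and returns
    # the trained Vertex AI Model (or None if the job produces no Model).
    # model = job.run(replica_count=1, machine_type="n1-standard-4")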