Skip to content

Commit

Permalink
feat: Support global network parameter.
Browse files Browse the repository at this point in the history
COPYBARA_INTEGRATE_REVIEW=#1702 from nayaknishant:nn-network 32d0972
PiperOrigin-RevId: 485422918
  • Loading branch information
nayaknishant authored and copybara-github committed Nov 1, 2022
1 parent 98dbe5c commit c7f57ad
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 40 deletions.
17 changes: 16 additions & 1 deletion google/cloud/aiplatform/initializer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-

# Copyright 2020 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,6 +51,7 @@ def __init__(self):
self._staging_bucket = None
self._credentials = None
self._encryption_spec_key_name = None
self._network = None

def init(
self,
Expand All @@ -65,6 +66,7 @@ def init(
staging_bucket: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
encryption_spec_key_name: Optional[str] = None,
network: Optional[str] = None,
):
"""Updates common initialization parameters with provided options.
Expand Down Expand Up @@ -95,6 +97,12 @@ def init(
resource is created.
If set, this resource and all sub-resources will be secured by this key.
network (str):
Optional. The full name of the Compute Engine network to which jobs
and resources should be peered. E.g. "projects/12345/global/networks/myVPC".
Private services access must already be configured for the network.
If specified, all eligible jobs and resources created will be peered
with this VPC.
Raises:
ValueError:
If experiment_description is provided but experiment is not.
Expand Down Expand Up @@ -130,6 +138,8 @@ def init(
self._credentials = credentials
if encryption_spec_key_name:
self._encryption_spec_key_name = encryption_spec_key_name
if network is not None:
self._network = network

if experiment:
metadata._experiment_tracker.set_experiment(
Expand Down Expand Up @@ -237,6 +247,11 @@ def encryption_spec_key_name(self) -> Optional[str]:
"""Default encryption spec key name, if provided."""
return self._encryption_spec_key_name

@property
def network(self) -> Optional[str]:
"""Default Compute Engine network to peer to, if provided."""
return self._network

@property
def experiment_name(self) -> Optional[str]:
"""Default experiment name, if provided."""
Expand Down
140 changes: 136 additions & 4 deletions google/cloud/aiplatform/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,6 @@ def from_local_script(
staging_bucket=staging_bucket,
)

@base.optional_sync()
def run(
self,
service_account: Optional[str] = None,
Expand All @@ -1537,7 +1536,8 @@ def run(
Optional. The full name of the Compute Engine network to which the job
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
If left unspecified, the job is not peered with any network.
If left unspecified, the network set in aiplatform.init will be used.
Otherwise, the job is not peered with any network.
timeout (int):
The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Expand Down Expand Up @@ -1570,7 +1570,73 @@ def run(
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
"""
network = network or initializer.global_config.network

self._run(
service_account=service_account,
network=network,
timeout=timeout,
restart_job_on_worker_restart=restart_job_on_worker_restart,
enable_web_access=enable_web_access,
tensorboard=tensorboard,
sync=sync,
create_request_timeout=create_request_timeout,
)

@base.optional_sync()
def _run(
self,
service_account: Optional[str] = None,
network: Optional[str] = None,
timeout: Optional[int] = None,
restart_job_on_worker_restart: bool = False,
enable_web_access: bool = False,
tensorboard: Optional[str] = None,
sync: bool = True,
create_request_timeout: Optional[float] = None,
) -> None:
"""Helper method to ensure network synchronization and to run the configured CustomJob.
Args:
service_account (str):
Optional. Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
network (str):
Optional. The full name of the Compute Engine network to which the job
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
timeout (int):
The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Restarts the entire CustomJob if a worker
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
enable_web_access (bool):
Whether you want Vertex AI to enable interactive shell access
to training containers.
https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell
tensorboard (str):
Optional. The name of a Vertex AI
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard to following Vertex AI environment
variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required with provided `tensorboard`.
For more information on configuring your service account please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
sync (bool):
Whether to execute this method synchronously. If False, this method
will unblock and it will be executed in a concurrent Future.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
"""
if service_account:
self._gca_resource.job_spec.service_account = service_account

Expand Down Expand Up @@ -1907,7 +1973,6 @@ def _log_web_access_uris(self):
)
self._logged_web_access_uris.add(uri)

@base.optional_sync()
def run(
self,
service_account: Optional[str] = None,
Expand All @@ -1929,7 +1994,8 @@ def run(
Optional. The full name of the Compute Engine network to which the job
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
If left unspecified, the job is not peered with any network.
If left unspecified, the network set in aiplatform.init will be used.
Otherwise, the job is not peered with any network.
timeout (int):
Optional. The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Expand Down Expand Up @@ -1962,7 +2028,73 @@ def run(
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
"""
network = network or initializer.global_config.network

self._run(
service_account=service_account,
network=network,
timeout=timeout,
restart_job_on_worker_restart=restart_job_on_worker_restart,
enable_web_access=enable_web_access,
tensorboard=tensorboard,
sync=sync,
create_request_timeout=create_request_timeout,
)

@base.optional_sync()
def _run(
self,
service_account: Optional[str] = None,
network: Optional[str] = None,
timeout: Optional[int] = None, # seconds
restart_job_on_worker_restart: bool = False,
enable_web_access: bool = False,
tensorboard: Optional[str] = None,
sync: bool = True,
create_request_timeout: Optional[float] = None,
) -> None:
"""Helper method to ensure network synchronization and to run the configured CustomJob.
Args:
service_account (str):
Optional. Specifies the service account for workload run-as account.
Users submitting jobs must have act-as permission on this run-as account.
network (str):
Optional. The full name of the Compute Engine network to which the job
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
timeout (int):
Optional. The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Restarts the entire CustomJob if a worker
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
enable_web_access (bool):
Whether you want Vertex AI to enable interactive shell access
to training containers.
https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell
tensorboard (str):
Optional. The name of a Vertex AI
[Tensorboard][google.cloud.aiplatform.v1beta1.Tensorboard]
resource to which this CustomJob will upload Tensorboard
logs. Format:
``projects/{project}/locations/{location}/tensorboards/{tensorboard}``
The training script should write Tensorboard to following Vertex AI environment
variable:
AIP_TENSORBOARD_LOG_DIR
`service_account` is required with provided `tensorboard`.
For more information on configuring your service account please visit:
https://cloud.google.com/vertex-ai/docs/experiments/tensorboard-training
sync (bool):
Whether to execute this method synchronously. If False, this method
will unblock and it will be executed in a concurrent Future.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
"""
if service_account:
self._gca_resource.trial_job_spec.service_account = service_account

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,10 @@ def __init__(
self._gca_resource = self._get_gca_resource(resource_name=index_endpoint_name)

@classmethod
@base.optional_sync()
def create(
cls,
display_name: str,
network: str,
network: Optional[str] = None,
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
project: Optional[str] = None,
Expand All @@ -153,13 +152,12 @@ def create(
The name can be up to 128 characters long and
can be consist of any UTF-8 characters.
network (str):
Required. The full name of the Google Compute Engine
Optional. The full name of the Google Compute Engine
`network <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__
to which the IndexEndpoint should be peered.
Private services access must already be configured for the
network. If left unspecified, the Endpoint is not peered
with any network.
Private services access must already be configured for the network.
If left unspecified, the network set with aiplatform.init will be used.
`Format <https://cloud.google.com/compute/docs/reference/rest/v1/networks/insert>`__:
projects/{project}/global/networks/{network}. Where
Expand All @@ -182,13 +180,13 @@ def create(
System reserved label keys are prefixed with
"aiplatform.googleapis.com/" and are immutable.
project (str):
Optional. Project to create EntityType in. If not set, project
Optional. Project to create IndexEndpoint in. If not set, project
set in aiplatform.init will be used.
location (str):
Optional. Location to create EntityType in. If not set, location
Optional. Location to create IndexEndpoint in. If not set, location
set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to create EntityTypes. Overrides
Optional. Custom credentials to use to create IndexEndpoints. Overrides
credentials set in aiplatform.init.
request_metadata (Sequence[Tuple[str, str]]):
Optional. Strings which should be sent along with the request as metadata.
Expand All @@ -200,11 +198,98 @@ def create(
Returns:
MatchingEngineIndexEndpoint - IndexEndpoint resource object
Raises:
ValueError: A network must be instantiated when creating a IndexEndpoint.
"""
gapic_index_endpoint = gca_matching_engine_index_endpoint.IndexEndpoint(
network = network or initializer.global_config.network

if not network:
raise ValueError(
"Please provide `network` argument or set network"
"using aiplatform.init(network=...)"
)

return cls._create(
display_name=display_name,
description=description,
network=network,
description=description,
labels=labels,
project=project,
location=location,
credentials=credentials,
request_metadata=request_metadata,
sync=sync,
)

@classmethod
@base.optional_sync()
def _create(
cls,
display_name: str,
network: Optional[str] = None,
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync: bool = True,
) -> "MatchingEngineIndexEndpoint":
"""Helper method to ensure network synchronization and to
create a MatchingEngineIndexEndpoint resource.
Args:
display_name (str):
Required. The display name of the IndexEndpoint.
The name can be up to 128 characters long and
can be consist of any UTF-8 characters.
network (str):
Optional. The full name of the Google Compute Engine
`network <https://cloud.google.com/compute/docs/networks-and-firewalls#networks>`__
to which the IndexEndpoint should be peered.
Private services access must already be configured for the network.
`Format <https://cloud.google.com/compute/docs/reference/rest/v1/networks/insert>`__:
projects/{project}/global/networks/{network}. Where
{project} is a project number, as in '12345', and {network}
is network name.
description (str):
Optional. The description of the IndexEndpoint.
labels (Dict[str, str]):
Optional. The labels with user-defined
metadata to organize your IndexEndpoint.
Label keys and values can be no longer than 64
characters (Unicode codepoints), can only
contain lowercase letters, numeric characters,
underscores and dashes. International characters
are allowed.
See https://goo.gl/xmQnxf for more information
on and examples of labels. No more than 64 user
labels can be associated with one
IndexEndpoint (System labels are excluded)."
System reserved label keys are prefixed with
"aiplatform.googleapis.com/" and are immutable.
project (str):
Optional. Project to create IndexEndpoint in. If not set, project
set in aiplatform.init will be used.
location (str):
Optional. Location to create IndexEndpoint in. If not set, location
set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to create IndexEndpoints. Overrides
credentials set in aiplatform.init.
request_metadata (Sequence[Tuple[str, str]]):
Optional. Strings which should be sent along with the request as metadata.
sync (bool):
Optional. Whether to execute this creation synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
Returns:
MatchingEngineIndexEndpoint - IndexEndpoint resource object
"""
gapic_index_endpoint = gca_matching_engine_index_endpoint.IndexEndpoint(
display_name=display_name, description=description, network=network
)

if labels:
Expand Down
Loading

0 comments on commit c7f57ad

Please sign in to comment.