From 425a32fccab2147101bec0031ca139bb311b7cff Mon Sep 17 00:00:00 2001 From: Vinny Senthil Date: Tue, 28 Jun 2022 15:23:17 -0700 Subject: [PATCH] feat: Add PrivateEndpoint class and HTTP methods (#1033) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add `network` to SDK initializer * Add urllib3 to library requirements * Fix network init() docstrings * Update Model/Endpoint docs to use top namespace * Add core Private Endpoint wrapper * Drop logs, add URI props * Lint, update typing for Prediction class * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add bug refs, test stubs, minor fixes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * test commit * adding examples, slight changes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * adding deployed model check: q * adding deployed model check * minor changes, adding error catching for explanations * testing to get other fields * removing edge case * adding print statement for debugging explain * using GET instead of POST for explain * using GET instead of POST for explain * using GET instead of POST for explain * removing explain for now, adding model id * removing explain for now, adding model id * removing explain for now, adding model id * cleaned and added docstrings * adding explain back * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * removing explain until working * cleaning up docstrings * create test done, working on predict * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * added unit tests for PrivateEndpoint * test debugging * fixing unit tests * adding fixes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * formatting encryption_spec_key_name * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fixing comments * adjusting traffic for private endpoint * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * adjusting traffic for private endpoint * adjusting traffic for private endpoint * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * adding fixes * added delete for private Endpoint * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fixing traffic percentage * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * using network instead of class, moving exceptions * cleaning up docstrings * adding fixes, delete testing * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * adding fixes * adding Raises section to private Endpoint docstrings * added private Endpoint check in init, added to testing * added private Endpoint check in init, added to testing * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * adding try/except inside functions, fixing tests, misc fixes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * reverting _validate_deploy_args method * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fixed type hint * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * lint issue * lint issue * adding system tests * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * adding system test changes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * adding shared_state to system test * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * trying outside import * putting imports back inside * added system test changes * added system test changes * adding fixes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fixing system test and init fixes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * adding fixes * adding fixes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * adding fixes * adding fixes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * added global network arg to associated classes * style formatting * docstring fixes * adding extra info to network arg docstring * adding extra info to network arg docstring * removing changes to noxfile * reverting noxfile * reverting noxfile * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * reverting job files to not include global network * reverting initalizer to not include global network config * removing global network config * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * added fix and test to models * added fix and test to models * simplifying delete method Co-authored-by: Owl Bot Co-authored-by: nayaknishant Co-authored-by: sasha-gitg <44654632+sasha-gitg@users.noreply.github.com> --- google/cloud/aiplatform/__init__.py | 2 + google/cloud/aiplatform/initializer.py | 2 +- google/cloud/aiplatform/models.py | 1186 +++++++++++++---- setup.py | 12 +- tests/system/aiplatform/e2e_base.py | 1 - .../aiplatform/test_private_endpoint.py | 69 + tests/unit/aiplatform/test_endpoints.py | 254 +++- tests/unit/aiplatform/test_models.py | 35 +- 8 files changed, 1329 insertions(+), 232 deletions(-) create mode 100644 tests/system/aiplatform/test_private_endpoint.py diff --git a/google/cloud/aiplatform/__init__.py b/google/cloud/aiplatform/__init__.py index 31f459d3f7..88b450460f 100644 --- a/google/cloud/aiplatform/__init__.py +++ b/google/cloud/aiplatform/__init__.py @@ -44,6 +44,7 @@ ) from google.cloud.aiplatform import metadata from google.cloud.aiplatform.models import Endpoint +from google.cloud.aiplatform.models import PrivateEndpoint from google.cloud.aiplatform.models import Model from google.cloud.aiplatform.model_evaluation import ModelEvaluation from google.cloud.aiplatform.jobs import ( @@ -136,6 +137,7 @@ "Model", "ModelEvaluation", "PipelineJob", + "PrivateEndpoint", "SequenceToSequencePlusForecastingTrainingJob", "TabularDataset", "Tensorboard", diff --git a/google/cloud/aiplatform/initializer.py b/google/cloud/aiplatform/initializer.py index 9f0afd9e70..800bf5b014 100644 --- a/google/cloud/aiplatform/initializer.py +++ b/google/cloud/aiplatform/initializer.py @@ -98,7 +98,7 @@ def init( Raises: ValueError: If experiment_description is provided but experiment is not. - If experiment_tensorboard is provided but expeirment is not. + If experiment_tensorboard is provided but experiment is not. """ if experiment_description and experiment is None: diff --git a/google/cloud/aiplatform/models.py b/google/cloud/aiplatform/models.py index 93243b5678..e6e108d45f 100644 --- a/google/cloud/aiplatform/models.py +++ b/google/cloud/aiplatform/models.py @@ -14,12 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import json import pathlib import proto import re import shutil import tempfile -from typing import Dict, List, NamedTuple, Optional, Sequence, Tuple, Union +from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple, Union from google.api_core import operation from google.api_core import exceptions as api_exceptions @@ -52,6 +53,7 @@ _DEFAULT_MACHINE_TYPE = "n1-standard-2" _DEPLOYING_MODEL_TRAFFIC_SPLIT_KEY = "0" +_SUCCESSFUL_HTTP_RESPONSE = 300 _LOGGER = base.Logger(__name__) @@ -81,7 +83,7 @@ class Prediction(NamedTuple): of elements as instances to be explained. Default is None. """ - predictions: Dict[str, List] + predictions: List[Dict[str, Any]] deployed_model_id: str explanations: Optional[Sequence[gca_explanation_compat.Explanation]] = None @@ -217,12 +219,6 @@ def create( Optional. The user-defined name of the Endpoint. The name can be up to 128 characters long and can be consist of any UTF-8 characters. - project (str): - Required. Project to retrieve endpoint from. If not set, project - set in aiplatform.init will be used. - location (str): - Required. Location to retrieve endpoint from. If not set, location - set in aiplatform.init will be used. description (str): Optional. The description of the Endpoint. labels (Dict[str, str]): @@ -238,10 +234,16 @@ def create( metadata (Sequence[Tuple[str, str]]): Optional. Strings which should be sent along with the request as metadata. + project (str): + Required. Project to retrieve endpoint from. If not set, project + set in aiplatform.init will be used. + location (str): + Required. Location to retrieve endpoint from. If not set, location + set in aiplatform.init will be used. credentials (auth_credentials.Credentials): Optional. Custom credentials to use to upload this model. Overrides credentials set in aiplatform.init. - encryption_spec_key_name (Optional[str]): + encryption_spec_key_name (str): Optional. The Cloud KMS resource identifier of the customer managed encryption key used to protect the model. Has the form: @@ -269,8 +271,9 @@ def create( is populated based on a query string argument, such as ``?endpoint_id=12345``. This is the fallback for fields that are not included in either the URI or the body. + Returns: - endpoint (endpoint.Endpoint): + endpoint (aiplatform.Endpoint): Created endpoint. """ @@ -316,6 +319,7 @@ def _create( metadata: Optional[Sequence[Tuple[str, str]]] = (), credentials: Optional[auth_credentials.Credentials] = None, encryption_spec: Optional[gca_encryption_spec.EncryptionSpec] = None, + network: Optional[str] = None, sync=True, create_request_timeout: Optional[float] = None, endpoint_id: Optional[str] = None, @@ -354,12 +358,20 @@ def _create( credentials (auth_credentials.Credentials): Optional. Custom credentials to use to upload this model. Overrides credentials set in aiplatform.init. - encryption_spec (Optional[gca_encryption_spec.EncryptionSpec]): + encryption_spec (gca_encryption_spec.EncryptionSpec): Optional. The Cloud KMS customer managed encryption key used to protect the dataset. The key needs to be in the same region as where the compute resource is created. If set, this Dataset and all sub-resources of this Dataset will be secured by this key. + network (str): + Optional. The full name of the Compute Engine network to which + this Endpoint will be peered. E.g. "projects/12345/global/networks/myVPC". + Private services access must already be configured for the network. + If left unspecified, the job is not peered with any network or + the network set in aiplatform.init will be used. + If set, this will be a PrivateEndpoint. Read more about PrivateEndpoints + [in the documentation](https://cloud.google.com/vertex-ai/docs/predictions/using-private-endpoints) sync (bool): Whether to create this endpoint synchronously. create_request_timeout (float): @@ -375,8 +387,9 @@ def _create( is populated based on a query string argument, such as ``?endpoint_id=12345``. This is the fallback for fields that are not included in either the URI or the body. + Returns: - endpoint (endpoint.Endpoint): + endpoint (aiplatform.Endpoint): Created endpoint. """ @@ -389,6 +402,7 @@ def _create( description=description, labels=labels, encryption_spec=encryption_spec, + network=network, ) operation_future = api_client.create_endpoint( @@ -437,7 +451,7 @@ def _construct_sdk_resource_from_gapic( Overrides credentials set in aiplatform.init. Returns: - Endpoint: + Endpoint (aiplatform.Endpoint): An initialized Endpoint resource. """ endpoint = cls._empty_constructor( @@ -466,6 +480,7 @@ def _allocate_traffic( Required. Current traffic split of deployed models in endpoint. traffic_percentage (int): Required. Desired traffic to new deployed model. + Returns: new_traffic_split (Dict[str, int]): Traffic split to use. @@ -503,6 +518,7 @@ def _unallocate_traffic( Required. Current traffic split of deployed models in endpoint. deployed_model_id (str): Required. Desired traffic to new deployed model. + Returns: new_traffic_split (Dict[str, int]): Traffic split to use. @@ -537,9 +553,11 @@ def _validate_deploy_args( accelerator_type: Optional[str], deployed_model_display_name: Optional[str], traffic_split: Optional[Dict[str, int]], - traffic_percentage: int, - explanation_metadata: Optional[explain.ExplanationMetadata] = None, - explanation_parameters: Optional[explain.ExplanationParameters] = None, + traffic_percentage: Optional[int], + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, ): """Helper method to validate deploy arguments. @@ -568,7 +586,7 @@ def _validate_deploy_args( Required. The display name of the DeployedModel. If not provided upon creation, the Model's display_name is used. traffic_split (Dict[str, int]): - Required. A map from a DeployedModel's ID to the percentage of + Optional. A map from a DeployedModel's ID to the percentage of this Endpoint's traffic that should be forwarded to that DeployedModel. If a DeployedModel's ID is not listed in this map, then it receives no traffic. The traffic percentage values must add up to 100, or @@ -576,25 +594,24 @@ def _validate_deploy_args( the moment. Key for model being deployed is "0". Should not be provided if traffic_percentage is provided. traffic_percentage (int): - Required. Desired traffic to newly deployed model. Defaults to + Optional. Desired traffic to newly deployed model. Defaults to 0 if there are pre-existing deployed models. Defaults to 100 if there are no pre-existing deployed models. Negative values should not be provided. Traffic of previously deployed models at the endpoint will be scaled down to accommodate new deployed model's traffic. Should not be provided if traffic_split is provided. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Metadata describing the Model's input and output for explanation. Both `explanation_metadata` and `explanation_parameters` must be passed together when used. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. For more details, see `Ref docs ` Raises: ValueError: if Min or Max replica is negative. Traffic percentage > 100 or < 0. Or if traffic_split does not sum to 100. - ValueError: if either explanation_metadata or explanation_parameters but not both are specified. """ @@ -638,8 +655,10 @@ def deploy( accelerator_type: Optional[str] = None, accelerator_count: Optional[int] = None, service_account: Optional[str] = None, - explanation_metadata: Optional[explain.ExplanationMetadata] = None, - explanation_parameters: Optional[explain.ExplanationParameters] = None, + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, metadata: Optional[Sequence[Tuple[str, str]]] = (), sync=True, deploy_request_timeout: Optional[float] = None, @@ -701,12 +720,12 @@ def deploy( to the resource project. Users deploying the Model must have the `iam.serviceAccounts.actAs` permission on this service account. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Metadata describing the Model's input and output for explanation. Both `explanation_metadata` and `explanation_parameters` must be passed together when used. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. For more details, see `Ref docs ` metadata (Sequence[Tuple[str, str]]): @@ -729,14 +748,14 @@ def deploy( self._sync_gca_resource_if_skipped() self._validate_deploy_args( - min_replica_count, - max_replica_count, - accelerator_type, - deployed_model_display_name, - traffic_split, - traffic_percentage, - explanation_metadata, - explanation_parameters, + min_replica_count=min_replica_count, + max_replica_count=max_replica_count, + accelerator_type=accelerator_type, + deployed_model_display_name=deployed_model_display_name, + traffic_split=traffic_split, + traffic_percentage=traffic_percentage, + explanation_metadata=explanation_metadata, + explanation_parameters=explanation_parameters, ) self._deploy( @@ -772,8 +791,10 @@ def _deploy( accelerator_type: Optional[str] = None, accelerator_count: Optional[int] = None, service_account: Optional[str] = None, - explanation_metadata: Optional[explain.ExplanationMetadata] = None, - explanation_parameters: Optional[explain.ExplanationParameters] = None, + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, metadata: Optional[Sequence[Tuple[str, str]]] = (), sync=True, deploy_request_timeout: Optional[float] = None, @@ -835,12 +856,12 @@ def _deploy( to the resource project. Users deploying the Model must have the `iam.serviceAccounts.actAs` permission on this service account. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Metadata describing the Model's input and output for explanation. Both `explanation_metadata` and `explanation_parameters` must be passed together when used. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. For more details, see `Ref docs ` metadata (Sequence[Tuple[str, str]]): @@ -859,19 +880,17 @@ def _deploy( Target Accelerator Duty Cycle. Must also set accelerator_type and accelerator_count if specified. A default value of 60 will be used if not specified. - Raises: - ValueError: If there is not current traffic split and traffic percentage - is not 0 or 100. """ _LOGGER.log_action_start_against_resource( f"Deploying Model {model.resource_name} to", "", self ) self._deploy_call( - self.api_client, - self.resource_name, - model, - self._gca_resource.traffic_split, + api_client=self.api_client, + endpoint_resource_name=self.resource_name, + model=model, + endpoint_resource_traffic_split=self._gca_resource.traffic_split, + network=self.network, deployed_model_display_name=deployed_model_display_name, traffic_percentage=traffic_percentage, traffic_split=traffic_split, @@ -900,6 +919,7 @@ def _deploy_call( endpoint_resource_name: str, model: "Model", endpoint_resource_traffic_split: Optional[proto.MapField] = None, + network: Optional[str] = None, deployed_model_display_name: Optional[str] = None, traffic_percentage: Optional[int] = 0, traffic_split: Optional[Dict[str, int]] = None, @@ -909,8 +929,10 @@ def _deploy_call( accelerator_type: Optional[str] = None, accelerator_count: Optional[int] = None, service_account: Optional[str] = None, - explanation_metadata: Optional[explain.ExplanationMetadata] = None, - explanation_parameters: Optional[explain.ExplanationParameters] = None, + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, metadata: Optional[Sequence[Tuple[str, str]]] = (), deploy_request_timeout: Optional[float] = None, autoscaling_target_cpu_utilization: Optional[int] = None, @@ -927,6 +949,12 @@ def _deploy_call( Required. Model to be deployed. endpoint_resource_traffic_split (proto.MapField): Optional. Endpoint current resource traffic split. + network (str): + Optional. The full name of the Compute Engine network to which + this Endpoint will be peered. E.g. "projects/123/global/networks/my_vpc". + Private services access must already be configured for the network. + If left unspecified, the job is not peered with any network or + the network set in aiplatform.init will be used. deployed_model_display_name (str): Optional. The display name of the DeployedModel. If not provided upon creation, the Model's display_name is used. @@ -964,6 +992,12 @@ def _deploy_call( is not provided, the larger value of min_replica_count or 1 will be used. If value provided is smaller than min_replica_count, it will automatically be increased to be min_replica_count. + accelerator_type (str): + Optional. Hardware accelerator type. Must also set accelerator_count if used. + One of ACCELERATOR_TYPE_UNSPECIFIED, NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, + NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4 + accelerator_count (int): + Optional. The number of accelerators to attach to a worker replica. service_account (str): The service account that the DeployedModel's container runs as. Specify the email address of the service account. If this service account is not @@ -971,21 +1005,17 @@ def _deploy_call( to the resource project. Users deploying the Model must have the `iam.serviceAccounts.actAs` permission on this service account. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Metadata describing the Model's input and output for explanation. Both `explanation_metadata` and `explanation_parameters` must be passed together when used. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. For more details, see `Ref docs ` metadata (Sequence[Tuple[str, str]]): Optional. Strings which should be sent along with the request as metadata. - sync (bool): - Whether to execute this method synchronously. If False, this method - will be executed in concurrent Future and any downstream object will - be immediately returned and synced when the Future has completed. deploy_request_timeout (float): Optional. The timeout for the deploy request in seconds. autoscaling_target_cpu_utilization (int): @@ -995,12 +1025,12 @@ def _deploy_call( Optional. Target Accelerator Duty Cycle. Must also set accelerator_type and accelerator_count if specified. A default value of 60 will be used if not specified. + Raises: + ValueError: If only `accelerator_type` or `accelerator_count` is specified. + ValueError: If model does not support deployment. ValueError: If there is not current traffic split and traffic percentage is not 0 or 100. - ValueError: If only `explanation_metadata` or `explanation_parameters` - is specified. - ValueError: If model does not support deployment. """ max_replica_count = max(min_replica_count, max_replica_count) @@ -1115,7 +1145,9 @@ def _deploy_call( explanation_spec.parameters = explanation_parameters deployed_model.explanation_spec = explanation_spec - if traffic_split is None: + # Checking if traffic percentage is valid + # TODO(b/221059294) PrivateEndpoint should support traffic split + if traffic_split is None and not network: # new model traffic needs to be 100 if no pre-existing models if not endpoint_resource_traffic_split: # default scenario @@ -1279,6 +1311,7 @@ def _instantiate_prediction_client( credentials (google.auth.credentials.Credentials): Optional custom credentials to use when accessing interacting with the prediction client. + Returns: prediction_client (prediction_service_client.PredictionServiceClient): Initialized prediction client with optional overrides. @@ -1338,7 +1371,8 @@ def update( Optional. The timeout for the update request in seconds. Returns: - Endpoint - Updated endpoint resource. + Endpoint (aiplatform.Prediction): + Updated endpoint resource. Raises: ValueError: If `labels` is not the correct format. @@ -1417,8 +1451,10 @@ def predict( [PredictSchemata's][google.cloud.aiplatform.v1beta1.Model.predict_schemata] ``parameters_schema_uri``. timeout (float): Optional. The timeout for this request in seconds. + Returns: - prediction: Prediction with returned predictions and Model Id. + prediction (aiplatform.Prediction): + Prediction with returned predictions and Model ID. """ self.wait() @@ -1474,28 +1510,539 @@ def explain( Optional. If specified, this ExplainRequest will be served by the chosen DeployedModel, overriding this Endpoint's traffic split. timeout (float): Optional. The timeout for this request in seconds. + + Returns: + prediction (aiplatform.Prediction): + Prediction with returned predictions, explanations, and Model ID. + """ + self.wait() + + explain_response = self._prediction_client.explain( + endpoint=self.resource_name, + instances=instances, + parameters=parameters, + deployed_model_id=deployed_model_id, + timeout=timeout, + ) + + return Prediction( + predictions=[ + json_format.MessageToDict(item) + for item in explain_response.predictions.pb + ], + deployed_model_id=explain_response.deployed_model_id, + explanations=explain_response.explanations, + ) + + @classmethod + def list( + cls, + filter: Optional[str] = None, + order_by: Optional[str] = None, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ) -> List["models.Endpoint"]: + """List all Endpoint resource instances. + + Example Usage: + aiplatform.Endpoint.list( + filter='labels.my_label="my_label_value" OR display_name=!"old_endpoint"', + ) + + Args: + filter (str): + Optional. An expression for filtering the results of the request. + For field names both snake_case and camelCase are supported. + order_by (str): + Optional. A comma-separated list of fields to order by, sorted in + ascending order. Use "desc" after a field name for descending. + Supported fields: `display_name`, `create_time`, `update_time` + project (str): + Optional. Project to retrieve list from. If not set, project + set in aiplatform.init will be used. + location (str): + Optional. Location to retrieve list from. If not set, location + set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to retrieve list. Overrides + credentials set in aiplatform.init. + + Returns: + List[models.Endpoint]: + A list of Endpoint resource objects + """ + + return cls._list_with_local_order( + cls_filter=lambda ep: not bool( + ep.network + ), # `network` is empty for public Endpoints + filter=filter, + order_by=order_by, + project=project, + location=location, + credentials=credentials, + ) + + def list_models(self) -> List[gca_endpoint_compat.DeployedModel]: + """Returns a list of the models deployed to this Endpoint. + + Returns: + deployed_models (List[aiplatform.gapic.DeployedModel]): + A list of the models deployed in this Endpoint. + """ + self._sync_gca_resource() + return list(self._gca_resource.deployed_models) + + def undeploy_all(self, sync: bool = True) -> "Endpoint": + """Undeploys every model deployed to this Endpoint. + + Args: + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + """ + self._sync_gca_resource() + + models_to_undeploy = sorted( # Undeploy zero traffic models first + self._gca_resource.traffic_split.keys(), + key=lambda id: self._gca_resource.traffic_split[id], + ) + + for deployed_model in models_to_undeploy: + self._undeploy(deployed_model_id=deployed_model, sync=sync) + + return self + + def delete(self, force: bool = False, sync: bool = True) -> None: + """Deletes this Vertex AI Endpoint resource. If force is set to True, + all models on this Endpoint will be undeployed prior to deletion. + + Args: + force (bool): + Required. If force is set to True, all deployed models on this + Endpoint will be undeployed first. Default is False. + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + + Raises: + FailedPrecondition: If models are deployed on this Endpoint and force = False. + """ + if force: + self.undeploy_all(sync=sync) + + super().delete(sync=sync) + + +class PrivateEndpoint(Endpoint): + """ + Represents a Vertex AI PrivateEndpoint resource. + + Read more [about private endpoints in the documentation.](https://cloud.google.com/vertex-ai/docs/predictions/using-private-endpoints) + """ + + def __init__( + self, + endpoint_name: str, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ): + """Retrieves a PrivateEndpoint resource. + + Example usage: + my_private_endpoint = aiplatform.PrivateEndpoint( + endpoint_name="projects/123/locations/us-central1/endpoints/1234567891234567890" + ) + + or (when project and location are initialized) + + my_private_endpoint = aiplatform.PrivateEndpoint( + endpoint_name="1234567891234567890" + ) + + Args: + endpoint_name (str): + Required. A fully-qualified endpoint resource name or endpoint ID. + Example: "projects/123/locations/us-central1/endpoints/my_endpoint_id" or + "my_endpoint_id" when project and location are initialized or passed. + project (str): + Optional. Project to retrieve endpoint from. If not set, project + set in aiplatform.init will be used. + location (str): + Optional. Location to retrieve endpoint from. If not set, location + set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to upload this model. Overrides + credentials set in aiplatform.init. + + Raises: + ValueError: If the Endpoint being retrieved is not a PrivateEndpoint. + ImportError: If there is an issue importing the `urllib3` package. + """ + try: + import urllib3 + except ImportError: + raise ImportError( + "Cannot import the urllib3 HTTP client. Please install google-cloud-aiplatform[private_endpoints]." + ) + + super().__init__( + endpoint_name=endpoint_name, + project=project, + location=location, + credentials=credentials, + ) + + if not self.network: + raise ValueError( + "Please ensure the Endpoint being retrieved is a PrivateEndpoint." + ) + + self._http_client = urllib3.PoolManager() + + @property + def predict_http_uri(self) -> Optional[str]: + """HTTP path to send prediction requests to, used when calling `PrivateEndpoint.predict()`""" + if not self._gca_resource.deployed_models: + return None + return self._gca_resource.deployed_models[0].private_endpoints.predict_http_uri + + @property + def explain_http_uri(self) -> Optional[str]: + """HTTP path to send explain requests to, used when calling `PrivateEndpoint.explain()`""" + if not self._gca_resource.deployed_models: + return None + return self._gca_resource.deployed_models[0].private_endpoints.explain_http_uri + + @property + def health_http_uri(self) -> Optional[str]: + """HTTP path to send health check requests to, used when calling `PrivateEndpoint.health_check()`""" + if not self._gca_resource.deployed_models: + return None + return self._gca_resource.deployed_models[0].private_endpoints.health_http_uri + + @classmethod + def create( + cls, + display_name: str, + project: Optional[str] = None, + location: Optional[str] = None, + network: Optional[str] = None, + description: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + credentials: Optional[auth_credentials.Credentials] = None, + encryption_spec_key_name: Optional[str] = None, + sync=True, + ) -> "PrivateEndpoint": + """Creates a new PrivateEndpoint. + + Example usage: + my_private_endpoint = aiplatform.PrivateEndpoint.create( + display_name="my_endpoint_name", + project="my_project_id", + location="us-central1", + network="projects/123456789123/global/networks/my_vpc" + ) + + or (when project and location are initialized) + + my_private_endpoint = aiplatform.PrivateEndpoint.create( + display_name="my_endpoint_name", + network="projects/123456789123/global/networks/my_vpc" + ) + + Args: + display_name (str): + Required. The user-defined name of the Endpoint. + The name can be up to 128 characters long and can be consist + of any UTF-8 characters. + project (str): + Optional. Project to retrieve endpoint from. If not set, project + set in aiplatform.init will be used. + location (str): + Optional. Location to retrieve endpoint from. If not set, location + set in aiplatform.init will be used. + network (str): + Optional. The full name of the Compute Engine network to which + this Endpoint will be peered. E.g. "projects/123456789123/global/networks/my_vpc". + Private services access must already be configured for the network. + If not set, network set in aiplatform.init will be used. + description (str): + Optional. The description of the Endpoint. + labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize your Endpoints. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + and examples of labels. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to upload this model. Overrides + credentials set in aiplatform.init. + encryption_spec_key_name (str): + Optional. The Cloud KMS resource identifier of the customer + managed encryption key used to protect the model. Has the + form: + ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. + The key needs to be in the same region as where the compute + resource is created. + + If set, this Model and all sub-resources of this Model will be secured by this key. + + Overrides encryption_spec_key_name set in aiplatform.init. + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + + Returns: + endpoint (aiplatform.PrivateEndpoint): + Created endpoint. + + Raises: + ValueError: A network must be instantiated when creating a PrivateEndpoint. + """ + api_client = cls._instantiate_client(location=location, credentials=credentials) + + utils.validate_display_name(display_name) + if labels: + utils.validate_labels(labels) + + project = project or initializer.global_config.project + location = location or initializer.global_config.location + + if not network: + raise ValueError( + "Please provide required argument `network` or set " + "using aiplatform.init(network=...)" + ) + + return cls._create( + api_client=api_client, + display_name=display_name, + project=project, + location=location, + description=description, + labels=labels, + credentials=credentials, + encryption_spec=initializer.global_config.get_encryption_spec( + encryption_spec_key_name=encryption_spec_key_name + ), + network=network, + sync=sync, + ) + + @classmethod + def _construct_sdk_resource_from_gapic( + cls, + gapic_resource: proto.Message, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ) -> "PrivateEndpoint": + """Given a GAPIC PrivateEndpoint object, return the SDK representation. + + Args: + gapic_resource (proto.Message): + A GAPIC representation of a PrivateEndpoint resource, usually + retrieved by a get_* or in a list_* API call. + project (str): + Optional. Project to construct Endpoint object from. If not set, + project set in aiplatform.init will be used. + location (str): + Optional. Location to construct Endpoint object from. If not set, + location set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to construct Endpoint. + Overrides credentials set in aiplatform.init. + Returns: - prediction: Prediction with returned predictions, explanations and Model Id. + endpoint (aiplatform.PrivateEndpoint): + An initialized PrivateEndpoint resource. + + Raises: + ImportError: If there is an issue importing the `urllib3` package. + """ + try: + import urllib3 + except ImportError: + raise ImportError( + "Cannot import the urllib3 HTTP client. Please install google-cloud-aiplatform[private_endpoints]." + ) + + endpoint = cls._empty_constructor( + project=project, location=location, credentials=credentials + ) + + endpoint._gca_resource = gapic_resource + + endpoint._http_client = urllib3.PoolManager() + + return endpoint + + def _http_request( + self, + method: str, + url: str, + body: Optional[Dict[Any, Any]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> "urllib3.response.HTTPResponse": # type: ignore # noqa: F821 + """Helper function used to perform HTTP requests for PrivateEndpoint. + + Args: + method (str): + Required. The HTTP request method to use. Example: "POST" or "GET" + url (str): + Required. The url used to send requests and get responses from. + body (Dict[Any, Any]): + Optional. Data sent to the url in the HTTP request. For a PrivateEndpoint, + an instance is sent and a prediction response is expected. + headers (Dict[str, str]): + Optional. Header in the HTTP request. + + Returns: + urllib3.response.HTTPResponse: + A HTTP Response container. + + Raises: + ImportError: If there is an issue importing the `urllib3` package. + RuntimeError: If a HTTP request could not be made. + RuntimeError: A connection could not be established with the PrivateEndpoint and + a HTTP request could not be made. + """ + try: + import urllib3 + except ImportError: + raise ImportError( + "Cannot import the urllib3 HTTP client. Please install google-cloud-aiplatform[private_endpoints]." + ) + + try: + response = self._http_client.request( + method=method, url=url, body=body, headers=headers + ) + + if response.status < _SUCCESSFUL_HTTP_RESPONSE: + return response + else: + raise RuntimeError( + f"{response.status} - Failed to make request, see response: " + + response.data.decode("utf-8") + ) + + except urllib3.exceptions.MaxRetryError as exc: + raise RuntimeError( + f"Failed to make a {method} request to this URI, make sure: " + " this call is being made inside the network this PrivateEndpoint is peered to " + f"({self._gca_resource.network}), calling health_check() returns True, " + f"and that {url} is a valid URL." + ) from exc + + def predict(self, instances: List, parameters: Optional[Dict] = None) -> Prediction: + """Make a prediction against this PrivateEndpoint using a HTTP request. + This method must be called within the network the PrivateEndpoint is peered to. + The predict() call will fail otherwise. To check, use `PrivateEndpoint.network`. + + Example usage: + response = my_private_endpoint.predict(instances=[...]) + my_predictions = response.predictions + + Args: + instances (List): + Required. The instances that are the input to the + prediction call. Instance types mut be JSON serializable. + A DeployedModel may have an upper limit + on the number of instances it supports per request, and + when it is exceeded the prediction call errors in case + of AutoML Models, or, in case of customer created + Models, the behaviour is as documented by that Model. + The schema of any single instance may be specified via + Endpoint's DeployedModels' + [Model's][google.cloud.aiplatform.v1beta1.DeployedModel.model] + [PredictSchemata's][google.cloud.aiplatform.v1beta1.Model.predict_schemata] + ``instance_schema_uri``. + parameters (Dict): + The parameters that govern the prediction. The schema of + the parameters may be specified via Endpoint's + DeployedModels' [Model's + ][google.cloud.aiplatform.v1beta1.DeployedModel.model] + [PredictSchemata's][google.cloud.aiplatform.v1beta1.Model.predict_schemata] + ``parameters_schema_uri``. + + Returns: + prediction (aiplatform.Prediction): + Prediction object with returned predictions and Model ID. + + Raises: + RuntimeError: If a model has not been deployed a request cannot be made. + """ + self.wait() + self._sync_gca_resource_if_skipped() + + if not self._gca_resource.deployed_models: + raise RuntimeError( + "Cannot make a predict request because a model has not been deployed on this Private" + "Endpoint. Please ensure a model has been deployed." + ) + + response = self._http_request( + method="POST", + url=self.predict_http_uri, + body=json.dumps({"instances": instances}), + headers={"Content-Type": "application/json"}, + ) + + prediction_response = json.loads(response.data) + + return Prediction( + predictions=prediction_response.get("predictions"), + deployed_model_id=self._gca_resource.deployed_models[0].id, + ) + + def explain(self): + raise NotImplementedError( + f"{self.__class__.__name__} class does not support 'explain' as of now." + ) + + def health_check(self) -> bool: + """ + Makes a request to this PrivateEndpoint's health check URI. Must be within network + that this PrivateEndpoint is in. + + Example Usage: + if my_private_endpoint.health_check(): + print("PrivateEndpoint is healthy!") + + Returns: + bool: + Checks if calls can be made to this PrivateEndpoint. + + Raises: + RuntimeError: If a model has not been deployed a request cannot be made. """ self.wait() + self._sync_gca_resource_if_skipped() - explain_response = self._prediction_client.explain( - endpoint=self.resource_name, - instances=instances, - parameters=parameters, - deployed_model_id=deployed_model_id, - timeout=timeout, - ) + if not self._gca_resource.deployed_models: + raise RuntimeError( + "Cannot make a health check request because a model has not been deployed on this Private" + "Endpoint. Please ensure a model has been deployed." + ) - return Prediction( - predictions=[ - json_format.MessageToDict(item) - for item in explain_response.predictions.pb - ], - deployed_model_id=explain_response.deployed_model_id, - explanations=explain_response.explanations, + response = self._http_request( + method="GET", + url=self.health_http_uri, ) + return response.status < _SUCCESSFUL_HTTP_RESPONSE + @classmethod def list( cls, @@ -1504,14 +2051,17 @@ def list( project: Optional[str] = None, location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, - ) -> List["models.Endpoint"]: - """List all Endpoint resource instances. + ) -> List["models.PrivateEndpoint"]: + """List all PrivateEndpoint resource instances. Example Usage: + my_private_endpoints = aiplatform.PrivateEndpoint.list() - aiplatform.Endpoint.list( - filter='labels.my_label="my_label_value" OR display_name=!"old_endpoint"', - ) + or + + my_private_endpoints = aiplatform.PrivateEndpoint.list( + filter='labels.my_label="my_label_value" OR display_name=!"old_endpoint"', + ) Args: filter (str): @@ -1532,10 +2082,14 @@ def list( credentials set in aiplatform.init. Returns: - List[models.Endpoint] - A list of Endpoint resource objects + List[models.PrivateEndpoint]: + A list of PrivateEndpoint resource objects. """ return cls._list_with_local_order( + cls_filter=lambda ep: bool( + ep.network + ), # Only PrivateEndpoints have a network set filter=filter, order_by=order_by, project=project, @@ -1543,40 +2097,153 @@ def list( credentials=credentials, ) - def list_models(self) -> List[gca_endpoint_compat.DeployedModel]: - """Returns a list of the models deployed to this Endpoint. - - Returns: - deployed_models (List[aiplatform.gapic.DeployedModel]): - A list of the models deployed in this Endpoint. - """ - self._sync_gca_resource() - return list(self._gca_resource.deployed_models) + def deploy( + self, + model: "Model", + deployed_model_display_name: Optional[str] = None, + machine_type: Optional[str] = None, + min_replica_count: int = 1, + max_replica_count: int = 1, + accelerator_type: Optional[str] = None, + accelerator_count: Optional[int] = None, + service_account: Optional[str] = None, + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, + metadata: Optional[Sequence[Tuple[str, str]]] = (), + sync=True, + ) -> None: + """Deploys a Model to the PrivateEndpoint. - def undeploy_all(self, sync: bool = True) -> "Endpoint": - """Undeploys every model deployed to this Endpoint. + Example Usage: + my_private_endpoint.deploy( + model=my_model + ) Args: + model (aiplatform.Model): + Required. Model to be deployed. + deployed_model_display_name (str): + Optional. The display name of the DeployedModel. If not provided + upon creation, the Model's display_name is used. + machine_type (str): + Optional. The type of machine. Not specifying machine type will + result in model to be deployed with automatic resources. + min_replica_count (int): + Optional. The minimum number of machine replicas this deployed + model will be always deployed on. If traffic against it increases, + it may dynamically be deployed onto more replicas, and as traffic + decreases, some of these extra replicas may be freed. + max_replica_count (int): + Optional. The maximum number of replicas this deployed model may + be deployed on when the traffic against it increases. If requested + value is too large, the deployment will error, but if deployment + succeeds then the ability to scale the model to that many replicas + is guaranteed (barring service outages). If traffic against the + deployed model increases beyond what its replicas at maximum may + handle, a portion of the traffic will be dropped. If this value + is not provided, the larger value of min_replica_count or 1 will + be used. If value provided is smaller than min_replica_count, it + will automatically be increased to be min_replica_count. + accelerator_type (str): + Optional. Hardware accelerator type. Must also set accelerator_count if used. + One of ACCELERATOR_TYPE_UNSPECIFIED, NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, + NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4 + accelerator_count (int): + Optional. The number of accelerators to attach to a worker replica. + service_account (str): + The service account that the DeployedModel's container runs as. Specify the + email address of the service account. If this service account is not + specified, the container runs as a service account that doesn't have access + to the resource project. + Users deploying the Model must have the `iam.serviceAccounts.actAs` + permission on this service account. + explanation_metadata (aiplatform.explain.ExplanationMetadata): + Optional. Metadata describing the Model's input and output for explanation. + Both `explanation_metadata` and `explanation_parameters` must be + passed together when used. For more details, see + `Ref docs ` + explanation_parameters (aiplatform.explain.ExplanationParameters): + Optional. Parameters to configure explaining for Model's predictions. + For more details, see `Ref docs ` + metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as + metadata. sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will be immediately returned and synced when the Future has completed. """ - self._sync_gca_resource() + self._validate_deploy_args( + min_replica_count=min_replica_count, + max_replica_count=max_replica_count, + accelerator_type=accelerator_type, + deployed_model_display_name=deployed_model_display_name, + traffic_split=None, + traffic_percentage=100, + explanation_metadata=explanation_metadata, + explanation_parameters=explanation_parameters, + ) - models_to_undeploy = sorted( # Undeploy zero traffic models first - self._gca_resource.traffic_split.keys(), - key=lambda id: self._gca_resource.traffic_split[id], + self._deploy( + model=model, + deployed_model_display_name=deployed_model_display_name, + traffic_percentage=100, + traffic_split=None, + machine_type=machine_type, + min_replica_count=min_replica_count, + max_replica_count=max_replica_count, + accelerator_type=accelerator_type, + accelerator_count=accelerator_count, + service_account=service_account, + explanation_metadata=explanation_metadata, + explanation_parameters=explanation_parameters, + metadata=metadata, + sync=sync, ) - for deployed_model in models_to_undeploy: - self._undeploy(deployed_model_id=deployed_model, sync=sync) + def undeploy( + self, + deployed_model_id: str, + sync=True, + ) -> None: + """Undeploys a deployed model from the PrivateEndpoint. - return self + Example Usage: + my_private_endpoint.undeploy( + deployed_model_id="1234567891232567891" + ) + + or + + my_deployed_model_id = my_private_endpoint.list_models()[0].id + my_private_endpoint.undeploy( + deployed_model_id=my_deployed_model_id + ) + + Args: + deployed_model_id (str): + Required. The ID of the DeployedModel to be undeployed from the + PrivateEndpoint. Use PrivateEndpoint.list_models() to get the + deployed model ID. + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + """ + self._sync_gca_resource_if_skipped() + + # TODO(b/211351292): Add traffic splitting for PrivateEndpoint + self._undeploy( + deployed_model_id=deployed_model_id, + traffic_split=None, + sync=sync, + ) def delete(self, force: bool = False, sync: bool = True) -> None: - """Deletes this Vertex AI Endpoint resource. If force is set to True, - all models on this Endpoint will be undeployed prior to deletion. + """Deletes this Vertex AI PrivateEndpoint resource. If force is set to True, + all models on this PrivateEndpoint will be undeployed prior to deletion. Args: force (bool): @@ -1586,13 +2253,17 @@ def delete(self, force: bool = False, sync: bool = True) -> None: Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will be immediately returned and synced when the Future has completed. + Raises: FailedPrecondition: If models are deployed on this Endpoint and force = False. """ - if force: - self.undeploy_all(sync=sync) + if force and self._gca_resource.deployed_models: + self.undeploy( + deployed_model_id=self._gca_resource.deployed_models[0].id, + sync=sync, + ) - super().delete(sync=sync) + super().delete(force=False, sync=sync) class Model(base.VertexAiResourceNounWithFutureManager): @@ -1779,12 +2450,11 @@ def update( """Updates a model. Example usage: - - my_model = my_model.update( - display_name='my-model', - description='my description', - labels={'key': 'value'}, - ) + my_model = my_model.update( + display_name="my-model", + description="my description", + labels={'key': 'value'}, + ) Args: display_name (str): @@ -1802,8 +2472,11 @@ def update( are allowed. See https://goo.gl/xmQnxf for more information and examples of labels. + Returns: - model: Updated model resource. + model (aiplatform.Model): + Updated model resource. + Raises: ValueError: If `labels` is not the correct format. """ @@ -1874,17 +2547,13 @@ def upload( resource. Example usage: - - my_model = Model.upload( - display_name='my-model', - artifact_uri='gs://my-model/saved-model' - serving_container_image_uri='tensorflow/serving' - ) + my_model = Model.upload( + display_name="my-model", + artifact_uri="gs://my-model/saved-model", + serving_container_image_uri="tensorflow/serving" + ) Args: - display_name (str): - Optional. The display name of the Model. The name can be up to 128 - characters long and can be consist of any UTF-8 characters. serving_container_image_uri (str): Required. The URI of the Model serving container. artifact_uri (str): @@ -1973,14 +2642,17 @@ def upload( and probably different, including the URI scheme, than the one given on input. The output URI will point to a location where the user only has a read access. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Metadata describing the Model's input and output for explanation. Both `explanation_metadata` and `explanation_parameters` must be passed together when used. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. For more details, see `Ref docs ` + display_name (str): + Optional. The display name of the Model. The name can be up to 128 + characters long and can be consist of any UTF-8 characters. project: Optional[str]=None, Project to upload this model to. Overrides project set in aiplatform.init. @@ -2016,12 +2688,14 @@ def upload( staging_bucket set in aiplatform.init. upload_request_timeout (float): Optional. The timeout for the upload request in seconds. + Returns: - model: Instantiated representation of the uploaded model resource. + model (aiplatform.Model): + Instantiated representation of the uploaded model resource. + Raises: ValueError: If only `explanation_metadata` or `explanation_parameters` - is specified. - Also if model directory does not contain a supported model file. + is specified. Also if model directory does not contain a supported model file. """ if not display_name: display_name = cls._generate_display_name() @@ -2137,10 +2811,9 @@ def upload( return this_model - # TODO(b/172502059) support deploying with endpoint resource name def deploy( self, - endpoint: Optional["Endpoint"] = None, + endpoint: Optional[Union["Endpoint", "PrivateEndpoint"]] = None, deployed_model_display_name: Optional[str] = None, traffic_percentage: Optional[int] = 0, traffic_split: Optional[Dict[str, int]] = None, @@ -2150,21 +2823,24 @@ def deploy( accelerator_type: Optional[str] = None, accelerator_count: Optional[int] = None, service_account: Optional[str] = None, - explanation_metadata: Optional[explain.ExplanationMetadata] = None, - explanation_parameters: Optional[explain.ExplanationParameters] = None, + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, metadata: Optional[Sequence[Tuple[str, str]]] = (), encryption_spec_key_name: Optional[str] = None, + network: Optional[str] = None, sync=True, deploy_request_timeout: Optional[float] = None, autoscaling_target_cpu_utilization: Optional[int] = None, autoscaling_target_accelerator_duty_cycle: Optional[int] = None, - ) -> Endpoint: + ) -> Union[Endpoint, PrivateEndpoint]: """Deploys model to endpoint. Endpoint will be created if unspecified. Args: - endpoint ("Endpoint"): - Optional. Endpoint to deploy model to. If not specified, endpoint - display name will be model display name+'_endpoint'. + endpoint (Union[Endpoint, PrivateEndpoint]): + Optional. Public or private Endpoint to deploy model to. If not specified, + endpoint display name will be model display name+'_endpoint'. deployed_model_display_name (str): Optional. The display name of the DeployedModel. If not provided upon creation, the Model's display_name is used. @@ -2214,12 +2890,12 @@ def deploy( to the resource project. Users deploying the Model must have the `iam.serviceAccounts.actAs` permission on this service account. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Metadata describing the Model's input and output for explanation. Both `explanation_metadata` and `explanation_parameters` must be passed together when used. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. For more details, see `Ref docs ` metadata (Sequence[Tuple[str, str]]): @@ -2233,9 +2909,17 @@ def deploy( The key needs to be in the same region as where the compute resource is created. - If set, this Model and all sub-resources of this Model will be secured by this key. + If set, this Endpoint and all sub-resources of this Endpoint will be secured by this key. - Overrides encryption_spec_key_name set in aiplatform.init + Overrides encryption_spec_key_name set in aiplatform.init. + network (str): + Optional. The full name of the Compute Engine network to which + this Endpoint will be peered. E.g. "projects/12345/global/networks/myVPC". + Private services access must already be configured for the network. + If left unspecified, the job is not peered with any network or + the network set in aiplatform.init will be used. + If set, a PrivateEndpoint will be created. Read more about PrivateEndpoints + [in the documentation](https://cloud.google.com/vertex-ai/docs/predictions/using-private-endpoints). sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -2249,22 +2933,34 @@ def deploy( Optional. Target Accelerator Duty Cycle. Must also set accelerator_type and accelerator_count if specified. A default value of 60 will be used if not specified. + Returns: - endpoint ("Endpoint"): + endpoint (Union[Endpoint, PrivateEndpoint]): Endpoint with the deployed model. + + Raises: + ValueError: If `traffic_split` is set for PrivateEndpoint. """ Endpoint._validate_deploy_args( - min_replica_count, - max_replica_count, - accelerator_type, - deployed_model_display_name, - traffic_split, - traffic_percentage, - explanation_metadata, - explanation_parameters, + min_replica_count=min_replica_count, + max_replica_count=max_replica_count, + accelerator_type=accelerator_type, + deployed_model_display_name=deployed_model_display_name, + traffic_split=traffic_split, + traffic_percentage=traffic_percentage, + explanation_metadata=explanation_metadata, + explanation_parameters=explanation_parameters, ) + if isinstance(endpoint, PrivateEndpoint): + if traffic_split: + raise ValueError( + "Traffic splitting is not yet supported for PrivateEndpoint. " + "Try calling deploy() without providing `traffic_split`. " + "A maximum of one model can be deployed to each private Endpoint." + ) + return self._deploy( endpoint=endpoint, deployed_model_display_name=deployed_model_display_name, @@ -2281,6 +2977,7 @@ def deploy( metadata=metadata, encryption_spec_key_name=encryption_spec_key_name or initializer.global_config.encryption_spec_key_name, + network=network, sync=sync, deploy_request_timeout=deploy_request_timeout, autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization, @@ -2290,7 +2987,7 @@ def deploy( @base.optional_sync(return_input_arg="endpoint", bind_future_to_self=False) def _deploy( self, - endpoint: Optional["Endpoint"] = None, + endpoint: Optional[Union["Endpoint", "PrivateEndpoint"]] = None, deployed_model_display_name: Optional[str] = None, traffic_percentage: Optional[int] = 0, traffic_split: Optional[Dict[str, int]] = None, @@ -2300,21 +2997,24 @@ def _deploy( accelerator_type: Optional[str] = None, accelerator_count: Optional[int] = None, service_account: Optional[str] = None, - explanation_metadata: Optional[explain.ExplanationMetadata] = None, - explanation_parameters: Optional[explain.ExplanationParameters] = None, + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, metadata: Optional[Sequence[Tuple[str, str]]] = (), encryption_spec_key_name: Optional[str] = None, + network: Optional[str] = None, sync: bool = True, deploy_request_timeout: Optional[float] = None, autoscaling_target_cpu_utilization: Optional[int] = None, autoscaling_target_accelerator_duty_cycle: Optional[int] = None, - ) -> Endpoint: + ) -> Union[Endpoint, PrivateEndpoint]: """Deploys model to endpoint. Endpoint will be created if unspecified. Args: - endpoint ("Endpoint"): - Optional. Endpoint to deploy model to. If not specified, endpoint - display name will be model display name+'_endpoint'. + endpoint (Union[Endpoint, PrivateEndpoint]): + Optional. Public or private Endpoint to deploy model to. If not specified, + endpoint display name will be model display name+'_endpoint'. deployed_model_display_name (str): Optional. The display name of the DeployedModel. If not provided upon creation, the Model's display_name is used. @@ -2364,12 +3064,12 @@ def _deploy( to the resource project. Users deploying the Model must have the `iam.serviceAccounts.actAs` permission on this service account. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Metadata describing the Model's input and output for explanation. Both `explanation_metadata` and `explanation_parameters` must be passed together when used. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. For more details, see `Ref docs ` metadata (Sequence[Tuple[str, str]]): @@ -2386,6 +3086,14 @@ def _deploy( If set, this Model and all sub-resources of this Model will be secured by this key. Overrides encryption_spec_key_name set in aiplatform.init + network (str): + Optional. The full name of the Compute Engine network to which + this Endpoint will be peered. E.g. "projects/12345/global/networks/myVPC". + Private services access must already be configured for the network. + If left unspecified, the job is not peered with any network or + the network set in aiplatform.init will be used. + If set, a PrivateEndpoint will be created. Read more about PrivateEndpoints + [in the documentation](https://cloud.google.com/vertex-ai/docs/predictions/using-private-endpoints) sync (bool): Whether to execute this method synchronously. If False, this method will be executed in concurrent Future and any downstream object will @@ -2399,28 +3107,41 @@ def _deploy( Optional. Target Accelerator Duty Cycle. Must also set accelerator_type and accelerator_count if specified. A default value of 60 will be used if not specified. + Returns: - endpoint ("Endpoint"): + endpoint (Union[Endpoint, PrivateEndpoint]): Endpoint with the deployed model. """ if endpoint is None: display_name = self.display_name[:118] + "_endpoint" - endpoint = Endpoint.create( - display_name=display_name, - project=self.project, - location=self.location, - credentials=self.credentials, - encryption_spec_key_name=encryption_spec_key_name, - ) + + if not network: + endpoint = Endpoint.create( + display_name=display_name, + project=self.project, + location=self.location, + credentials=self.credentials, + encryption_spec_key_name=encryption_spec_key_name, + ) + else: + endpoint = PrivateEndpoint.create( + display_name=display_name, + network=network, + project=self.project, + location=self.location, + credentials=self.credentials, + encryption_spec_key_name=encryption_spec_key_name, + ) _LOGGER.log_action_start_against_resource("Deploying model to", "", endpoint) - Endpoint._deploy_call( + endpoint._deploy_call( endpoint.api_client, endpoint.resource_name, self, endpoint._gca_resource.traffic_split, + network=network, deployed_model_display_name=deployed_model_display_name, traffic_percentage=traffic_percentage, traffic_split=traffic_split, @@ -2460,8 +3181,10 @@ def batch_predict( starting_replica_count: Optional[int] = None, max_replica_count: Optional[int] = None, generate_explanation: Optional[bool] = False, - explanation_metadata: Optional[explain.ExplanationMetadata] = None, - explanation_parameters: Optional[explain.ExplanationParameters] = None, + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, labels: Optional[Dict[str, str]] = None, credentials: Optional[auth_credentials.Credentials] = None, encryption_spec_key_name: Optional[str] = None, @@ -2475,13 +3198,12 @@ def batch_predict( required. Example usage: - - my_model.batch_predict( - job_display_name="prediction-123", - gcs_source="gs://example-bucket/instances.csv", - instances_format="csv", - bigquery_destination_prefix="projectId.bqDatasetId.bqTableId" - ) + my_model.batch_predict( + job_display_name="prediction-123", + gcs_source="gs://example-bucket/instances.csv", + instances_format="csv", + bigquery_destination_prefix="projectId.bqDatasetId.bqTableId" + ) Args: job_display_name (str): @@ -2583,7 +3305,7 @@ def batch_predict( keyed `explanation`. The value of the entry is a JSON object that conforms to the [aiplatform.gapic.Explanation] object. - `csv`: Generating explanations for CSV format is not supported. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Explanation metadata configuration for this BatchPredictionJob. Can be specified only if `generate_explanation` is set to `True`. @@ -2592,7 +3314,7 @@ def batch_predict( a field of the `explanation_metadata` object is not populated, the corresponding field of the `Model.explanation_metadata` object is inherited. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. Can be specified only if `generate_explanation` is set to `True`. @@ -2631,8 +3353,9 @@ def batch_predict( but too high value will result in a whole batch not fitting in a machine's memory, and the whole operation will fail. The default value is 64. + Returns: - (jobs.BatchPredictionJob): + job (jobs.BatchPredictionJob): Instantiated representation of the created batch prediction job. """ @@ -2676,10 +3399,9 @@ def list( """List all Model resource instances. Example Usage: - - aiplatform.Model.list( - filter='labels.my_label="my_label_value" AND display_name="my_model"', - ) + aiplatform.Model.list( + filter='labels.my_label="my_label_value" AND display_name="my_model"', + ) Args: filter (str): @@ -2700,7 +3422,8 @@ def list( credentials set in aiplatform.init. Returns: - List[models.Model] - A list of Model resource objects + List[models.Model]: + A list of Model resource objects """ return cls._list( @@ -2726,17 +3449,17 @@ def export_model( A Model is considered to be exportable if it has at least one `supported_export_formats`. Either `artifact_destination` or `image_destination` must be provided. - Usage: + Example Usage: my_model.export( - export_format_id='tf-saved-model' - artifact_destination='gs://my-bucket/models/' + export_format_id="tf-saved-model", + artifact_destination="gs://my-bucket/models/" ) or my_model.export( - export_format_id='custom-model' - image_destination='us-central1-docker.pkg.dev/projectId/repo/image' + export_format_id="custom-model", + image_destination="us-central1-docker.pkg.dev/projectId/repo/image" ) Args: @@ -2771,13 +3494,14 @@ def export_model( Whether to execute this export synchronously. If False, this method will be executed in concurrent Future and any downstream object will be immediately returned and synced when the Future has completed. + Returns: output_info (Dict[str, str]): Details of the completed export with output destination paths to the artifacts or container image. + Raises: ValueError: If model does not support exporting. - ValueError: If invalid arguments or export formats are provided. """ @@ -2863,8 +3587,10 @@ def upload_xgboost_model_file( instance_schema_uri: Optional[str] = None, parameters_schema_uri: Optional[str] = None, prediction_schema_uri: Optional[str] = None, - explanation_metadata: Optional[explain.ExplanationMetadata] = None, - explanation_parameters: Optional[explain.ExplanationParameters] = None, + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, project: Optional[str] = None, location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, @@ -2879,8 +3605,7 @@ def upload_xgboost_model_file( Note: This function is *experimental* and can be changed in the future. - Example usage:: - + Example usage: my_model = Model.upload_xgboost_model_file( model_file_path="iris.xgboost_model.bst" ) @@ -2941,12 +3666,12 @@ def upload_xgboost_model_file( and probably different, including the URI scheme, than the one given on input. The output URI will point to a location where the user only has a read access. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Metadata describing the Model's input and output for explanation. Both `explanation_metadata` and `explanation_parameters` must be passed together when used. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. For more details, see `Ref docs ` project: Optional[str]=None, @@ -2984,12 +3709,14 @@ def upload_xgboost_model_file( staging_bucket set in aiplatform.init. upload_request_timeout (float): Optional. The timeout for the upload request in seconds. + Returns: - model: Instantiated representation of the uploaded model resource. + model (aiplatform.Model): + Instantiated representation of the uploaded model resource. + Raises: ValueError: If only `explanation_metadata` or `explanation_parameters` - is specified. - Also if model directory does not contain a supported model file. + is specified. Also if model directory does not contain a supported model file. """ if not display_name: display_name = cls._generate_display_name("XGBoost model") @@ -3065,8 +3792,10 @@ def upload_scikit_learn_model_file( instance_schema_uri: Optional[str] = None, parameters_schema_uri: Optional[str] = None, prediction_schema_uri: Optional[str] = None, - explanation_metadata: Optional[explain.ExplanationMetadata] = None, - explanation_parameters: Optional[explain.ExplanationParameters] = None, + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, project: Optional[str] = None, location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, @@ -3081,8 +3810,7 @@ def upload_scikit_learn_model_file( Note: This function is *experimental* and can be changed in the future. - Example usage:: - + Example usage: my_model = Model.upload_scikit_learn_model_file( model_file_path="iris.sklearn_model.joblib" ) @@ -3144,12 +3872,12 @@ def upload_scikit_learn_model_file( and probably different, including the URI scheme, than the one given on input. The output URI will point to a location where the user only has a read access. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Metadata describing the Model's input and output for explanation. Both `explanation_metadata` and `explanation_parameters` must be passed together when used. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. For more details, see `Ref docs ` project: Optional[str]=None, @@ -3185,14 +3913,20 @@ def upload_scikit_learn_model_file( staging_bucket (str): Optional. Bucket to stage local model artifacts. Overrides staging_bucket set in aiplatform.init. + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. upload_request_timeout (float): Optional. The timeout for the upload request in seconds. + Returns: - model: Instantiated representation of the uploaded model resource. + model (aiplatform.Model): + Instantiated representation of the uploaded model resource. + Raises: ValueError: If only `explanation_metadata` or `explanation_parameters` - is specified. - Also if model directory does not contain a supported model file. + is specified. Also if model directory does not contain a supported model file. """ if not display_name: display_name = cls._generate_display_name("Scikit-Learn model") @@ -3267,8 +4001,10 @@ def upload_tensorflow_saved_model( instance_schema_uri: Optional[str] = None, parameters_schema_uri: Optional[str] = None, prediction_schema_uri: Optional[str] = None, - explanation_metadata: Optional[explain.ExplanationMetadata] = None, - explanation_parameters: Optional[explain.ExplanationParameters] = None, + explanation_metadata: Optional[aiplatform.explain.ExplanationMetadata] = None, + explanation_parameters: Optional[ + aiplatform.explain.ExplanationParameters + ] = None, project: Optional[str] = None, location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, @@ -3283,8 +4019,7 @@ def upload_tensorflow_saved_model( Note: This function is *experimental* and can be changed in the future. - Example usage:: - + Example usage: my_model = Model.upload_scikit_learn_model_file( model_file_path="iris.tensorflow_model.SavedModel" ) @@ -3348,12 +4083,12 @@ def upload_tensorflow_saved_model( and probably different, including the URI scheme, than the one given on input. The output URI will point to a location where the user only has a read access. - explanation_metadata (explain.ExplanationMetadata): + explanation_metadata (aiplatform.explain.ExplanationMetadata): Optional. Metadata describing the Model's input and output for explanation. Both `explanation_metadata` and `explanation_parameters` must be passed together when used. For more details, see `Ref docs ` - explanation_parameters (explain.ExplanationParameters): + explanation_parameters (aiplatform.explain.ExplanationParameters): Optional. Parameters to configure explaining for Model's predictions. For more details, see `Ref docs ` project: Optional[str]=None, @@ -3389,14 +4124,20 @@ def upload_tensorflow_saved_model( staging_bucket (str): Optional. Bucket to stage local model artifacts. Overrides staging_bucket set in aiplatform.init. + sync (bool): + Whether to execute this method synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. upload_request_timeout (float): Optional. The timeout for the upload request in seconds. + Returns: - model: Instantiated representation of the uploaded model resource. + model (aiplatform.Model): + Instantiated representation of the uploaded model resource. + Raises: ValueError: If only `explanation_metadata` or `explanation_parameters` - is specified. - Also if model directory does not contain a supported model file. + is specified. Also if model directory does not contain a supported model file. """ if not display_name: display_name = cls._generate_display_name("Tensorflow model") @@ -3434,16 +4175,15 @@ def list_model_evaluations( """List all Model Evaluation resources associated with this model. Example Usage: + my_model = Model( + model_name="projects/123/locations/us-central1/models/456" + ) - my_model = Model( - model_name="projects/123/locations/us-central1/models/456" - ) - - my_evaluations = my_model.list_model_evaluations() + my_evaluations = my_model.list_model_evaluations() Returns: - List[model_evaluation.ModelEvaluation]: List of ModelEvaluation resources - for the model. + List[model_evaluation.ModelEvaluation]: + List of ModelEvaluation resources for the model. """ self.wait() @@ -3462,7 +4202,6 @@ def get_model_evaluation( with this model. Example usage: - my_model = Model( model_name="projects/123/locations/us-central1/models/456" ) @@ -3471,15 +4210,16 @@ def get_model_evaluation( evaluation_id="789" ) - # If no arguments are passed, this returns the first evaluation for the model + # If no arguments are passed, this method returns the first evaluation for the model my_evaluation = my_model.get_model_evaluation() Args: evaluation_id (str): Optional. The ID of the model evaluation to retrieve. + Returns: - model_evaluation.ModelEvaluation: Instantiated representation of the - ModelEvaluation resource. + model_evaluation.ModelEvaluation: + Instantiated representation of the ModelEvaluation resource. """ evaluations = self.list_model_evaluations() diff --git a/setup.py b/setup.py index a22cd5a226..996f2d6d7d 100644 --- a/setup.py +++ b/setup.py @@ -52,12 +52,15 @@ "pandas >= 1.0.0", "pyarrow >= 6.0.1", ] -pipelines_extra_requires = [ +pipelines_extra_require = [ "pyyaml>=5.3,<6", ] datasets_extra_require = [ "pyarrow >= 3.0.0, < 8.0dev", ] +private_endpoints_extra_require = [ + "urllib3 >=1.21.1, <1.27", +] full_extra_require = list( set( tensorboard_extra_require @@ -65,8 +68,9 @@ + xai_extra_require + lit_extra_require + featurestore_extra_require - + pipelines_extra_requires + + pipelines_extra_require + datasets_extra_require + + private_endpoints_extra_require ) ) testing_extra_require = ( @@ -118,7 +122,9 @@ "xai": xai_extra_require, "lit": lit_extra_require, "cloud_profiler": profiler_extra_require, - "pipelines": pipelines_extra_requires, + "pipelines": pipelines_extra_require, + "datasets": datasets_extra_require, + "private_endpoints": private_endpoints_extra_require, }, python_requires=">=3.7", classifiers=[ diff --git a/tests/system/aiplatform/e2e_base.py b/tests/system/aiplatform/e2e_base.py index 26d67e9b66..9d3bc31a8d 100644 --- a/tests/system/aiplatform/e2e_base.py +++ b/tests/system/aiplatform/e2e_base.py @@ -173,7 +173,6 @@ def tear_down_resources(self, shared_state: Dict[str, Any]): yield # TODO(b/218310362): Add resource deletion system tests - # Bring all Endpoints to the front of the list # Ensures Models are undeployed first before we attempt deletion shared_state["resources"].sort( diff --git a/tests/system/aiplatform/test_private_endpoint.py b/tests/system/aiplatform/test_private_endpoint.py new file mode 100644 index 0000000000..443b5b8e57 --- /dev/null +++ b/tests/system/aiplatform/test_private_endpoint.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import pytest + +from google.cloud import aiplatform + +from tests.system.aiplatform import e2e_base + +# permanent_custom_mnist_model +_MODEL_ID = "6430031960164270080" +_PRIVATE_ENDPOINT_NETWORK = "projects/580378083368/global/networks/private-endpoint-vpc" + + +@pytest.mark.usefixtures("tear_down_resources") +class TestPrivateEndpoint(e2e_base.TestEndToEnd): + + _temp_prefix = "temp_vertex_sdk_e2e" + + def test_create_deploy_delete_private_endpoint(self, shared_state): + # Collection of resources generated by this test, to be deleted during teardown + shared_state["resources"] = [] + + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + ) + + private_endpoint = aiplatform.PrivateEndpoint.create( + display_name=self._make_display_name("private_endpoint_test"), + network=_PRIVATE_ENDPOINT_NETWORK, + ) + shared_state["resources"].append(private_endpoint) + + # Verify that the retrieved private Endpoint is the same + my_private_endpoint = aiplatform.PrivateEndpoint( + endpoint_name=private_endpoint.resource_name + ) + assert private_endpoint.resource_name == my_private_endpoint.resource_name + assert private_endpoint.display_name == my_private_endpoint.display_name + + # Verify the endpoint is in the private Endpoint list + list_private_endpoint = aiplatform.PrivateEndpoint.list() + assert private_endpoint.resource_name in [ + private_endpoint.resource_name for private_endpoint in list_private_endpoint + ] + + # Retrieve permanent model, deploy to private Endpoint, then undeploy + my_model = aiplatform.Model(model_name=_MODEL_ID) + + my_private_endpoint.deploy(model=my_model) + assert my_private_endpoint._gca_resource.deployed_models + + deployed_model_id = my_private_endpoint.list_models()[0].id + my_private_endpoint.undeploy(deployed_model_id=deployed_model_id) + assert not my_private_endpoint._gca_resource.deployed_models diff --git a/tests/unit/aiplatform/test_endpoints.py b/tests/unit/aiplatform/test_endpoints.py index 50a6c93303..e14c20ece6 100644 --- a/tests/unit/aiplatform/test_endpoints.py +++ b/tests/unit/aiplatform/test_endpoints.py @@ -17,6 +17,8 @@ import copy import pytest +import urllib3 +import json from unittest import mock from importlib import reload @@ -48,6 +50,7 @@ encryption_spec as gca_encryption_spec, ) + _TEST_PROJECT = "test-project" _TEST_PROJECT_2 = "test-project-2" _TEST_LOCATION = "us-central1" @@ -73,6 +76,7 @@ _TEST_MODEL_NAME = ( f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/models/{_TEST_ID}" ) +_TEST_NETWORK = f"projects/{_TEST_PROJECT}/global/networks/{_TEST_ID}" _TEST_MODEL_ID = "1028944691210842416" _TEST_PREDICTION = [[1.0, 2.0, 3.0], [3.0, 3.0, 1.0]] @@ -177,6 +181,15 @@ ), ] +_TEST_PRIVATE_ENDPOINT_LIST = [ + gca_endpoint.Endpoint( + name=_TEST_ENDPOINT_NAME, + display_name="aac", + create_time=datetime.now() - timedelta(minutes=15), + network=_TEST_NETWORK, + ), +] + _TEST_LIST_FILTER = 'display_name="abc"' _TEST_LIST_ORDER_BY_CREATE_TIME = "create_time desc" _TEST_LIST_ORDER_BY_DISPLAY_NAME = "display_name" @@ -184,6 +197,13 @@ _TEST_LABELS = {"my_key": "my_value"} +""" +---------------------------------------------------------------------------- +Endpoint Fixtures +---------------------------------------------------------------------------- +""" + + @pytest.fixture def get_endpoint_mock(): with mock.patch.object( @@ -421,6 +441,88 @@ def predict_client_explain_mock(): yield predict_mock +""" +---------------------------------------------------------------------------- +Private Endpoint Fixtures +---------------------------------------------------------------------------- +""" + + +@pytest.fixture +def create_private_endpoint_mock(): + with mock.patch.object( + endpoint_service_client.EndpointServiceClient, "create_endpoint" + ) as create_private_endpoint_mock: + create_private_endpoint_lro_mock = mock.Mock(ga_operation.Operation) + create_private_endpoint_lro_mock.result.return_value = gca_endpoint.Endpoint( + name=_TEST_ENDPOINT_NAME, + display_name=_TEST_DISPLAY_NAME, + network=_TEST_NETWORK, + ) + create_private_endpoint_mock.return_value = create_private_endpoint_lro_mock + yield create_private_endpoint_mock + + +@pytest.fixture +def get_private_endpoint_mock(): + with mock.patch.object( + endpoint_service_client.EndpointServiceClient, "get_endpoint" + ) as get_endpoint_mock: + get_endpoint_mock.return_value = gca_endpoint.Endpoint( + display_name=_TEST_DISPLAY_NAME, + name=_TEST_ENDPOINT_NAME, + network=_TEST_NETWORK, + ) + yield get_endpoint_mock + + +@pytest.fixture +def get_private_endpoint_with_model_mock(): + with mock.patch.object( + endpoint_service_client.EndpointServiceClient, "get_endpoint" + ) as get_endpoint_mock: + get_endpoint_mock.return_value = gca_endpoint.Endpoint( + display_name=_TEST_DISPLAY_NAME, + name=_TEST_ENDPOINT_NAME, + network=_TEST_NETWORK, + deployed_models=[_TEST_DEPLOYED_MODELS[0]], + ) + yield get_endpoint_mock + + +@pytest.fixture +def predict_private_endpoint_mock(): + with mock.patch.object(urllib3.PoolManager, "request") as predict_mock: + predict_mock.return_value = urllib3.response.HTTPResponse( + status=200, body=json.dumps({"predictions": _TEST_PREDICTION}) + ) + yield predict_mock + + +@pytest.fixture +def health_check_private_endpoint_mock(): + with mock.patch.object(urllib3.PoolManager, "request") as health_check_mock: + health_check_mock.return_value = urllib3.response.HTTPResponse(status=200) + yield health_check_mock + + +@pytest.fixture +def list_private_endpoints_mock(): + with mock.patch.object( + endpoint_service_client.EndpointServiceClient, "list_endpoints" + ) as list_endpoints_mock: + list_endpoints_mock.return_value = _TEST_PRIVATE_ENDPOINT_LIST + yield list_endpoints_mock + + +@pytest.fixture +def sdk_undeploy_mock(): + """Mocks the high-level PrivateEndpoint.undeploy() SDK method""" + with mock.patch.object(aiplatform.PrivateEndpoint, "undeploy") as sdk_undeploy_mock: + sdk_undeploy_mock.return_value = None + yield sdk_undeploy_mock + + @pytest.mark.usefixtures("google_auth_mock") class TestEndpoint: def setup_method(self): @@ -1465,7 +1567,6 @@ def test_undeploy(self, undeploy_model_mock, sync): endpoint=test_endpoint.resource_name, deployed_model_id="model1", traffic_split={}, - # traffic_split={"model1": 0}, metadata=(), ) @@ -1579,7 +1680,8 @@ def test_undeploy_zero_traffic_model_without_new_traffic_split( metadata=(), ) - def test_predict(self, get_endpoint_mock, predict_client_predict_mock): + @pytest.mark.usefixtures("get_endpoint_mock") + def test_predict(self, predict_client_predict_mock): test_endpoint = models.Endpoint(_TEST_ID) test_prediction = test_endpoint.predict( @@ -1598,7 +1700,8 @@ def test_predict(self, get_endpoint_mock, predict_client_predict_mock): timeout=None, ) - def test_explain(self, get_endpoint_mock, predict_client_explain_mock): + @pytest.mark.usefixtures("get_endpoint_mock") + def test_explain(self, predict_client_explain_mock): test_endpoint = models.Endpoint(_TEST_ID) test_prediction = test_endpoint.explain( @@ -1817,3 +1920,148 @@ def test_delete_endpoint_with_force( sdk_undeploy_all_mock.assert_called_once() delete_endpoint_mock.assert_called_once_with(name=_TEST_ENDPOINT_NAME) + + +class TestPrivateEndpoint(TestEndpoint): + @pytest.mark.parametrize("sync", [True, False]) + def test_create(self, create_private_endpoint_mock, sync): + test_endpoint = models.PrivateEndpoint.create( + display_name=_TEST_DISPLAY_NAME, + project=_TEST_PROJECT, + location=_TEST_LOCATION, + network=_TEST_NETWORK, + sync=sync, + ) + + if not sync: + test_endpoint.wait() + + expected_endpoint = gca_endpoint.Endpoint( + display_name=_TEST_DISPLAY_NAME, network=_TEST_NETWORK + ) + + create_private_endpoint_mock.assert_called_once_with( + parent=_TEST_PARENT, + endpoint=expected_endpoint, + metadata=(), + timeout=None, + endpoint_id=None, + ) + + @pytest.mark.usefixtures("get_private_endpoint_with_model_mock") + def test_predict(self, predict_private_endpoint_mock): + test_endpoint = models.PrivateEndpoint(_TEST_ID) + test_prediction = test_endpoint.predict( + instances=_TEST_INSTANCES, parameters={"param": 3.0} + ) + + true_prediction = models.Prediction( + predictions=_TEST_PREDICTION, deployed_model_id=_TEST_ID + ) + + assert true_prediction == test_prediction + predict_private_endpoint_mock.assert_called_once_with( + method="POST", + url="", + body='{"instances": [[1.0, 2.0, 3.0], [1.0, 3.0, 4.0]]}', + headers={"Content-Type": "application/json"}, + ) + + @pytest.mark.usefixtures("get_private_endpoint_with_model_mock") + def test_health_check(self, health_check_private_endpoint_mock): + test_endpoint = models.PrivateEndpoint(_TEST_ID) + test_health_check = test_endpoint.health_check() + + true_health_check = True + + assert true_health_check == test_health_check + health_check_private_endpoint_mock.assert_called_once_with( + method="GET", url="", body=None, headers=None + ) + + @pytest.mark.usefixtures("get_private_endpoint_mock", "get_model_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_deploy(self, deploy_model_mock, sync): + test_endpoint = models.PrivateEndpoint(_TEST_ENDPOINT_NAME) + test_model = models.Model(_TEST_ID) + test_model._gca_resource.supported_deployment_resources_types.append( + aiplatform.gapic.Model.DeploymentResourcesType.AUTOMATIC_RESOURCES + ) + test_endpoint.deploy( + test_model, + sync=sync, + ) + + if not sync: + test_endpoint.wait() + + automatic_resources = gca_machine_resources.AutomaticResources( + min_replica_count=1, + max_replica_count=1, + ) + + deployed_model = gca_endpoint.DeployedModel( + automatic_resources=automatic_resources, + model=test_model.resource_name, + display_name=None, + ) + + deploy_model_mock.assert_called_once_with( + endpoint=test_endpoint.resource_name, + deployed_model=deployed_model, + metadata=(), + timeout=None, + traffic_split=None, + ) + + @pytest.mark.usefixtures("get_private_endpoint_with_model_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_undeploy(self, undeploy_model_mock, sync): + test_endpoint = models.PrivateEndpoint(_TEST_ENDPOINT_NAME) + test_endpoint.undeploy("model1", sync=sync) + + if not sync: + test_endpoint.wait() + + undeploy_model_mock.assert_called_once_with( + endpoint=test_endpoint.resource_name, + deployed_model_id="model1", + metadata=(), + traffic_split={}, + ) + + @pytest.mark.usefixtures("get_private_endpoint_with_model_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_delete_without_force(self, sdk_undeploy_mock, delete_endpoint_mock, sync): + + test_endpoint = models.PrivateEndpoint(_TEST_ENDPOINT_NAME) + test_endpoint.delete(sync=sync) + + if not sync: + test_endpoint.wait() + + # undeploy() should not be called unless force is set to True + sdk_undeploy_mock.assert_not_called() + + delete_endpoint_mock.assert_called_once_with(name=_TEST_ENDPOINT_NAME) + + @pytest.mark.usefixtures("get_private_endpoint_with_model_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_delete_with_force(self, sdk_undeploy_mock, delete_endpoint_mock, sync): + + test_endpoint = models.PrivateEndpoint(_TEST_ENDPOINT_NAME) + test_endpoint._gca_resource.deployed_models = [_TEST_DEPLOYED_MODELS[0]] + test_endpoint.delete(sync=sync) + + if not sync: + test_endpoint.wait() + + # undeploy() should not be called unless force is set to True + sdk_undeploy_mock.called_once_with(deployed_model_id=_TEST_ID, sync=sync) + + delete_endpoint_mock.assert_called_once_with(name=_TEST_ENDPOINT_NAME) + + @pytest.mark.usefixtures("list_private_endpoints_mock") + def test_list(self): + ep_list = aiplatform.PrivateEndpoint.list() + assert ep_list # Ensure list is not empty diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py index 23f933128a..20c4e6b909 100644 --- a/tests/unit/aiplatform/test_models.py +++ b/tests/unit/aiplatform/test_models.py @@ -232,6 +232,8 @@ ), ] +_TEST_NETWORK = f"projects/{_TEST_PROJECT}/global/networks/{_TEST_ID}" + @pytest.fixture def mock_model(): @@ -280,7 +282,6 @@ def get_model_mock(): display_name=_TEST_MODEL_NAME, name=_TEST_MODEL_RESOURCE_NAME, ) - yield get_model_mock @@ -1229,6 +1230,38 @@ def test_deploy_raises_with_impartial_explanation_spec(self): assert e.match(regexp=r"`explanation_parameters` should be specified or None.") + @pytest.mark.usefixtures( + "get_endpoint_mock", "get_model_mock", "create_endpoint_mock" + ) + def test_deploy_no_endpoint_with_network(self, deploy_model_mock): + test_model = models.Model(_TEST_ID) + test_model._gca_resource.supported_deployment_resources_types.append( + aiplatform.gapic.Model.DeploymentResourcesType.AUTOMATIC_RESOURCES + ) + + test_endpoint = test_model.deploy(network=_TEST_NETWORK) + # Ensure endpoint created with `network` is a PrivateEndpoint + assert isinstance(test_endpoint, models.PrivateEndpoint) + + automatic_resources = gca_machine_resources.AutomaticResources( + min_replica_count=1, + max_replica_count=1, + ) + deployed_model = gca_endpoint.DeployedModel( + automatic_resources=automatic_resources, + model=test_model.resource_name, + display_name=None, + ) + + # Ensure traffic_split is set to `None` for PrivateEndpoint + deploy_model_mock.assert_called_once_with( + endpoint=test_endpoint.resource_name, + deployed_model=deployed_model, + traffic_split=None, + metadata=(), + timeout=None, + ) + @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.usefixtures("get_model_mock", "get_batch_prediction_job_mock") def test_init_aiplatform_with_encryption_key_name_and_batch_predict_gcs_source_and_dest(