diff --git a/google/cloud/aiplatform/__init__.py b/google/cloud/aiplatform/__init__.py index 73fad5a223..8107756229 100644 --- a/google/cloud/aiplatform/__init__.py +++ b/google/cloud/aiplatform/__init__.py @@ -51,6 +51,7 @@ BatchPredictionJob, CustomJob, HyperparameterTuningJob, + ModelDeploymentMonitoringJob, ) from google.cloud.aiplatform.pipeline_jobs import PipelineJob from google.cloud.aiplatform.tensorboard import ( @@ -70,6 +71,7 @@ AutoMLTextTrainingJob, AutoMLVideoTrainingJob, ) + from google.cloud.aiplatform import helpers """ @@ -124,7 +126,12 @@ "CustomTrainingJob", "CustomContainerTrainingJob", "CustomPythonPackageTrainingJob", + "EmailAlertConfig", "Endpoint", + "DriftDetectionConfig", + "ExplanationConfig", + "ObjectiveConfig", + "SkewDetectionConfig", "EntityType", "Execution", "Experiment", @@ -137,9 +144,12 @@ "HyperparameterTuningJob", "Model", "ModelEvaluation", + "ModelDeploymentMonitoringJob", "PipelineJob", "PrivateEndpoint", + "RandomSampleConfig", "SequenceToSequencePlusForecastingTrainingJob", + "ScheduleConfig", "TabularDataset", "Tensorboard", "TensorboardExperiment", diff --git a/google/cloud/aiplatform/compat/__init__.py b/google/cloud/aiplatform/compat/__init__.py index 02e66ec494..98ecacc5b5 100644 --- a/google/cloud/aiplatform/compat/__init__.py +++ b/google/cloud/aiplatform/compat/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2021 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -94,6 +94,10 @@ types.model = types.model_v1beta1 types.model_evaluation = types.model_evaluation_v1beta1 types.model_evaluation_slice = types.model_evaluation_slice_v1beta1 + types.model_deployment_monitoring_job = ( + types.model_deployment_monitoring_job_v1beta1 + ) + types.model_monitoring = types.model_monitoring_v1beta1 types.model_service = types.model_service_v1beta1 types.operation = types.operation_v1beta1 types.pipeline_failure_policy = types.pipeline_failure_policy_v1beta1 @@ -179,6 +183,8 @@ types.model = types.model_v1 types.model_evaluation = types.model_evaluation_v1 types.model_evaluation_slice = types.model_evaluation_slice_v1 + types.model_deployment_monitoring_job = types.model_deployment_monitoring_job_v1 + types.model_monitoring = types.model_monitoring_v1 types.model_service = types.model_service_v1 types.operation = types.operation_v1 types.pipeline_failure_policy = types.pipeline_failure_policy_v1 diff --git a/google/cloud/aiplatform/compat/types/__init__.py b/google/cloud/aiplatform/compat/types/__init__.py index 25c7515877..d07f86fcb0 100644 --- a/google/cloud/aiplatform/compat/types/__init__.py +++ b/google/cloud/aiplatform/compat/types/__init__.py @@ -61,7 +61,9 @@ model as model_v1beta1, model_evaluation as model_evaluation_v1beta1, model_evaluation_slice as model_evaluation_slice_v1beta1, + model_deployment_monitoring_job as model_deployment_monitoring_job_v1beta1, model_service as model_service_v1beta1, + model_monitoring as model_monitoring_v1beta1, operation as operation_v1beta1, pipeline_failure_policy as pipeline_failure_policy_v1beta1, pipeline_job as pipeline_job_v1beta1, @@ -125,7 +127,9 @@ model as model_v1, model_evaluation as model_evaluation_v1, model_evaluation_slice as model_evaluation_slice_v1, + model_deployment_monitoring_job as model_deployment_monitoring_job_v1, model_service as model_service_v1, + model_monitoring as model_monitoring_v1, operation as operation_v1, pipeline_failure_policy as 
pipeline_failure_policy_v1, pipeline_job as pipeline_job_v1, @@ -191,7 +195,9 @@ model_v1, model_evaluation_v1, model_evaluation_slice_v1, + model_deployment_monitoring_job_v1, model_service_v1, + model_monitoring_v1, operation_v1, pipeline_failure_policy_v1beta1, pipeline_job_v1, @@ -254,7 +260,9 @@ model_v1beta1, model_evaluation_v1beta1, model_evaluation_slice_v1beta1, + model_deployment_monitoring_job_v1beta1, model_service_v1beta1, + model_monitoring_v1beta1, operation_v1beta1, pipeline_failure_policy_v1beta1, pipeline_job_v1beta1, diff --git a/google/cloud/aiplatform/jobs.py b/google/cloud/aiplatform/jobs.py index 08c8bda59a..5d1ac4075c 100644 --- a/google/cloud/aiplatform/jobs.py +++ b/google/cloud/aiplatform/jobs.py @@ -27,6 +27,7 @@ from google.auth import credentials as auth_credentials from google.protobuf import duration_pb2 # type: ignore +from google.protobuf import field_mask_pb2 # type: ignore from google.rpc import status_pb2 from google.cloud import aiplatform @@ -36,16 +37,20 @@ completion_stats as gca_completion_stats, custom_job as gca_custom_job_compat, explanation as gca_explanation_compat, + encryption_spec as gca_encryption_spec_compat, io as gca_io_compat, job_state as gca_job_state, hyperparameter_tuning_job as gca_hyperparameter_tuning_job_compat, machine_resources as gca_machine_resources_compat, manual_batch_tuning_parameters as gca_manual_batch_tuning_parameters_compat, study as gca_study_compat, + model_deployment_monitoring_job as gca_model_deployment_monitoring_job_compat, ) + from google.cloud.aiplatform.constants import base as constants from google.cloud.aiplatform import initializer from google.cloud.aiplatform import hyperparameter_tuning +from google.cloud.aiplatform import model_monitoring from google.cloud.aiplatform import utils from google.cloud.aiplatform.utils import console_utils from google.cloud.aiplatform.utils import source_utils @@ -109,10 +114,10 @@ def __init__( Example: 
"projects/123/locations/us-central1/batchPredictionJobs/456" or "456" when project, location and job_type are initialized or passed. project: Optional[str] = None, - Optional project to retrieve Job subclass from. If not set, + Optional. project to retrieve Job subclass from. If not set, project set in aiplatform.init will be used. location: Optional[str] = None, - Optional location to retrieve Job subclass from. If not set, + Optional. location to retrieve Job subclass from. If not set, location set in aiplatform.init will be used. credentials: Optional[auth_credentials.Credentials] = None, Custom credentials to use. If not set, credentials set in @@ -257,7 +262,7 @@ def list( credentials set in aiplatform.init. Returns: - List[VertexAiResourceNoun] - A list of Job resource objects + List[VertexAiResourceNoun] - A list of Job resource objects. """ return cls._list_with_local_order( @@ -306,10 +311,10 @@ def __init__( Example: "projects/.../locations/.../batchPredictionJobs/456" or "456" when project and location are initialized or passed. project: Optional[str] = None, - Optional project to retrieve BatchPredictionJob from. If not set, + Optional. project to retrieve BatchPredictionJob from. If not set, project set in aiplatform.init will be used. location: Optional[str] = None, - Optional location to retrieve BatchPredictionJob from. If not set, + Optional. location to retrieve BatchPredictionJob from. If not set, location set in aiplatform.init will be used. credentials: Optional[auth_credentials.Credentials] = None, Custom credentials to use. If not set, credentials set in @@ -882,7 +887,7 @@ def __init__( Args: project(str): Project of the resource noun. location(str): The location of the resource noun. - credentials(google.auth.credentials.Credentials): Optional custom + credentials(google.auth.credentials.Credentials): Optional. custom credentials to use when accessing interacting with resource noun. 
""" @@ -998,10 +1003,10 @@ def get( resource_name (str): Required. A fully-qualified resource name or ID. project (str): - Optional project to retrieve dataset from. If not set, project + Optional. project to retrieve dataset from. If not set, project set in aiplatform.init will be used. location (str): - Optional location to retrieve dataset from. If not set, location + Optional. location to retrieve dataset from. If not set, location set in aiplatform.init will be used. credentials (auth_credentials.Credentials): Custom credentials to use to upload this model. Overrides @@ -1945,3 +1950,508 @@ def run( def trials(self) -> List[gca_study_compat.Trial]: self._assert_gca_resource_is_available() return list(self._gca_resource.trials) + + +class ModelDeploymentMonitoringJob(_Job): + """Vertex AI Model Deployment Monitoring Job. + + This class should be used in conjunction with the Endpoint class + in order to configure model monitoring for deployed models. + """ + + _resource_noun = "modelDeploymentMonitoringJobs" + _getter_method = "get_model_deployment_monitoring_job" + _list_method = "list_model_deployment_monitoring_jobs" + _cancel_method = "cancel_model_deployment_monitoring_jobs" + _delete_method = "delete_model_deployment_monitoring_job" + _job_type = "model-deployment-monitoring" + _parse_resource_name_method = "parse_model_deployment_monitoring_job_path" + _format_resource_name_method = "model_deployment_monitoring_job_path" + + def __init__( + self, + model_deployment_monitoring_job_name: str, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ): + """Initializer for ModelDeploymentMonitoringJob. + + Args: + model_deployment_monitoring_job_name (str): + Required. A fully-qualified ModelDeploymentMonitoringJob resource name or ID. + Example: "projects/.../locations/.../modelDeploymentMonitoringJobs/456" or + "456" when project and location are initialized or passed. 
+ project: (str), + Optional. project to retrieve ModelDeploymentMonitoringJob from. If not set, + project set in aiplatform.init will be used. + location: (str), + Optional. location to retrieve ModelDeploymentMonitoringJob from. If not set, + location set in aiplatform.init will be used. + credentials: (auth_credentials.Credentials), + Optional. Custom credentials to use. If not set, credentials set in + aiplatform.init will be used. + """ + super().__init__( + job_name=model_deployment_monitoring_job_name, + project=project, + location=location, + credentials=credentials, + ) + self._gca_resource = self._get_gca_resource( + resource_name=model_deployment_monitoring_job_name + ) + + @classmethod + def _parse_configs( + cls, + objective_configs: Union[ + model_monitoring.ObjectiveConfig, + Dict[str, model_monitoring.ObjectiveConfig], + ], + endpoint: "aiplatform.Endpoint", + deployed_model_ids: Optional[List[str]] = None, + ) -> List[ + gca_model_deployment_monitoring_job_compat.ModelDeploymentMonitoringObjectiveConfig + ]: + """Helper function for matching objective configs with their corresponding models. + + Args: + objective_configs (Union[model_monitoring.objective.ObjectiveConfig, + Dict[str, model_monitoring.objective.ObjectiveConfig]): + Required. A single config if it applies to all models, or a dictionary of + model_id: model_monitoring.objective.ObjectiveConfig if + different model IDs have different configs. + endpoint (aiplatform.Endpoint): + Required. A valid instance of aiplatforn.Endpoint to launch the MDM job on. + deployed_model_ids (Optional[List[str]]): + Optional. A list of deployed model IDs to apply the objective config to. + Note that a model will have a deployed_model_id that is different from the + uploaded model ID, and IDs in this list should consist of deployed model IDs + on the same endpoint passed in the argument. If `objective_configs` is a dictionary, + then this parameter is ignored. 
If `objective_configs` is an instance of
+                `model_monitoring.ObjectiveConfig` and `deployed_model_ids` is a non-empty
+                list of valid IDs, then the same objective config will apply to all models in this list.
+
+        Returns:
+            A List of ModelDeploymentMonitoringObjectiveConfig objects.
+
+        Raises:
+            ValueError, when the model IDs given are invalid.
+            RuntimeError, when XAI is enabled on a model that doesn't have XAI parameters configured.
+        """
+        all_models = []
+        xai_enabled = []
+        for model in endpoint.list_models():
+            all_models.append(model.id)
+            if str(model.explanation_spec.parameters) != "":
+                xai_enabled.append(model.id)
+
+        all_configs = []
+
+        ## when same objective config is applied to SOME or ALL models
+        if deployed_model_ids is not None:
+            if not all(model in all_models for model in deployed_model_ids):
+                error_string = (
+                    "Invalid model ID. The model ID must be one of ["
+                    + ",".join(all_models)
+                    + "]. Note that deployed model IDs are different from the uploaded model's ID"
+                )
+                raise ValueError(error_string)
+            else:
+                all_models = deployed_model_ids
+
+        if isinstance(objective_configs, model_monitoring.ObjectiveConfig):
+            for model in all_models:
+                if (
+                    model not in xai_enabled
+                    and objective_configs.explanation_config is not None
+                ):
+                    raise RuntimeError(
+                        "Invalid config for model ID %s. `explanation_config` should only be enabled if the model has `explanation_spec` populated"
+                        % model
+                    )
+                all_configs.append(
+                    gca_model_deployment_monitoring_job_compat.ModelDeploymentMonitoringObjectiveConfig(
+                        deployed_model_id=model,
+                        objective_config=objective_configs.as_proto(),
+                    )
+                )
+
+        ## when different objective configs are applied to EACH model
+        else:
+            if not all(model in all_models for model in objective_configs.keys()):
+                error_string = (
+                    "Invalid model ID. The model ID must be one of ["
+                    + ",".join(all_models)
+                    + "]. Note that deployed model IDs are different from the uploaded model's ID"
+                )
+                raise ValueError(error_string)
+            for (deployed_model, objective_config) in objective_configs.items():
+                if (
+                    deployed_model not in xai_enabled
+                    and objective_config.explanation_config is not None
+                ):
+                    raise RuntimeError(
+                        "Invalid config for model ID %s. `explanation_config` should only be enabled if the model has `explanation_spec` populated"
+                        % deployed_model
+                    )
+                all_configs.append(
+                    gca_model_deployment_monitoring_job_compat.ModelDeploymentMonitoringObjectiveConfig(
+                        deployed_model_id=deployed_model,
+                        objective_config=objective_config.as_proto(),
+                    )
+                )
+
+        return all_configs
+
+    @classmethod
+    def create(
+        cls,
+        endpoint: Union[str, "aiplatform.Endpoint"],
+        objective_configs: Optional[
+            Union[
+                model_monitoring.ObjectiveConfig,
+                Dict[str, model_monitoring.ObjectiveConfig],
+            ]
+        ] = None,
+        logging_sampling_strategy: Optional[model_monitoring.RandomSampleConfig] = None,
+        schedule_config: Optional[model_monitoring.ScheduleConfig] = None,
+        display_name: Optional[str] = None,
+        deployed_model_ids: Optional[List[str]] = None,
+        alert_config: Optional[model_monitoring.EmailAlertConfig] = None,
+        predict_instance_schema_uri: Optional[str] = None,
+        sample_predict_instance: Optional[str] = None,
+        analysis_instance_schema_uri: Optional[str] = None,
+        bigquery_tables_log_ttl: Optional[int] = None,
+        stats_anomalies_base_directory: Optional[str] = None,
+        enable_monitoring_pipeline_logs: Optional[bool] = None,
+        labels: Optional[Dict[str, str]] = None,
+        encryption_spec_key_name: Optional[str] = None,
+        project: Optional[str] = None,
+        location: Optional[str] = None,
+        credentials: Optional[auth_credentials.Credentials] = None,
+        create_request_timeout: Optional[float] = None,
+    ) -> "ModelDeploymentMonitoringJob":
+        """Creates and launches a model monitoring job.
+
+        Args:
+            endpoint (Union[str, "aiplatform.Endpoint"]):
+                Required.
Endpoint resource name or an instance of `aiplatform.Endpoint`. Format: + ``projects/{project}/locations/{location}/endpoints/{endpoint}`` + + objective_configs (Union[model_monitoring.ObjectiveConfig, + Dict[str, model_monitoring.ObjectiveConfig]]): + Required. A single config if it applies to all models, or a dictionary of + model_id: model_monitoring.objective.ObjectiveConfig if + different model IDs have different configs. + + logging_sampling_strategy (model_monitoring.sampling.RandomSampleConfig): + Optional. Sample Strategy for logging. + + schedule_config (model_monitoring.schedule.ScheduleConfig): + Optional. Configures model monitoring job scheduling interval in hours. + This defines how often the monitoring jobs are triggered. + + display_name (str): + Optional. The user-defined name of the + ModelDeploymentMonitoringJob. The name can be up + to 128 characters long and can be consist of any + UTF-8 characters. + Display name of a ModelDeploymentMonitoringJob. + + deployed_model_ids (List[str]): + Optional. Use this argument to specify which deployed models to + apply the objective config to. If left unspecified, the same config + will be applied to all deployed models. + + alert_config (model_monitoring.alert.EmailAlertConfig): + Optional. Configures how alerts are sent to the user. Right now + only email alert is supported. + + predict_instance_schema_uri (str): + Optional. YAML schema file uri describing the format of + a single instance, which are given to format + the Endpoint's prediction (and explanation). If + not set, the schema will be generated from + collected predict requests. + + sample_predict_instance (str): + Optional. Sample Predict instance, same format as PredictionRequest.instances, + this can be set as a replacement of predict_instance_schema_uri + If not set, the schema will be generated from collected predict requests. + + analysis_instance_schema_uri (str): + Optional. 
YAML schema file uri describing the format of a single + instance that you want Tensorflow Data Validation (TFDV) to + analyze. If this field is empty, all the feature data types are + inferred from predict_instance_schema_uri, meaning that TFDV + will use the data in the exact format as prediction request/response. + If there are any data type differences between predict instance + and TFDV instance, this field can be used to override the schema. + For models trained with Vertex AI, this field must be set as all the + fields in predict instance formatted as string. + + bigquery_tables_log_ttl (int): + Optional. The TTL(time to live) of BigQuery tables in user projects + which stores logs. A day is the basic unit of + the TTL and we take the ceil of TTL/86400(a + day). e.g. { second: 3600} indicates ttl = 1 + day. + + stats_anomalies_base_directory (str): + Optional. Stats anomalies base folder path. + + enable_monitoring_pipeline_logs (bool): + Optional. If true, the scheduled monitoring pipeline logs are sent to + Google Cloud Logging, including pipeline status and + anomalies detected. Please note the logs incur cost, which + are subject to `Cloud Logging + pricing `__. + + labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize the ModelDeploymentMonitoringJob. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. See https://goo.gl/xmQnxf for more information + and examples of labels. + + encryption_spec_key_name (str): + Optional. Customer-managed encryption key spec for a + ModelDeploymentMonitoringJob. If set, this + ModelDeploymentMonitoringJob and all + sub-resources of this + ModelDeploymentMonitoringJob will be secured by + this key. + + create_request_timeout (int): + Optional. Timeout in seconds for the model monitoring job creation request. 
+
+        Returns:
+            An instance of ModelDeploymentMonitoringJob.
+        """
+        if not display_name:
+            display_name = cls._generate_display_name()
+
+        utils.validate_display_name(display_name)
+
+        if labels:
+            utils.validate_labels(labels)
+
+        if stats_anomalies_base_directory:
+            stats_anomalies_base_directory = gca_io_compat.GcsDestination(
+                output_uri_prefix=stats_anomalies_base_directory
+            )
+
+        if encryption_spec_key_name:
+            encryption_spec_key_name = gca_encryption_spec_compat.EncryptionSpec(
+                kms_key_name=encryption_spec_key_name
+            )
+
+        # bigquery_tables_log_ttl is expressed in days (see docstring: "A day
+        # is the basic unit of the TTL"), while the proto field `log_ttl` is a
+        # google.protobuf.Duration in seconds. Without this conversion the
+        # parameter was silently dropped.
+        log_ttl = None
+        if bigquery_tables_log_ttl:
+            log_ttl = duration_pb2.Duration(seconds=bigquery_tables_log_ttl * 86400)
+
+        if credentials is None and isinstance(endpoint, aiplatform.Endpoint):
+            credentials = endpoint.credentials
+        self = cls._empty_constructor(
+            project=project, location=location, credentials=credentials
+        )
+
+        parent = initializer.global_config.common_location_path(
+            project=self.project,
+            location=self.location,
+        )
+
+        if isinstance(endpoint, str):
+            endpoint = aiplatform.Endpoint(endpoint, project, location, credentials)
+
+        mdm_objective_config_seq = cls._parse_configs(
+            objective_configs,
+            endpoint,
+            deployed_model_ids,
+        )
+
+        gapic_mdm_job = (
+            gca_model_deployment_monitoring_job_compat.ModelDeploymentMonitoringJob(
+                display_name=display_name,
+                endpoint=endpoint.resource_name,
+                model_deployment_monitoring_objective_configs=mdm_objective_config_seq,
+                # Guard the optional wrapper configs: each defaults to None and
+                # calling .as_proto() on None raises AttributeError.
+                logging_sampling_strategy=logging_sampling_strategy.as_proto()
+                if logging_sampling_strategy
+                else None,
+                model_deployment_monitoring_schedule_config=schedule_config.as_proto()
+                if schedule_config
+                else None,
+                model_monitoring_alert_config=alert_config.as_proto()
+                if alert_config
+                else None,
+                predict_instance_schema_uri=predict_instance_schema_uri,
+                analysis_instance_schema_uri=analysis_instance_schema_uri,
+                sample_predict_instance=sample_predict_instance,
+                stats_anomalies_base_directory=stats_anomalies_base_directory,
+                enable_monitoring_pipeline_logs=enable_monitoring_pipeline_logs,
+                log_ttl=log_ttl,
+                labels=labels,
+                encryption_spec=encryption_spec_key_name,
+            )
+        )
+
+        _LOGGER.log_create_with_lro(cls)
+        self._gca_resource = self.api_client.create_model_deployment_monitoring_job(
+ parent=parent, + model_deployment_monitoring_job=gapic_mdm_job, + timeout=create_request_timeout, + ) + + _LOGGER.log_create_complete(cls, self._gca_resource, "mdm_job") + + _LOGGER.info( + "View Model Deployment Monitoring Job:\n%s" % self._dashboard_uri() + ) + + return self + + @classmethod + def cancel(cls): + raise NotImplementedError( + "Cancel method is not implemented because it is not applicable. A running model deployment monitoring job can be paused or deleted." + ) + + @property + def end_time(self): + _LOGGER.info( + "Model deployment monitoring jobs do not have an end time since their inactive states are either PAUSED or PENDING." + ) + return None + + def update( + self, + *, + display_name: Optional[str] = None, + schedule_config: Optional[model_monitoring.ScheduleConfig] = None, + alert_config: Optional[model_monitoring.EmailAlertConfig] = None, + logging_sampling_strategy: Optional[model_monitoring.RandomSampleConfig] = None, + labels: Optional[Dict[str, str]] = None, + bigquery_tables_log_ttl: Optional[int] = None, + enable_monitoring_pipeline_logs: Optional[bool] = None, + objective_configs: Optional[ + Union[ + model_monitoring.ObjectiveConfig, + Dict[str, model_monitoring.ObjectiveConfig], + ] + ] = None, + deployed_model_ids: Optional[List[str]] = None, + ) -> "ModelDeploymentMonitoringJob": + """Updates an existing ModelDeploymentMonitoringJob. + + Args: + + display_name (str): + Optional. The user-defined name of the + ModelDeploymentMonitoringJob. The name can be up + to 128 characters long and can be consist of any + UTF-8 characters. + Display name of a ModelDeploymentMonitoringJob. + + schedule_config (model_monitoring.schedule.ScheduleConfig): + Required. Configures model monitoring job scheduling interval in hours. + This defines how often the monitoring jobs are triggered. + alert_config (model_monitoring.alert.EmailAlertConfig): + Optional. Configures how alerts are sent to the user. Right now + only email alert is supported. 
+ logging_sampling_strategy (model_monitoring.sampling.RandomSampleConfig): + Required. Sample Strategy for logging. + + labels (Dict[str, str]): + Optional. The labels with user-defined metadata to + organize the ModelDeploymentMonitoringJob. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. See https://goo.gl/xmQnxf for more information + and examples of labels. + bigquery_tables_log_ttl (int): + Optional. The TTL(time to live) of BigQuery tables in user projects + which stores logs. A day is the basic unit of + the TTL and we take the ceil of TTL/86400(a + day). e.g. { second: 3600} indicates ttl = 1 + day. + + enable_monitoring_pipeline_logs (bool): + Optional. If true, the scheduled monitoring pipeline logs are sent to + Google Cloud Logging, including pipeline status and + anomalies detected. Please note the logs incur cost, which + are subject to `Cloud Logging + pricing `__. + + objective_configs (Union[ + Required. model_monitoring.objective.ObjectiveConfig, + Dict[str, model_monitoring.objective.ObjectiveConfig]): + A single config if it applies to all models, or a dictionary of + model_id: model_monitoring.objective.ObjectiveConfig if + different model IDs have different configs. + + deployed_model_ids (List[str]): + Optional. Use this argument to specify which deployed models to + apply the updated objective config to. If left unspecified, the same config + will be applied to all deployed models. 
+ """ + self._sync_gca_resource() + current_job = self.api_client.get_model_deployment_monitoring_job( + name=self._gca_resource.name + ) + update_mask: List[str] = [] + if display_name is not None: + update_mask.append("display_name") + current_job.display_name = display_name + if schedule_config is not None: + update_mask.append("model_deployment_monitoring_schedule_config") + current_job.model_deployment_monitoring_schedule_config = schedule_config + if alert_config is not None: + update_mask.append("model_monitoring_alert_config") + current_job.model_monitoring_alert_config = alert_config + if logging_sampling_strategy is not None: + update_mask.append("logging_sampling_strategy") + current_job.logging_sampling_strategy = logging_sampling_strategy + if labels is not None: + update_mask.append("labels") + current_job.lables = labels + if bigquery_tables_log_ttl is not None: + update_mask.append("log_ttl") + current_job.log_ttl = bigquery_tables_log_ttl + if enable_monitoring_pipeline_logs is not None: + update_mask.append("enable_monitoring_pipeline_logs") + current_job.enable_monitoring_pipeline_logs = ( + enable_monitoring_pipeline_logs + ) + if objective_configs is not None: + update_mask.append("model_deployment_monitoring_objective_configs") + current_job.model_deployment_monitoring_objective_configs = ( + ModelDeploymentMonitoringJob._parse_configs( + objective_configs, + current_job.endpoint, + deployed_model_ids, + ) + ) + if self.state == gca_job_state.JobState.JOB_STATE_RUNNING: + self.api_client.update_model_deployment_monitoring_job( + model_deployment_monitoring_job=current_job, + update_mask=field_mask_pb2.FieldMask(paths=update_mask), + ) + return self + + def pause(self) -> "ModelDeploymentMonitoringJob": + """Pause a running MDM job.""" + if self.state == gca_job_state.JobState.JOB_STATE_RUNNING: + self.api_client.pause_model_deployment_monitoring_job( + name=self._gca_resource.name + ) + return self + + def resume(self) -> 
"ModelDeploymentMonitoringJob": + """Resumes a paused MDM job.""" + if self.state == gca_job_state.JobState.JOB_STATE_PAUSED: + self.api_client.resume_model_deployment_monitoring_job( + name=self._gca_resource.name + ) + return self + + def delete(self) -> None: + """Deletes an MDM job.""" + self.api_client.delete_model_deployment_monitoring_job( + name=self._gca_resource.name + ) diff --git a/google/cloud/aiplatform/model_monitoring/__init__.py b/google/cloud/aiplatform/model_monitoring/__init__.py new file mode 100644 index 0000000000..c4562ff147 --- /dev/null +++ b/google/cloud/aiplatform/model_monitoring/__init__.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from google.cloud.aiplatform.model_monitoring.alert import EmailAlertConfig +from google.cloud.aiplatform.model_monitoring.objective import ( + SkewDetectionConfig, + DriftDetectionConfig, + ExplanationConfig, + ObjectiveConfig, +) +from google.cloud.aiplatform.model_monitoring.sampling import RandomSampleConfig +from google.cloud.aiplatform.model_monitoring.schedule import ScheduleConfig + +__all__ = ( + "EmailAlertConfig", + "SkewDetectionConfig", + "DriftDetectionConfig", + "ExplanationConfig", + "ObjectiveConfig", + "RandomSampleConfig", + "ScheduleConfig", +) diff --git a/google/cloud/aiplatform/model_monitoring/alert.py b/google/cloud/aiplatform/model_monitoring/alert.py new file mode 100644 index 0000000000..9eed27ec21 --- /dev/null +++ b/google/cloud/aiplatform/model_monitoring/alert.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Optional, List +from google.cloud.aiplatform_v1.types import ( + model_monitoring as gca_model_monitoring, +) + + +class EmailAlertConfig: + def __init__( + self, user_emails: List[str] = [], enable_logging: Optional[bool] = False + ): + """Initializer for EmailAlertConfig. + + Args: + user_emails (List[str]): + The email addresses to send the alert to. + enable_logging (bool): + Optional. Defaults to False. Streams detected anomalies to Cloud Logging. 
The anomalies will be + put into json payload encoded from proto + [google.cloud.aiplatform.logging.ModelMonitoringAnomaliesLogEntry][]. + This can be further sync'd to Pub/Sub or any other services + supported by Cloud Logging. + """ + self.enable_logging = enable_logging + self.user_emails = user_emails + + def as_proto(self): + """Returns EmailAlertConfig as a proto message.""" + user_email_alert_config = ( + gca_model_monitoring.ModelMonitoringAlertConfig.EmailAlertConfig( + user_emails=self.user_emails + ) + ) + return gca_model_monitoring.ModelMonitoringAlertConfig( + email_alert_config=user_email_alert_config, + enable_logging=self.enable_logging, + ) diff --git a/google/cloud/aiplatform/model_monitoring/objective.py b/google/cloud/aiplatform/model_monitoring/objective.py new file mode 100644 index 0000000000..a7800a3485 --- /dev/null +++ b/google/cloud/aiplatform/model_monitoring/objective.py @@ -0,0 +1,362 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from typing import Optional, Dict + +from google.cloud.aiplatform_v1.types import ( + io as gca_io, + ThresholdConfig as gca_threshold_config, + model_monitoring as gca_model_monitoring, +) + +TF_RECORD = "tf-record" +CSV = "csv" +JSONL = "jsonl" + + +class _SkewDetectionConfig: + def __init__( + self, + data_source: str, + skew_thresholds: Dict[str, float], + target_field: str, + attribute_skew_thresholds: Dict[str, float], + data_format: Optional[str] = None, + ): + """Base class for training-serving skew detection. + Args: + data_source (str): + Required. Path to training dataset. + + skew_thresholds (Dict[str, float]): + Optional. Key is the feature name and value is the + threshold. If a feature needs to be monitored + for skew, a value threshold must be configured + for that feature. The threshold here is against + feature distribution distance between the + training and prediction feature. + + target_field (str): + Required. The target field name the model is to + predict. This field will be excluded when doing + Predict and (or) Explain for the training data. + + attribute_skew_thresholds (Dict[str, float]): + Optional. Key is the feature name and value is the + threshold. Feature attributions indicate how much + each feature in your model contributed to the + predictions for each given instance. + + data_format (str): + Optional. Data format of the dataset, only applicable + if the input is from Google Cloud Storage. + The possible formats are: + + "tf-record" + The source file is a TFRecord file. + + "csv" + The source file is a CSV file. + + "jsonl" + The source file is a JSONL file. 
+ """ + self.data_source = data_source + self.skew_thresholds = skew_thresholds + self.attribute_skew_thresholds = attribute_skew_thresholds + self.data_format = data_format + self.target_field = target_field + self.training_dataset = None + + def as_proto(self): + """Returns _SkewDetectionConfig as a proto message.""" + skew_thresholds_mapping = {} + attribution_score_skew_thresholds_mapping = {} + if self.skew_thresholds is not None: + for key in self.skew_thresholds.keys(): + skew_threshold = gca_threshold_config(value=self.skew_thresholds[key]) + skew_thresholds_mapping[key] = skew_threshold + if self.attribute_skew_thresholds is not None: + for key in self.attribute_skew_thresholds.keys(): + attribution_score_skew_threshold = gca_threshold_config( + value=self.attribute_skew_thresholds[key] + ) + attribution_score_skew_thresholds_mapping[ + key + ] = attribution_score_skew_threshold + return gca_model_monitoring.ModelMonitoringObjectiveConfig.TrainingPredictionSkewDetectionConfig( + skew_thresholds=skew_thresholds_mapping, + attribution_score_skew_thresholds=attribution_score_skew_thresholds_mapping, + ) + + +class _DriftDetectionConfig: + def __init__( + self, + drift_thresholds: Dict[str, float], + attribute_drift_thresholds: Dict[str, float], + ): + """Base class for prediction drift detection. + Args: + drift_thresholds (Dict[str, float]): + Required. Key is the feature name and value is the + threshold. If a feature needs to be monitored + for drift, a value threshold must be configured + for that feature. The threshold here is against + feature distribution distance between different + time windws. + attribute_drift_thresholds (Dict[str, float]): + Required. Key is the feature name and value is the + threshold. The threshold here is against + attribution score distance between different + time windows. 
+ """ + self.drift_thresholds = drift_thresholds + self.attribute_drift_thresholds = attribute_drift_thresholds + + def as_proto(self): + """Returns drift detection config as a proto message.""" + drift_thresholds_mapping = {} + attribution_score_drift_thresholds_mapping = {} + if self.drift_thresholds is not None: + for key in self.drift_thresholds.keys(): + drift_threshold = gca_threshold_config(value=self.drift_thresholds[key]) + drift_thresholds_mapping[key] = drift_threshold + if self.attribute_drift_thresholds is not None: + for key in self.attribute_drift_thresholds.keys(): + attribution_score_drift_threshold = gca_threshold_config( + value=self.attribute_drift_thresholds[key] + ) + attribution_score_drift_thresholds_mapping[ + key + ] = attribution_score_drift_threshold + return gca_model_monitoring.ModelMonitoringObjectiveConfig.PredictionDriftDetectionConfig( + drift_thresholds=drift_thresholds_mapping, + attribution_score_drift_thresholds=attribution_score_drift_thresholds_mapping, + ) + + +class _ExplanationConfig: + def __init__(self): + """Base class for ExplanationConfig.""" + self.enable_feature_attributes = False + + def as_proto(self): + """Returns _ExplanationConfig as a proto message.""" + return gca_model_monitoring.ModelMonitoringObjectiveConfig.ExplanationConfig( + enable_feature_attributes=self.enable_feature_attributes + ) + + +class _ObjectiveConfig: + def __init__( + self, + skew_detection_config: Optional[ + "gca_model_monitoring._SkewDetectionConfig" + ] = None, + drift_detection_config: Optional[ + "gca_model_monitoring._DriftDetectionConfig" + ] = None, + explanation_config: Optional["gca_model_monitoring._ExplanationConfig"] = None, + ): + """Base class for ObjectiveConfig. + Args: + skew_detection_config (_SkewDetectionConfig): + Optional. An instance of _SkewDetectionConfig. + drift_detection_config (_DriftDetectionConfig): + Optional. An instance of _DriftDetectionConfig. + explanation_config (_ExplanationConfig): + Optional. 
An instance of _ExplanationConfig. + """ + self.skew_detection_config = skew_detection_config + self.drift_detection_config = drift_detection_config + self.explanation_config = explanation_config + + def as_proto(self): + """Returns _ObjectiveConfig as a proto message.""" + training_dataset = None + if self.skew_detection_config is not None: + training_dataset = self.skew_detection_config.training_dataset + return gca_model_monitoring.ModelMonitoringObjectiveConfig( + training_dataset=training_dataset, + training_prediction_skew_detection_config=self.skew_detection_config.as_proto() + if self.skew_detection_config is not None + else None, + prediction_drift_detection_config=self.drift_detection_config.as_proto() + if self.drift_detection_config is not None + else None, + explanation_config=self.explanation_config.as_proto() + if self.explanation_config is not None + else None, + ) + + +class SkewDetectionConfig(_SkewDetectionConfig): + """A class that configures skew detection for models deployed to an endpoint. + + Training-serving skew occurs when input data in production has a different + distribution than the data used during model training. Model performance + can deteriorate when production data deviates from training data. + """ + + def __init__( + self, + data_source: str, + target_field: str, + skew_thresholds: Optional[Dict[str, float]] = None, + attribute_skew_thresholds: Optional[Dict[str, float]] = None, + data_format: Optional[str] = None, + ): + """Initializer for SkewDetectionConfig. + + Args: + data_source (str): + Required. Path to training dataset. + + target_field (str): + Required. The target field name the model is to + predict. This field will be excluded when doing + Predict and (or) Explain for the training data. + + skew_thresholds (Dict[str, float]): + Optional. Key is the feature name and value is the + threshold. If a feature needs to be monitored + for skew, a value threshold must be configured + for that feature. 
The threshold here is against + feature distribution distance between the + training and prediction feature. + + attribute_skew_thresholds (Dict[str, float]): + Optional. Key is the feature name and value is the + threshold. Feature attributions indicate how much + each feature in your model contributed to the + predictions for each given instance. + + data_format (str): + Optional. Data format of the dataset, only applicable + if the input is from Google Cloud Storage. + The possible formats are: + + "tf-record" + The source file is a TFRecord file. + + "csv" + The source file is a CSV file. + + "jsonl" + The source file is a JSONL file. + + Raises: + ValueError for unsupported data formats. + """ + super().__init__( + data_source, + skew_thresholds, + target_field, + attribute_skew_thresholds, + data_format, + ) + + training_dataset = ( + gca_model_monitoring.ModelMonitoringObjectiveConfig.TrainingDataset( + target_field=target_field + ) + ) + if data_source.startswith("bq:/"): + training_dataset.bigquery_source = gca_io.BigQuerySource( + input_uri=data_source + ) + elif data_source.startswith("gs:/"): + training_dataset.gcs_source = gca_io.GcsSource(uris=[data_source]) + if data_format is not None and data_format not in [TF_RECORD, CSV, JSONL]: + raise ValueError( + "Unsupported value. `data_format` must be one of %s, %s, or %s" + % (TF_RECORD, CSV, JSONL) + ) + training_dataset.data_format = data_format + else: + training_dataset.dataset = data_source + self.training_dataset = training_dataset + + +class DriftDetectionConfig(_DriftDetectionConfig): + """A class that configures prediction drift detection for models deployed to an endpoint. + + Prediction drift occurs when feature data distribution changes noticeably + over time, and should be set when the original training data is unavailable. + If original training data is available, SkewDetectionConfig should + be set instead. 
+ """ + + def __init__( + self, + drift_thresholds: Optional[Dict[str, float]] = None, + attribute_drift_thresholds: Optional[Dict[str, float]] = None, + ): + """Initializer for DriftDetectionConfig. + + Args: + drift_thresholds (Dict[str, float]): + Optional. Key is the feature name and value is the + threshold. If a feature needs to be monitored + for drift, a value threshold must be configured + for that feature. The threshold here is against + feature distribution distance between different + time windws. + + attribute_drift_thresholds (Dict[str, float]): + Optional. Key is the feature name and value is the + threshold. The threshold here is against + attribution score distance between different + time windows. + """ + super().__init__(drift_thresholds, attribute_drift_thresholds) + + +class ExplanationConfig(_ExplanationConfig): + """A class that enables Vertex Explainable AI. + + Only applicable if the model has explanation_spec populated. By default, explanation config is disabled. Instantiating this class will enable the config. + """ + + def __init__(self): + """Initializer for ExplanationConfig.""" + super().__init__() + self.enable_feature_attributes = True + + +class ObjectiveConfig(_ObjectiveConfig): + """A class that captures skew detection, drift detection, and explanation configs.""" + + def __init__( + self, + skew_detection_config: Optional["SkewDetectionConfig"] = None, + drift_detection_config: Optional["DriftDetectionConfig"] = None, + explanation_config: Optional["ExplanationConfig"] = None, + ): + """Initializer for ObjectiveConfig. + Args: + skew_detection_config (SkewDetectionConfig): + Optional. An instance of SkewDetectionConfig. + drift_detection_config (DriftDetectionConfig): + Optional. An instance of DriftDetectionConfig. + explanation_config (ExplanationConfig): + Optional. An instance of ExplanationConfig. 
+ """ + super().__init__( + skew_detection_config, drift_detection_config, explanation_config + ) diff --git a/google/cloud/aiplatform/model_monitoring/sampling.py b/google/cloud/aiplatform/model_monitoring/sampling.py new file mode 100644 index 0000000000..977d6b5162 --- /dev/null +++ b/google/cloud/aiplatform/model_monitoring/sampling.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Optional + +from google.cloud.aiplatform_v1.types import ( + model_monitoring as gca_model_monitoring, +) + + +class RandomSampleConfig: + """A class that configures log sampling strategy.""" + + def __init__(self, sample_rate: Optional[float] = 1): + """Initializer for RandomSampleConfig. + + Args: + sample_rate (float): + Optional. Sets the sampling rate for model monitoring logs. + If not set, all logs are processed. 
+ """ + super().__init__() + self.sample_rate = sample_rate + + def as_proto(self): + """Returns RandomSampleConfig as a proto message.""" + return gca_model_monitoring.SamplingStrategy( + random_sample_config=gca_model_monitoring.SamplingStrategy.RandomSampleConfig( + sample_rate=self.sample_rate + ) + ) diff --git a/google/cloud/aiplatform/model_monitoring/schedule.py b/google/cloud/aiplatform/model_monitoring/schedule.py new file mode 100644 index 0000000000..737d6df077 --- /dev/null +++ b/google/cloud/aiplatform/model_monitoring/schedule.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from google.protobuf import duration_pb2 # type: ignore +from google.cloud.aiplatform_v1.types import ( + model_deployment_monitoring_job as gca_model_deployment_monitoring_job, +) + + +class ScheduleConfig: + """A class that configures model monitoring schedule.""" + + def __init__(self, monitor_interval: int): + """Initializer for ScheduleConfig. + + Args: + monitor_interval (int): + Sets the model monitoring job scheduling interval in hours. + This defines how often the monitoring jobs are triggered. 
+ """ + super().__init__() + self.monitor_interval = monitor_interval + + def as_proto(self): + """Returns ScheduleConfig as a proto message.""" + return ( + gca_model_deployment_monitoring_job.ModelDeploymentMonitoringScheduleConfig( + monitor_interval=duration_pb2.Duration( + seconds=self.monitor_interval * 3600 + ) + ) + ) diff --git a/tests/system/aiplatform/test_model_monitoring.py b/tests/system/aiplatform/test_model_monitoring.py new file mode 100644 index 0000000000..d066bdae62 --- /dev/null +++ b/tests/system/aiplatform/test_model_monitoring.py @@ -0,0 +1,392 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pytest +import time + +from google.cloud import aiplatform +from google.cloud.aiplatform import model_monitoring +from google.cloud.aiplatform.compat.types import job_state as gca_job_state +from google.api_core import exceptions as core_exceptions +from tests.system.aiplatform import e2e_base + +# constants used for testing +USER_EMAIL = "" +MODEL_NAME = "churn" +MODEL_NAME2 = "churn2" +IMAGE = "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-5:latest" +ENDPOINT = "us-central1-aiplatform.googleapis.com" +CHURN_MODEL_PATH = "gs://mco-mm/churn" +_DEFAULT_INPUT = { + "cnt_ad_reward": 0, + "cnt_challenge_a_friend": 0, + "cnt_completed_5_levels": 1, + "cnt_level_complete_quickplay": 3, + "cnt_level_end_quickplay": 5, + "cnt_level_reset_quickplay": 2, + "cnt_level_start_quickplay": 6, + "cnt_post_score": 34, + "cnt_spend_virtual_currency": 0, + "cnt_use_extra_steps": 0, + "cnt_user_engagement": 120, + "country": "Denmark", + "dayofweek": 3, + "julianday": 254, + "language": "da-dk", + "month": 9, + "operating_system": "IOS", + "user_pseudo_id": "104B0770BAE16E8B53DF330C95881893", +} +JOB_NAME = "churn" + +# Sampling rate (optional, default=.8) +LOG_SAMPLE_RATE = 0.8 + +# Monitoring Interval in hours +MONITOR_INTERVAL = 1 + +# URI to training dataset. +DATASET_BQ_URI = "bq://mco-mm.bqmlga4.train" + +# Prediction target column name in training dataset. +TARGET = "churned" + +# Skew and drift thresholds. 
+DEFAULT_THRESHOLD_VALUE = 0.001 +SKEW_DEFAULT_THRESHOLDS = { + "country": DEFAULT_THRESHOLD_VALUE, + "cnt_user_engagement": DEFAULT_THRESHOLD_VALUE, +} +SKEW_CUSTOM_THRESHOLDS = {"cnt_level_start_quickplay": 0.01} +DRIFT_DEFAULT_THRESHOLDS = { + "country": DEFAULT_THRESHOLD_VALUE, + "cnt_user_engagement": DEFAULT_THRESHOLD_VALUE, +} +DRIFT_CUSTOM_THRESHOLDS = {"cnt_level_start_quickplay": 0.01} +ATTRIB_SKEW_DEFAULT_THRESHOLDS = { + "country": DEFAULT_THRESHOLD_VALUE, + "cnt_user_engagement": DEFAULT_THRESHOLD_VALUE, +} +ATTRIB_SKEW_CUSTOM_THRESHOLDS = {"cnt_level_start_quickplay": 0.01} +ATTRIB_DRIFT_DEFAULT_THRESHOLDS = { + "country": DEFAULT_THRESHOLD_VALUE, + "cnt_user_engagement": DEFAULT_THRESHOLD_VALUE, +} +ATTRIB_DRIFT_CUSTOM_THRESHOLDS = {"cnt_level_start_quickplay": 0.01} + +skew_thresholds = SKEW_DEFAULT_THRESHOLDS.copy() +skew_thresholds.update(SKEW_CUSTOM_THRESHOLDS) +drift_thresholds = DRIFT_DEFAULT_THRESHOLDS.copy() +drift_thresholds.update(DRIFT_CUSTOM_THRESHOLDS) +attrib_skew_thresholds = ATTRIB_SKEW_DEFAULT_THRESHOLDS.copy() +attrib_skew_thresholds.update(ATTRIB_SKEW_CUSTOM_THRESHOLDS) +attrib_drift_thresholds = ATTRIB_DRIFT_DEFAULT_THRESHOLDS.copy() +attrib_drift_thresholds.update(ATTRIB_DRIFT_CUSTOM_THRESHOLDS) + +# global test constants +sampling_strategy = model_monitoring.RandomSampleConfig(sample_rate=LOG_SAMPLE_RATE) + +alert_config = model_monitoring.EmailAlertConfig( + user_emails=[USER_EMAIL], enable_logging=True +) + +schedule_config = model_monitoring.ScheduleConfig(monitor_interval=MONITOR_INTERVAL) + +skew_config = model_monitoring.SkewDetectionConfig( + data_source=DATASET_BQ_URI, + skew_thresholds=skew_thresholds, + attribute_skew_thresholds=attrib_skew_thresholds, + target_field=TARGET, +) + +drift_config = model_monitoring.DriftDetectionConfig( + drift_thresholds=drift_thresholds, + attribute_drift_thresholds=attrib_drift_thresholds, +) + +drift_config2 = model_monitoring.DriftDetectionConfig( + drift_thresholds=drift_thresholds, 
+ attribute_drift_thresholds=ATTRIB_DRIFT_DEFAULT_THRESHOLDS, +) + +objective_config = model_monitoring.ObjectiveConfig(skew_config, drift_config) + +objective_config2 = model_monitoring.ObjectiveConfig(skew_config, drift_config2) + + +@pytest.mark.usefixtures("tear_down_resources") +class TestModelDeploymentMonitoring(e2e_base.TestEndToEnd): + _temp_prefix = "temp_vertex_sdk_e2e_model_monitoring_test" + + def temp_endpoint(self, shared_state): + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + ) + + model = aiplatform.Model.upload( + display_name=MODEL_NAME, + artifact_uri=CHURN_MODEL_PATH, + serving_container_image_uri=IMAGE, + ) + shared_state["resources"] = [model] + endpoint = model.deploy(machine_type="n1-standard-2") + predict_response = endpoint.predict(instances=[_DEFAULT_INPUT]) + assert len(predict_response.predictions) == 1 + shared_state["resources"].append(endpoint) + return endpoint + + def temp_endpoint_with_two_models(self, shared_state): + aiplatform.init( + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + ) + + model1 = aiplatform.Model.upload( + display_name=MODEL_NAME, + artifact_uri=CHURN_MODEL_PATH, + serving_container_image_uri=IMAGE, + ) + + model2 = aiplatform.Model.upload( + display_name=MODEL_NAME2, + artifact_uri=CHURN_MODEL_PATH, + serving_container_image_uri=IMAGE, + ) + shared_state["resources"] = [model1, model2] + endpoint = aiplatform.Endpoint.create() + endpoint.deploy( + model=model1, machine_type="n1-standard-2", traffic_percentage=100 + ) + endpoint.deploy( + model=model2, machine_type="n1-standard-2", traffic_percentage=30 + ) + predict_response = endpoint.predict(instances=[_DEFAULT_INPUT]) + assert len(predict_response.predictions) == 1 + shared_state["resources"].append(endpoint) + return endpoint + + def test_mdm_one_model_one_valid_config(self, shared_state): + """ + Upload pre-trained churn model from local file and deploy it for prediction. 
+ """ + # test model monitoring configurations + temp_endpoint = self.temp_endpoint(shared_state) + + job = None + + job = aiplatform.ModelDeploymentMonitoringJob.create( + display_name=JOB_NAME, + logging_sampling_strategy=sampling_strategy, + schedule_config=schedule_config, + alert_config=alert_config, + objective_configs=objective_config, + create_request_timeout=3600, + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + endpoint=temp_endpoint, + predict_instance_schema_uri="", + analysis_instance_schema_uri="", + ) + assert job is not None + + gapic_job = job._gca_resource + assert ( + gapic_job.logging_sampling_strategy.random_sample_config.sample_rate + == LOG_SAMPLE_RATE + ) + assert gapic_job.display_name == JOB_NAME + assert ( + gapic_job.model_deployment_monitoring_schedule_config.monitor_interval.seconds + == MONITOR_INTERVAL * 3600 + ) + assert ( + gapic_job.model_monitoring_alert_config.email_alert_config.user_emails + == [USER_EMAIL] + ) + assert gapic_job.model_monitoring_alert_config.enable_logging + + gca_obj_config = gapic_job.model_deployment_monitoring_objective_configs[ + 0 + ].objective_config + assert gca_obj_config.training_dataset == skew_config.training_dataset + assert ( + gca_obj_config.training_prediction_skew_detection_config + == skew_config.as_proto() + ) + assert ( + gca_obj_config.prediction_drift_detection_config == drift_config.as_proto() + ) + + job_resource = job._gca_resource.name + + # test job update, pause, resume, and delete() + timeout = time.time() + 3600 + new_obj_config = model_monitoring.ObjectiveConfig(skew_config) + + while time.time() < timeout: + if job.state == gca_job_state.JobState.JOB_STATE_RUNNING: + job.update(objective_configs=new_obj_config) + assert str(job._gca_resource.prediction_drift_detection_config) == "" + break + time.sleep(5) + while time.time() < timeout: + if job.state == gca_job_state.JobState.JOB_STATE_RUNNING: + job.pause() + assert job.state == 
gca_job_state.JobState.JOB_STATE_PAUSED + break + time.sleep(5) + + while time.time() < timeout: + if job.state == gca_job_state.JobState.JOB_STATE_RUNNING: + break + if job.state == gca_job_state.JobState.JOB_STATE_PAUSED: + job.resume() + time.sleep(5) + job.delete() + with pytest.raises(core_exceptions.NotFound): + job.api_client.get_model_deployment_monitoring_job(name=job_resource) + + def test_mdm_two_models_two_valid_configs(self, shared_state): + temp_endpoint_with_two_models = self.temp_endpoint_with_two_models(shared_state) + [deployed_model1, deployed_model2] = list( + map(lambda x: x.id, temp_endpoint_with_two_models.list_models()) + ) + all_configs = { + deployed_model1: objective_config, + deployed_model2: objective_config2, + } + job = None + job = aiplatform.ModelDeploymentMonitoringJob.create( + display_name=JOB_NAME, + logging_sampling_strategy=sampling_strategy, + schedule_config=schedule_config, + alert_config=alert_config, + objective_configs=all_configs, + create_request_timeout=3600, + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + endpoint=temp_endpoint_with_two_models, + predict_instance_schema_uri="", + analysis_instance_schema_uri="", + ) + assert job is not None + + gapic_job = job._gca_resource + assert ( + gapic_job.logging_sampling_strategy.random_sample_config.sample_rate + == LOG_SAMPLE_RATE + ) + assert gapic_job.display_name == JOB_NAME + assert ( + gapic_job.model_deployment_monitoring_schedule_config.monitor_interval.seconds + == MONITOR_INTERVAL * 3600 + ) + assert ( + gapic_job.model_monitoring_alert_config.email_alert_config.user_emails + == [USER_EMAIL] + ) + assert gapic_job.model_monitoring_alert_config.enable_logging + + for config in gapic_job.model_deployment_monitoring_objective_configs: + gca_obj_config = config.objective_config + deployed_model_id = config.deployed_model_id + assert ( + gca_obj_config.training_dataset + == all_configs[deployed_model_id].skew_detection_config.training_dataset + ) + assert 
( + gca_obj_config.training_prediction_skew_detection_config + == all_configs[deployed_model_id].skew_detection_config.as_proto() + ) + assert ( + gca_obj_config.prediction_drift_detection_config + == all_configs[deployed_model_id].drift_detection_config.as_proto() + ) + + job.delete() + + def test_mdm_invalid_config_incorrect_model_id(self, shared_state): + temp_endpoint = self.temp_endpoint(shared_state) + with pytest.raises(ValueError) as e: + aiplatform.ModelDeploymentMonitoringJob.create( + display_name=JOB_NAME, + logging_sampling_strategy=sampling_strategy, + schedule_config=schedule_config, + alert_config=alert_config, + objective_configs=objective_config, + create_request_timeout=3600, + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + endpoint=temp_endpoint, + predict_instance_schema_uri="", + analysis_instance_schema_uri="", + deployed_model_ids=[""], + ) + assert "Invalid model ID" in str(e.value) + + def test_mdm_invalid_config_xai(self, shared_state): + temp_endpoint = self.temp_endpoint(shared_state) + with pytest.raises(RuntimeError) as e: + objective_config.explanation_config = model_monitoring.ExplanationConfig() + aiplatform.ModelDeploymentMonitoringJob.create( + display_name=JOB_NAME, + logging_sampling_strategy=sampling_strategy, + schedule_config=schedule_config, + alert_config=alert_config, + objective_configs=objective_config, + create_request_timeout=3600, + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + endpoint=temp_endpoint, + predict_instance_schema_uri="", + analysis_instance_schema_uri="", + ) + assert ( + "`explanation_config` should only be enabled if the model has `explanation_spec populated" + in str(e.value) + ) + + def test_mdm_two_models_invalid_configs_xai(self, shared_state): + temp_endpoint_with_two_models = self.temp_endpoint_with_two_models(shared_state) + [deployed_model1, deployed_model2] = list( + map(lambda x: x.id, temp_endpoint_with_two_models.list_models()) + ) + 
objective_config.explanation_config = model_monitoring.ExplanationConfig() + all_configs = { + deployed_model1: objective_config, + deployed_model2: objective_config2, + } + with pytest.raises(RuntimeError) as e: + objective_config.explanation_config = model_monitoring.ExplanationConfig() + aiplatform.ModelDeploymentMonitoringJob.create( + display_name=JOB_NAME, + logging_sampling_strategy=sampling_strategy, + schedule_config=schedule_config, + alert_config=alert_config, + objective_configs=all_configs, + create_request_timeout=3600, + project=e2e_base._PROJECT, + location=e2e_base._LOCATION, + endpoint=temp_endpoint_with_two_models, + predict_instance_schema_uri="", + analysis_instance_schema_uri="", + ) + assert ( + "`explanation_config` should only be enabled if the model has `explanation_spec populated" + in str(e.value) + ) diff --git a/tests/unit/aiplatform/test_model_monitoring.py b/tests/unit/aiplatform/test_model_monitoring.py new file mode 100644 index 0000000000..a29aa060db --- /dev/null +++ b/tests/unit/aiplatform/test_model_monitoring.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pytest + +from google.cloud.aiplatform import model_monitoring + +_TEST_THRESHOLD = 0.1 +_TEST_TARGET_FIELD = "target" +_TEST_BQ_DATASOURCE = "bq://test/data" +_TEST_GCS_DATASOURCE = "gs://test/data" +_TEST_OTHER_DATASOURCE = "" +_TEST_KEY = "key" +_TEST_EMAIL1 = "test1" +_TEST_EMAIL2 = "test2" +_TEST_VALID_DATA_FORMATS = ["tf-record", "csv", "jsonl"] +_TEST_SAMPLING_RATE = 0.8 +_TEST_MONITORING_INTERVAL = 1 + + +class TestModelMonitoringConfigs: + @pytest.mark.parametrize( + "data_source", + [_TEST_BQ_DATASOURCE, _TEST_GCS_DATASOURCE, _TEST_OTHER_DATASOURCE], + ) + @pytest.mark.parametrize("data_format", _TEST_VALID_DATA_FORMATS) + def test_valid_configs(self, data_source, data_format): + random_sample_config = model_monitoring.RandomSampleConfig( + sample_rate=_TEST_SAMPLING_RATE + ) + + schedule_config = model_monitoring.ScheduleConfig( + monitor_interval=_TEST_MONITORING_INTERVAL + ) + + alert_config = model_monitoring.EmailAlertConfig( + user_emails=[_TEST_EMAIL1, _TEST_EMAIL2] + ) + + prediction_drift_config = model_monitoring.DriftDetectionConfig( + drift_thresholds={_TEST_KEY: _TEST_THRESHOLD} + ) + + skew_config = model_monitoring.SkewDetectionConfig( + data_source=data_source, + skew_thresholds={_TEST_KEY: _TEST_THRESHOLD}, + target_field=_TEST_TARGET_FIELD, + attribute_skew_thresholds={_TEST_KEY: _TEST_THRESHOLD}, + data_format=data_format, + ) + + xai_config = model_monitoring.ExplanationConfig() + + objective_config = model_monitoring.ObjectiveConfig( + skew_detection_config=skew_config, + drift_detection_config=prediction_drift_config, + explanation_config=xai_config, + ) + + assert ( + objective_config.as_proto().training_dataset == skew_config.training_dataset + ) + assert ( + objective_config.as_proto().training_prediction_skew_detection_config + == skew_config.as_proto() + ) + assert ( + objective_config.as_proto().prediction_drift_detection_config + == prediction_drift_config.as_proto() + ) + assert 
objective_config.as_proto().explanation_config == xai_config.as_proto() + assert _TEST_EMAIL1 in alert_config.as_proto().email_alert_config.user_emails + assert _TEST_EMAIL2 in alert_config.as_proto().email_alert_config.user_emails + assert ( + random_sample_config.as_proto().random_sample_config.sample_rate + == _TEST_SAMPLING_RATE + ) + assert ( + schedule_config.as_proto().monitor_interval.seconds + == _TEST_MONITORING_INTERVAL * 3600 + ) + + @pytest.mark.parametrize("data_source", [_TEST_GCS_DATASOURCE]) + @pytest.mark.parametrize("data_format", ["other"]) + def test_invalid_data_format(self, data_source, data_format): + if data_format == "other": + with pytest.raises(ValueError) as e: + model_monitoring.SkewDetectionConfig( + data_source=data_source, + skew_thresholds={_TEST_KEY: _TEST_THRESHOLD}, + target_field=_TEST_TARGET_FIELD, + attribute_skew_thresholds={_TEST_KEY: _TEST_THRESHOLD}, + data_format=data_format, + ) + assert ( + "Unsupported value. `data_format` must be one of tf-record, csv, or jsonl" + in str(e.value) + )