added test coverage for model monitoring config for two models
rosiezou committed Jun 28, 2022
1 parent 9755549 commit 4de17a6
Showing 1 changed file: tests/system/aiplatform/test_model_monitoring.py (160 additions, 81 deletions)
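
For context, the flow the new two-model test exercises looks roughly like the sketch below. It is an illustration, not the committed test: it reuses the module-level constants this commit introduces (JOB_NAME, MONITOR_INTERVAL, sampling_strategy, schedule_config, alert_config, objective_config) and assumes model1 and model2 are already-uploaded aiplatform.Model objects.

    from google.cloud import aiplatform

    # Deploy two models to one endpoint, splitting traffic between them.
    endpoint = aiplatform.Endpoint.create()
    endpoint.deploy(model=model1, machine_type="n1-standard-2", traffic_percentage=100)
    endpoint.deploy(model=model2, machine_type="n1-standard-2", traffic_percentage=30)

    # A single monitoring job covers every model deployed to the endpoint.
    job = aiplatform.ModelDeploymentMonitoringJob.create(
        display_name=JOB_NAME,
        logging_sampling_strategy=sampling_strategy,
        monitor_interval=MONITOR_INTERVAL,
        schedule_config=schedule_config,
        alert_config=alert_config,
        objective_configs=objective_config,
        endpoint=endpoint,
    )

    # The created job carries one objective config per deployed model, keyed by
    # deployed_model_id on the underlying GAPIC resource.
    for config in job._gca_resource.model_deployment_monitoring_objective_configs:
        print(config.deployed_model_id, config.objective_config.training_dataset)

    job.delete()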
@@ -15,7 +15,6 @@
# limitations under the License.
#

import random
import pytest
import time

@@ -27,6 +26,7 @@
# constants used for testing
USER_EMAIL = ""
MODEL_NAME = "churn"
MODEL_NAME2 = "churn2"
IMAGE = "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-5:latest"
ENDPOINT = "us-central1-aiplatform.googleapis.com"
CHURN_MODEL_PATH = "gs://mco-mm/churn"
@@ -63,8 +63,6 @@

# Prediction target column name in training dataset.
TARGET = "churned"
MDM_JOB_PREFIX = "modelDeploymentMonitoringJobs"
RESOURCE_ID = str(random.randint(10000000, 99999999))

# Skew and drift thresholds.
DEFAULT_THRESHOLD_VALUE = 0.001
@@ -98,61 +96,50 @@
attrib_drift_thresholds = ATTRIB_DRIFT_DEFAULT_THRESHOLDS.copy()
attrib_drift_thresholds.update(ATTRIB_DRIFT_CUSTOM_THRESHOLDS)

# Sampling distributions for categorical features...
DAYOFWEEK = {1: 1040, 2: 1223, 3: 1352, 4: 1217, 5: 1078, 6: 1011, 7: 1110}
LANGUAGE = {
"en-us": 4807,
"en-gb": 678,
"ja-jp": 419,
"en-au": 310,
"en-ca": 299,
"de-de": 147,
"en-in": 130,
"en": 127,
"fr-fr": 94,
"pt-br": 81,
"es-us": 65,
"zh-tw": 64,
"zh-hans-cn": 55,
"es-mx": 53,
"nl-nl": 37,
"fr-ca": 34,
"en-za": 29,
"vi-vn": 29,
"en-nz": 29,
"es-es": 25,
}
OS = {"IOS": 3980, "ANDROID": 3798, "null": 253}
MONTH = {6: 3125, 7: 1838, 8: 1276, 9: 1718, 10: 74}
COUNTRY = {
"United States": 4395,
"India": 486,
"Japan": 450,
"Canada": 354,
"Australia": 327,
"United Kingdom": 303,
"Germany": 144,
"Mexico": 102,
"France": 97,
"Brazil": 93,
"Taiwan": 72,
"China": 65,
"Saudi Arabia": 49,
"Pakistan": 48,
"Egypt": 46,
"Netherlands": 45,
"Vietnam": 42,
"Philippines": 39,
"South Africa": 38,
}
# global test constants
sampling_strategy = aiplatform.model_monitoring.RandomSampleConfig(
sample_rate=LOG_SAMPLE_RATE
)

alert_config = aiplatform.model_monitoring.EmailAlertConfig(
user_emails=[USER_EMAIL], enable_logging=True
)

schedule_config = aiplatform.model_monitoring.ScheduleConfig(
monitor_interval=MONITOR_INTERVAL
)

skew_config = aiplatform.model_monitoring.EndpointSkewDetectionConfig(
data_source=DATASET_BQ_URI,
skew_thresholds=skew_thresholds,
attribute_skew_thresholds=attrib_skew_thresholds,
target_field=TARGET,
)

drift_config = aiplatform.model_monitoring.EndpointDriftDetectionConfig(
drift_thresholds=drift_thresholds,
attribute_drift_thresholds=attrib_drift_thresholds,
)

drift_config2 = aiplatform.model_monitoring.EndpointDriftDetectionConfig(
drift_thresholds=drift_thresholds,
attribute_drift_thresholds=ATTRIB_DRIFT_DEFAULT_THRESHOLDS,
)

objective_config = aiplatform.model_monitoring.EndpointObjectiveConfig(
skew_config, drift_config
)

objective_config2 = aiplatform.model_monitoring.EndpointObjectiveConfig(
skew_config, drift_config2
)


@pytest.mark.usefixtures("tear_down_resources")
class TestModelDeploymentMonitoring(e2e_base.TestEndToEnd):
_temp_prefix = "temp_vertex_sdk_e2e_model_monitoring_test"

@pytest.fixture()
def temp_endpoint(self):
def temp_endpoint(self, shared_state):
aiplatform.init(
project=e2e_base._PROJECT,
location=e2e_base._LOCATION,
@@ -163,46 +150,51 @@ def temp_endpoint(self):
artifact_uri=CHURN_MODEL_PATH,
serving_container_image_uri=IMAGE,
)

shared_state["resources"] = [model]
endpoint = model.deploy(machine_type="n1-standard-2")
predict_response = endpoint.predict(instances=[_DEFAULT_INPUT])
assert len(predict_response.predictions) == 1
yield endpoint

def test_mdm_one_model_one_valid_config(self, temp_endpoint):
"""
Upload pre-trained churn model from local file and deploy it for prediction.
Then launch a model monitoring job and generate artificial traffic.
"""
# test model monitoring configurations
job = None
shared_state["resources"].append(endpoint)
return endpoint

sampling_strategy = aiplatform.model_monitoring.RandomSampleConfig(
sample_rate=LOG_SAMPLE_RATE
def temp_endpoint_with_two_models(self, shared_state):
aiplatform.init(
project=e2e_base._PROJECT,
location=e2e_base._LOCATION,
)

alert_config = aiplatform.model_monitoring.EmailAlertConfig(
user_emails=[USER_EMAIL], enable_logging=True
model1 = aiplatform.Model.upload(
display_name=MODEL_NAME,
artifact_uri=CHURN_MODEL_PATH,
serving_container_image_uri=IMAGE,
)

schedule_config = aiplatform.model_monitoring.ScheduleConfig(
monitor_interval=MONITOR_INTERVAL
model2 = aiplatform.Model.upload(
display_name=MODEL_NAME2,
artifact_uri=CHURN_MODEL_PATH,
serving_container_image_uri=IMAGE,
)

skew_config = aiplatform.model_monitoring.EndpointSkewDetectionConfig(
data_source=DATASET_BQ_URI,
skew_thresholds=skew_thresholds,
attribute_skew_thresholds=attrib_skew_thresholds,
target_field=TARGET,
shared_state["resources"] = [model1, model2]
endpoint = aiplatform.Endpoint.create()
endpoint.deploy(
model=model1, machine_type="n1-standard-2", traffic_percentage=100
)
drift_config = aiplatform.model_monitoring.EndpointDriftDetectionConfig(
drift_thresholds=drift_thresholds,
attribute_drift_thresholds=attrib_drift_thresholds,
endpoint.deploy(
model=model2, machine_type="n1-standard-2", traffic_percentage=30
)
predict_response = endpoint.predict(instances=[_DEFAULT_INPUT])
assert len(predict_response.predictions) == 1
shared_state["resources"].append(endpoint)
return endpoint

objective_config = aiplatform.model_monitoring.EndpointObjectiveConfig(
skew_config, drift_config
)
def test_mdm_one_model_one_valid_config(self, shared_state):
"""
Upload pre-trained churn model from local file and deploy it for prediction.
"""
# test model monitoring configurations
temp_endpoint = self.temp_endpoint(shared_state)

job = None

job = aiplatform.ModelDeploymentMonitoringJob.create(
display_name=JOB_NAME,
@@ -219,7 +211,34 @@ def test_mdm_one_model_one_valid_config(self, temp_endpoint):
analysis_instance_schema_uri="",
)
assert job is not None
assert job.display_name == JOB_NAME

gapic_job = job._gca_resource
assert (
gapic_job.logging_sampling_strategy.random_sample_config.sample_rate
== LOG_SAMPLE_RATE
)
assert gapic_job.display_name == JOB_NAME
assert (
gapic_job.model_deployment_monitoring_schedule_config.monitor_interval.seconds
== MONITOR_INTERVAL
)
assert (
gapic_job.model_monitoring_alert_config.email_alert_config.user_emails
== [USER_EMAIL]
)
assert gapic_job.model_monitoring_alert_config.enable_logging

gca_obj_config = gapic_job.model_deployment_monitoring_objective_configs[
0
].objective_config
assert gca_obj_config.training_dataset == skew_config.training_dataset
assert (
gca_obj_config.training_prediction_skew_detection_config
== skew_config.as_proto()
)
assert (
gca_obj_config.prediction_drift_detection_config == drift_config.as_proto()
)

job_resource = job._gca_resource.name

@@ -247,3 +266,63 @@ def test_mdm_one_model_one_valid_config(self, temp_endpoint):
job.delete()
with pytest.raises(core_exceptions.NotFound):
job.api_client.get_model_deployment_monitoring_job(name=job_resource)

def test_mdm_two_models_one_valid_config(self, shared_state):
temp_endpoint_with_two_models = self.temp_endpoint_with_two_models(shared_state)
[deployed_model1, deployed_model2] = list(
map(lambda x: x.id, temp_endpoint_with_two_models.list_models())
)
all_configs = {
deployed_model1: objective_config,
deployed_model2: objective_config2,
}
job = None
job = aiplatform.ModelDeploymentMonitoringJob.create(
display_name=JOB_NAME,
logging_sampling_strategy=sampling_strategy,
monitor_interval=MONITOR_INTERVAL,
schedule_config=schedule_config,
alert_config=alert_config,
objective_configs=objective_config,
timeout=3600,
project=e2e_base._PROJECT,
location=e2e_base._LOCATION,
endpoint=temp_endpoint_with_two_models,
predict_instance_schema_uri="",
analysis_instance_schema_uri="",
)
assert job is not None

gapic_job = job._gca_resource
assert (
gapic_job.logging_sampling_strategy.random_sample_config.sample_rate
== LOG_SAMPLE_RATE
)
assert gapic_job.display_name == JOB_NAME
assert (
gapic_job.model_deployment_monitoring_schedule_config.monitor_interval.seconds
== MONITOR_INTERVAL
)
assert (
gapic_job.model_monitoring_alert_config.email_alert_config.user_emails
== [USER_EMAIL]
)
assert gapic_job.model_monitoring_alert_config.enable_logging

for config in gapic_job.model_deployment_monitoring_objective_configs:
gca_obj_config = config.objective_config
deployed_model_id = config.deployed_model_id
assert (
gca_obj_config.training_dataset
== all_configs[deployed_model_id].skew_detection_config.training_dataset
)
assert (
gca_obj_config.training_prediction_skew_detection_config
== all_configs[deployed_model_id].skew_detection_config.as_proto()
)
assert (
gca_obj_config.prediction_drift_detection_config
== all_configs[deployed_model_id].drift_detection_config.as_proto()
)

job.delete()
