Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into feature/batch-prediction/service-account
Browse files Browse the repository at this point in the history
  • Loading branch information
cymarechal-devoteam committed Jan 11, 2023
2 parents 553e04c + a6a792e commit 4f015f3
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 19 deletions.
2 changes: 2 additions & 0 deletions google/cloud/aiplatform/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ def _construct_sdk_resource_from_gapic(
location=endpoint.location,
credentials=credentials,
)
endpoint.authorized_session = None
endpoint.raw_predict_request_url = None

return endpoint

Expand Down
4 changes: 0 additions & 4 deletions google/cloud/aiplatform/utils/gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,6 @@ def generate_gcs_directory_for_pipeline_artifacts(
"""Gets or creates the GCS directory for Vertex Pipelines artifacts.
Args:
service_account: Optional. Google Cloud service account that will be used
to run the pipelines. If this function creates a new bucket it will give
permission to the specified service account to access the bucket.
If not provided, the Google Cloud Compute Engine service account will be used.
project: Optional. Google Cloud Project that contains the staging bucket.
location: Optional. Google Cloud location to use for the staging bucket.
Expand Down
6 changes: 3 additions & 3 deletions tests/system/aiplatform/test_batch_prediction.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@

class TestBatchPredictionJob(e2e_base.TestEndToEnd):
_temp_prefix = "temp_e2e_batch_prediction_test_"
aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
model = aiplatform.Model(_PERMANENT_CHURN_MODEL_ID)

def test_model_monitoring(self):
aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
model = aiplatform.Model(_PERMANENT_CHURN_MODEL_ID)
skew_detection_config = aiplatform.model_monitoring.SkewDetectionConfig(
data_source=_PERMANENT_CHURN_TRAINING_DATA,
target_field="churned",
Expand All @@ -62,7 +62,7 @@ def test_model_monitoring(self):

bpj = aiplatform.BatchPredictionJob.create(
job_display_name=self._make_display_name(key=_TEST_JOB_DISPLAY_NAME),
model_name=self.model,
model_name=model,
gcs_source=_PERMANENT_CHURN_TESTING_DATA,
gcs_destination_prefix=_PERMANENT_CHURN_GS_DEST,
machine_type=_TEST_MACHINE_TYPE,
Expand Down
45 changes: 36 additions & 9 deletions tests/system/aiplatform/test_model_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

# constants used for testing
USER_EMAIL = "[email protected]"
PERMANENT_CHURN_ENDPOINT_ID = "1843089351408353280"
PERMANENT_CHURN_MODEL_ID = "5295507484113371136"
CHURN_MODEL_PATH = "gs://mco-mm/churn"
DEFAULT_INPUT = {
"cnt_ad_reward": 0,
Expand Down Expand Up @@ -117,15 +117,26 @@
objective_config2 = model_monitoring.ObjectiveConfig(skew_config, drift_config2)


@pytest.mark.usefixtures("tear_down_resources")
class TestModelDeploymentMonitoring(e2e_base.TestEndToEnd):
_temp_prefix = "temp_e2e_model_monitoring_test_"
aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
endpoint = aiplatform.Endpoint(PERMANENT_CHURN_ENDPOINT_ID)

def test_mdm_two_models_one_valid_config(self):
def test_create_endpoint(self, shared_state):
    """Create a fresh endpoint, deploy the permanent churn model twice,
    and publish the endpoint through ``shared_state`` for later tests.
    """
    aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
    endpoint = aiplatform.Endpoint.create(self._make_display_name("endpoint"))
    self.endpoint = endpoint
    # Registered under "resources" so the tear_down_resources fixture
    # (and the sibling tests) can pick the endpoint back up.
    shared_state["resources"] = [endpoint]
    model = aiplatform.Model(PERMANENT_CHURN_MODEL_ID)
    self.model = model
    # Deploying the same model twice leaves two deployed models on the
    # endpoint, which the monitoring tests below rely on.
    endpoint.deploy(model)
    endpoint.deploy(model, traffic_percentage=50)

def test_mdm_two_models_one_valid_config(self, shared_state):
"""
Enable model monitoring on two existing models deployed to the same endpoint.
"""
assert len(shared_state["resources"]) == 1
self.endpoint = shared_state["resources"][0]
aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
# test model monitoring configurations
job = aiplatform.ModelDeploymentMonitoringJob.create(
display_name=self._make_display_name(key=JOB_NAME),
Expand Down Expand Up @@ -153,6 +164,7 @@ def test_mdm_two_models_one_valid_config(self):
== [USER_EMAIL]
)
assert gapic_job.model_monitoring_alert_config.enable_logging
assert len(gapic_job.model_deployment_monitoring_objective_configs) == 2

gca_obj_config = gapic_job.model_deployment_monitoring_objective_configs[
0
Expand Down Expand Up @@ -181,8 +193,11 @@ def test_mdm_two_models_one_valid_config(self):
with pytest.raises(core_exceptions.NotFound):
job.api_client.get_model_deployment_monitoring_job(name=job_resource)

def test_mdm_pause_and_update_config(self):
def test_mdm_pause_and_update_config(self, shared_state):
"""Test objective config updates for existing MDM job"""
assert len(shared_state["resources"]) == 1
self.endpoint = shared_state["resources"][0]
aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
job = aiplatform.ModelDeploymentMonitoringJob.create(
display_name=self._make_display_name(key=JOB_NAME),
logging_sampling_strategy=sampling_strategy,
Expand Down Expand Up @@ -245,7 +260,10 @@ def test_mdm_pause_and_update_config(self):
with pytest.raises(core_exceptions.NotFound):
job.state

def test_mdm_two_models_two_valid_configs(self):
def test_mdm_two_models_two_valid_configs(self, shared_state):
assert len(shared_state["resources"]) == 1
self.endpoint = shared_state["resources"][0]
aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
[deployed_model1, deployed_model2] = list(
map(lambda x: x.id, self.endpoint.list_models())
)
Expand Down Expand Up @@ -302,7 +320,10 @@ def test_mdm_two_models_two_valid_configs(self):

job.delete()

def test_mdm_invalid_config_incorrect_model_id(self):
def test_mdm_invalid_config_incorrect_model_id(self, shared_state):
assert len(shared_state["resources"]) == 1
self.endpoint = shared_state["resources"][0]
aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
with pytest.raises(ValueError) as e:
aiplatform.ModelDeploymentMonitoringJob.create(
display_name=self._make_display_name(key=JOB_NAME),
Expand All @@ -318,7 +339,10 @@ def test_mdm_invalid_config_incorrect_model_id(self):
)
assert "Invalid model ID" in str(e.value)

def test_mdm_invalid_config_xai(self):
def test_mdm_invalid_config_xai(self, shared_state):
assert len(shared_state["resources"]) == 1
self.endpoint = shared_state["resources"][0]
aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
with pytest.raises(RuntimeError) as e:
objective_config.explanation_config = model_monitoring.ExplanationConfig()
aiplatform.ModelDeploymentMonitoringJob.create(
Expand All @@ -337,7 +361,10 @@ def test_mdm_invalid_config_xai(self):
in str(e.value)
)

def test_mdm_two_models_invalid_configs_xai(self):
def test_mdm_two_models_invalid_configs_xai(self, shared_state):
assert len(shared_state["resources"]) == 1
self.endpoint = shared_state["resources"][0]
aiplatform.init(project=e2e_base._PROJECT, location=e2e_base._LOCATION)
[deployed_model1, deployed_model2] = list(
map(lambda x: x.id, self.endpoint.list_models())
)
Expand Down
72 changes: 69 additions & 3 deletions tests/unit/aiplatform/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from google.cloud.aiplatform import compat, utils
from google.cloud.aiplatform.compat.types import pipeline_failure_policy
from google.cloud.aiplatform.utils import (
gcs_utils,
pipeline_utils,
prediction_utils,
tensorboard_utils,
Expand All @@ -52,9 +53,10 @@
model_service_client_default = model_service_client_v1


GCS_BUCKET = "FAKE_BUCKET"
GCS_PREFIX = "FAKE/PREFIX"
FAKE_FILENAME = "FAKE_FILENAME"
GCS_BUCKET = "fake-bucket"
GCS_PREFIX = "fake/prefix"
FAKE_FILENAME = "fake-filename"
EXPECTED_TIME = datetime.datetime(2023, 1, 6, 8, 54, 41, 734495)


@pytest.fixture
Expand All @@ -78,6 +80,31 @@ def get_blobs(prefix):
yield mock_storage_client


@pytest.fixture()
def mock_datetime():
    """Patch ``datetime.datetime`` so ``now()`` returns the fixed EXPECTED_TIME."""
    with patch.object(datetime, "datetime", autospec=True) as patched_datetime:
        patched_datetime.now.return_value = EXPECTED_TIME
        yield patched_datetime


@pytest.fixture
def mock_storage_blob_upload_from_filename():
    """Stub out GCS blob uploads while reporting the target bucket as existing."""
    upload_patch = patch("google.cloud.storage.Blob.upload_from_filename")
    bucket_exists_patch = patch(
        "google.cloud.storage.Bucket.exists", return_value=True
    )
    with upload_patch as mock_upload, bucket_exists_patch:
        yield mock_upload


@pytest.fixture()
def mock_bucket_not_exist():
    """Stub ``Blob.from_string`` and make ``Bucket.exists`` report False,
    so bucket-creation code paths are exercised.
    """
    from_string_patch = patch("google.cloud.storage.Blob.from_string")
    bucket_exists_patch = patch(
        "google.cloud.storage.Bucket.exists", return_value=False
    )
    with from_string_patch as mock_from_string, bucket_exists_patch:
        yield mock_from_string


def test_invalid_region_raises_with_invalid_region():
    """validate_region must reject a region outside the supported list."""
    unsupported_region = "us-east5"
    with pytest.raises(ValueError):
        aiplatform.utils.validate_region(region=unsupported_region)
Expand Down Expand Up @@ -458,6 +485,45 @@ def test_timestamped_unique_name():
assert re.match(r"\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}-.{5}", name)


@pytest.mark.usefixtures("google_auth_mock")
class TestGcsUtils:
    """Unit tests for ``google.cloud.aiplatform.utils.gcs_utils`` helpers."""

    def test_upload_to_gcs(self, json_file, mock_storage_blob_upload_from_filename):
        """upload_to_gcs should trigger exactly one blob upload."""
        gcs_utils.upload_to_gcs(json_file, f"gs://{GCS_BUCKET}/{GCS_PREFIX}")
        # BUG FIX: `assert mock.called_once_with(...)` is always truthy — a
        # misspelled assertion attribute on a Mock just returns a child mock.
        # Use the real Mock assertion helper so the test can actually fail.
        mock_storage_blob_upload_from_filename.assert_called_once()

    def test_stage_local_data_in_gcs(
        self, json_file, mock_datetime, mock_storage_blob_upload_from_filename
    ):
        """stage_local_data_in_gcs should upload once and build a
        timestamped staging URI from the mocked clock."""
        timestamp = EXPECTED_TIME.isoformat(sep="-", timespec="milliseconds")
        staging_gcs_dir = f"gs://{GCS_BUCKET}/{GCS_PREFIX}"
        data_uri = gcs_utils.stage_local_data_in_gcs(json_file, staging_gcs_dir)
        # Same no-op `called_once_with` fixed here (see test_upload_to_gcs).
        mock_storage_blob_upload_from_filename.assert_called_once()
        assert (
            data_uri
            == f"{staging_gcs_dir}/vertex_ai_auto_staging/{timestamp}/test.json"
        )

    def test_generate_gcs_directory_for_pipeline_artifacts(self):
        """The artifacts directory is derived from project and location."""
        output = gcs_utils.generate_gcs_directory_for_pipeline_artifacts(
            "project", "us-central1"
        )
        assert output == "gs://project-vertex-pipelines-us-central1/output_artifacts/"

    def test_create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
        self, mock_bucket_not_exist, mock_storage_client
    ):
        """When the bucket does not exist, the helper creates it and still
        returns the artifacts directory URI."""
        output = (
            gcs_utils.create_gcs_bucket_for_pipeline_artifacts_if_it_does_not_exist(
                project="test-project", location="us-central1"
            )
        )
        assert mock_storage_client.called
        assert mock_bucket_not_exist.called
        assert (
            output == "gs://test-project-vertex-pipelines-us-central1/output_artifacts/"
        )


class TestPipelineUtils:
SAMPLE_JOB_SPEC = {
"pipelineSpec": {
Expand Down

0 comments on commit 4f015f3

Please sign in to comment.