feat: add get_associated_experiment method to pipeline_jobs (#1476)
* feat: add get_associated_experiment method to pipeline_jobs

* updates from reviewer feedback

* clean up system test

* re-add check for experiment schema title
sararob authored Jul 7, 2022
1 parent 23a8a27 commit e9f2c3c
Showing 3 changed files with 237 additions and 0 deletions.
42 changes: 42 additions & 0 deletions google/cloud/aiplatform/pipeline_jobs.py
@@ -22,6 +22,7 @@
from typing import Any, Dict, List, Optional, Union

from google.auth import credentials as auth_credentials
from google.cloud import aiplatform
from google.cloud.aiplatform import base
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import utils
@@ -770,3 +771,44 @@ def clone(
        )

        return cloned

    def get_associated_experiment(self) -> Optional["aiplatform.Experiment"]:
        """Gets the aiplatform.Experiment associated with this PipelineJob,
        or None if this PipelineJob is not associated with an experiment.

        Returns:
            An aiplatform.Experiment resource, or None if this PipelineJob is
            not associated with an experiment.
        """

        pipeline_parent_contexts = (
            self._gca_resource.job_detail.pipeline_run_context.parent_contexts
        )

        # Keep only the parent contexts that are not the pipeline's own context.
        pipeline_experiment_resources = [
            context._Context(resource_name=c)._gca_resource
            for c in pipeline_parent_contexts
            if c != self._gca_resource.job_detail.pipeline_context.name
        ]

        pipeline_experiment_resource_names = []

        for c in pipeline_experiment_resources:
            if c.schema_title == metadata_constants.SYSTEM_EXPERIMENT:
                pipeline_experiment_resource_names.append(c.name)

        if len(pipeline_experiment_resource_names) > 1:
            _LOGGER.warning(
                f"More than one Experiment is associated with this pipeline. "
                f"The following experiments were found: {', '.join(pipeline_experiment_resource_names)}\n"
                f"Returning only the following experiment: {pipeline_experiment_resource_names[0]}"
            )

        if len(pipeline_experiment_resource_names) >= 1:
            return experiment_resources.Experiment(
                pipeline_experiment_resource_names[0],
                project=self.project,
                location=self.location,
                credentials=self.credentials,
            )
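
For context, a minimal usage sketch of the new method. The display name, template path, and experiment name below are placeholders (not part of this commit); the `experiment` argument to `submit()` mirrors its use in the tests further down.

from google.cloud import aiplatform

# Placeholder project/location values for illustration only.
aiplatform.init(project="my-project", location="us-central1")

job = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline.json",
)

# Submitting with an experiment links the pipeline run to that experiment.
job.submit(experiment="my-experiment")
job.wait()

experiment = job.get_associated_experiment()
if experiment is None:
    print("This pipeline run is not associated with an experiment.")
else:
    print(f"Associated experiment: {experiment.resource_name}")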
4 changes: 4 additions & 0 deletions tests/system/aiplatform/test_experiments.py
@@ -298,6 +298,10 @@ def pipeline(learning_rate: float, dropout_rate: float):

        job.wait()

        test_experiment = job.get_associated_experiment()

        assert test_experiment.name == self._experiment_name

    def test_get_experiments_df(self):
        aiplatform.init(
            project=e2e_base._PROJECT,
191 changes: 191 additions & 0 deletions tests/unit/aiplatform/test_pipeline_jobs.py
@@ -29,6 +29,10 @@
from google.cloud import aiplatform
from google.cloud.aiplatform import base
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform_v1 import Context as GapicContext
from google.cloud.aiplatform_v1 import MetadataStore as GapicMetadataStore
from google.cloud.aiplatform.metadata import constants
from google.cloud.aiplatform_v1 import MetadataServiceClient
from google.cloud.aiplatform import pipeline_jobs
from google.cloud.aiplatform.compat.types import pipeline_failure_policy
from google.cloud import storage
@@ -188,6 +192,22 @@
)
_TEST_PIPELINE_CREATE_TIME = datetime.now()

# experiments
_TEST_EXPERIMENT = "test-experiment"

_TEST_METADATASTORE = (
    f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}/metadataStores/default"
)
_TEST_CONTEXT_ID = _TEST_EXPERIMENT
_TEST_CONTEXT_NAME = f"{_TEST_METADATASTORE}/contexts/{_TEST_CONTEXT_ID}"

_EXPERIMENT_MOCK = GapicContext(
    name=_TEST_CONTEXT_NAME,
    schema_title=constants.SYSTEM_EXPERIMENT,
    schema_version=constants.SCHEMA_VERSIONS[constants.SYSTEM_EXPERIMENT],
    metadata={**constants.EXPERIMENT_METADATA},
)


@pytest.fixture
def mock_pipeline_service_create():
@@ -303,6 +323,90 @@ def mock_request_urlopen(job_spec):
    yield mock_urlopen


# experiment mocks
@pytest.fixture
def get_metadata_store_mock():
    with patch.object(
        MetadataServiceClient, "get_metadata_store"
    ) as get_metadata_store_mock:
        get_metadata_store_mock.return_value = GapicMetadataStore(
            name=_TEST_METADATASTORE,
        )
        yield get_metadata_store_mock


@pytest.fixture
def get_experiment_mock():
    with patch.object(MetadataServiceClient, "get_context") as get_context_mock:
        get_context_mock.return_value = _EXPERIMENT_MOCK
        yield get_context_mock


@pytest.fixture
def add_context_children_mock():
    with patch.object(
        MetadataServiceClient, "add_context_children"
    ) as add_context_children_mock:
        yield add_context_children_mock


@pytest.fixture
def list_contexts_mock():
    with patch.object(MetadataServiceClient, "list_contexts") as list_contexts_mock:
        list_contexts_mock.return_value = [_EXPERIMENT_MOCK]
        yield list_contexts_mock


@pytest.fixture
def create_experiment_run_context_mock():
    with patch.object(MetadataServiceClient, "create_context") as create_context_mock:
        create_context_mock.side_effect = [_EXPERIMENT_MOCK]
        yield create_context_mock


def make_pipeline_job_with_experiment(state):
    return gca_pipeline_job.PipelineJob(
        name=_TEST_PIPELINE_JOB_NAME,
        state=state,
        create_time=_TEST_PIPELINE_CREATE_TIME,
        service_account=_TEST_SERVICE_ACCOUNT,
        network=_TEST_NETWORK,
        job_detail=gca_pipeline_job.PipelineJobDetail(
            pipeline_run_context=gca_context.Context(
                name=_TEST_PIPELINE_JOB_NAME,
                parent_contexts=[_TEST_CONTEXT_NAME],
            ),
        ),
    )


@pytest.fixture
def mock_create_pipeline_job_with_experiment():
    with mock.patch.object(
        pipeline_service_client.PipelineServiceClient, "create_pipeline_job"
    ) as mock_pipeline_with_experiment:
        mock_pipeline_with_experiment.return_value = make_pipeline_job_with_experiment(
            gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
        )
        yield mock_pipeline_with_experiment


@pytest.fixture
def mock_get_pipeline_job_with_experiment():
    with mock.patch.object(
        pipeline_service_client.PipelineServiceClient, "get_pipeline_job"
    ) as mock_pipeline_with_experiment:
        mock_pipeline_with_experiment.side_effect = [
            make_pipeline_job_with_experiment(
                gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING
            ),
            make_pipeline_job_with_experiment(
                gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
            ),
        ]
        yield mock_pipeline_with_experiment


@pytest.mark.usefixtures("google_auth_mock")
class TestPipelineJob:
    def setup_method(self):
@@ -1384,3 +1488,90 @@ def test_clone_pipeline_job_with_all_args(
        assert cloned._gca_resource == make_pipeline_job(
            gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
        )

    @pytest.mark.parametrize(
        "job_spec",
        [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
    )
    def test_get_associated_experiment_from_pipeline_returns_none_without_experiment(
        self,
        mock_pipeline_service_create,
        mock_pipeline_service_get,
        job_spec,
        mock_load_yaml_and_json,
    ):
        aiplatform.init(
            project=_TEST_PROJECT,
            staging_bucket=_TEST_GCS_BUCKET_NAME,
            location=_TEST_LOCATION,
            credentials=_TEST_CREDENTIALS,
        )

        job = pipeline_jobs.PipelineJob(
            display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
            template_path=_TEST_TEMPLATE_PATH,
            job_id=_TEST_PIPELINE_JOB_ID,
            parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
            enable_caching=True,
        )

        job.submit(
            service_account=_TEST_SERVICE_ACCOUNT,
            network=_TEST_NETWORK,
            create_request_timeout=None,
        )

        job.wait()

        test_experiment = job.get_associated_experiment()

        assert test_experiment is None

    @pytest.mark.parametrize(
        "job_spec",
        [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
    )
    def test_get_associated_experiment_from_pipeline_returns_experiment(
        self,
        job_spec,
        mock_load_yaml_and_json,
        add_context_children_mock,
        get_experiment_mock,
        create_experiment_run_context_mock,
        get_metadata_store_mock,
        mock_create_pipeline_job_with_experiment,
        mock_get_pipeline_job_with_experiment,
    ):
        aiplatform.init(
            project=_TEST_PROJECT,
            staging_bucket=_TEST_GCS_BUCKET_NAME,
            location=_TEST_LOCATION,
            credentials=_TEST_CREDENTIALS,
        )

        test_experiment = aiplatform.Experiment(_TEST_EXPERIMENT)

        job = pipeline_jobs.PipelineJob(
            display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
            template_path=_TEST_TEMPLATE_PATH,
            job_id=_TEST_PIPELINE_JOB_ID,
            parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
            enable_caching=True,
        )

        assert get_experiment_mock.call_count == 1

        job.submit(
            service_account=_TEST_SERVICE_ACCOUNT,
            network=_TEST_NETWORK,
            create_request_timeout=None,
            experiment=test_experiment,
        )

        job.wait()

        associated_experiment = job.get_associated_experiment()

        assert associated_experiment.resource_name == _TEST_CONTEXT_NAME

        assert add_context_children_mock.call_count == 1
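
A quick way to run just the new unit tests locally, assuming a checkout of the repository with test dependencies installed (invoking pytest directly from the shell works equally well):

import pytest

# Select only the get_associated_experiment tests in this file.
pytest.main(
    [
        "-k",
        "get_associated_experiment",
        "tests/unit/aiplatform/test_pipeline_jobs.py",
    ]
)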
