Skip to content

Commit

Permalink
fix: fix bug when checking PipelineJob failure status
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 538299878
  • Loading branch information
connor-mccarthy authored and copybara-github committed Jun 6, 2023
1 parent 76465e2 commit a154859
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 8 deletions.
9 changes: 1 addition & 8 deletions google/cloud/aiplatform/pipeline_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,13 +628,6 @@ def done(self) -> bool:

return self.state in _PIPELINE_COMPLETE_STATES

def _has_failed(self) -> bool:
"""Return True if PipelineJob has Failed."""
if not self._gca_resource:
return False

return self.state in _PIPELINE_ERROR_STATES

def _get_context(self) -> context.Context:
"""Returns the PipelineRun Context for this PipelineJob in the MetadataStore.
Expand All @@ -655,7 +648,7 @@ def _get_context(self) -> context.Context:
time.sleep(1)

if not pipeline_run_context:
if self._has_failed:
if self.has_failed:
raise RuntimeError(
f"Cannot associate PipelineJob to Experiment: {self.gca_resource.error}"
)
Expand Down
55 changes: 55 additions & 0 deletions tests/unit/aiplatform/test_pipeline_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1567,6 +1567,61 @@ def test_pipeline_failure_raises(self, mock_load_yaml_and_json, sync):
if not sync:
job.wait()

@pytest.mark.usefixtures(
"mock_pipeline_service_create",
"mock_pipeline_service_get_with_fail",
"mock_pipeline_bucket_exists",
)
@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
)
def test_pipeline_job_has_failed_property(self, mock_load_yaml_and_json):
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_GCS_BUCKET_NAME,
location=_TEST_LOCATION,
credentials=_TEST_CREDENTIALS,
)

job = pipeline_jobs.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
template_path=_TEST_TEMPLATE_PATH,
job_id=_TEST_PIPELINE_JOB_ID,
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
enable_caching=True,
)

job.submit(
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
)

assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING
assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING
assert job.has_failed

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
)
def test_pipeline_job_has_failed_property_with_no_submit(
self, mock_load_yaml_and_json
):
job = pipeline_jobs.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
template_path=_TEST_TEMPLATE_PATH,
job_id=_TEST_PIPELINE_JOB_ID,
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
enable_caching=True,
)

with pytest.raises(
RuntimeError,
match=r"PipelineJob resource has not been created\.",
):
assert job.has_failed

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
Expand Down

0 comments on commit a154859

Please sign in to comment.