fix: Fix bug where scheduled pipeline jobs were not running.
PiperOrigin-RevId: 542024449
vertex-sdk-bot authored and copybara-github committed Jun 20, 2023
1 parent a7d92e5 commit 4e7d11a
Showing 3 changed files with 32 additions and 13 deletions.
@@ -104,7 +104,7 @@ def __init__(
             "parent": self._parent,
             "pipeline_job": {
                 "runtime_config": runtime_config,
-                "pipeline_spec": {"fields": pipeline_job.pipeline_spec},
+                "pipeline_spec": pipeline_job.pipeline_spec,
             },
         }
         pipeline_job_schedule_args = {
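Why this one-line change is the fix: pipeline_spec on the PipelineJob proto is a google.protobuf.Struct, so the SDK should pass the spec dict as-is and let the client library coerce it into a Struct. Wrapping the dict as {"fields": ...} buries the entire spec under a single literal "fields" key, which is why the scheduled jobs never ran correctly. Below is a minimal sketch of the difference, using json_format.ParseDict for illustration rather than the SDK's own request path; the spec dict is made up.

from google.protobuf import json_format, struct_pb2

# Hypothetical stand-in for pipeline_job.pipeline_spec.
pipeline_spec = {"schemaVersion": "2.1.0", "pipelineInfo": {"name": "training-pipeline"}}

# Passing the dict directly: the Struct's top-level keys are the spec's keys.
correct = json_format.ParseDict(pipeline_spec, struct_pb2.Struct())
print(sorted(correct.keys()))  # ['pipelineInfo', 'schemaVersion']

# Wrapping it in {"fields": ...}: everything ends up nested under a literal "fields" key.
broken = json_format.ParseDict({"fields": pipeline_spec}, struct_pb2.Struct())
print(sorted(broken.keys()))  # ['fields']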
21 changes: 16 additions & 5 deletions tests/system/aiplatform/test_pipeline_job_schedule.py
@@ -16,8 +16,12 @@
 #
 
 from google.cloud import aiplatform
-from google.cloud.aiplatform.compat.types import schedule_v1beta1 as gca_schedule
-from google.cloud.aiplatform.preview.pipelinejobschedule import pipeline_job_schedules
+from google.cloud.aiplatform.compat.types import (
+    schedule_v1beta1 as gca_schedule,
+)
+from google.cloud.aiplatform.preview.pipelinejobschedule import (
+    pipeline_job_schedules,
+)
 from tests.system.aiplatform import e2e_base
 
 from kfp import components
@@ -61,7 +65,7 @@ def training_pipeline(number_of_epochs: int = 2):
         compiler.Compiler().compile(
             pipeline_func=training_pipeline,
             package_path=ir_file,
-            pipeline_name="training-pipeline",
+            pipeline_name="system-test-training-pipeline",
         )
         job = aiplatform.PipelineJob(
             template_path=ir_file,
@@ -72,21 +76,28 @@ def training_pipeline(number_of_epochs: int = 2):
             pipeline_job=job, display_name="pipeline_job_schedule_display_name"
         )
 
-        pipeline_job_schedule.create(cron_expression="*/2 * * * *", max_run_count=2)
+        max_run_count = 2
+        pipeline_job_schedule.create(
+            cron_expression="*/5 * * * *",
+            max_run_count=max_run_count,
+            max_concurrent_run_count=2,
+        )
 
         shared_state.setdefault("resources", []).append(pipeline_job_schedule)
 
         pipeline_job_schedule.pause()
         assert pipeline_job_schedule.state == gca_schedule.Schedule.State.PAUSED
 
-        pipeline_job_schedule.resume()
+        pipeline_job_schedule.resume(catch_up=True)
         assert pipeline_job_schedule.state == gca_schedule.Schedule.State.ACTIVE
 
+        pipeline_job_schedule.wait()
 
         list_jobs_with_read_mask = pipeline_job_schedule.list_jobs(
             enable_simple_view=True
         )
+        assert len(list_jobs_with_read_mask) == max_run_count
 
         list_jobs_without_read_mask = pipeline_job_schedule.list_jobs()
 
         # enable_simple_view=True should apply the `read_mask` filter to limit PipelineJob fields returned
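For readers skimming the diff, the updated system test now walks the full schedule lifecycle. Below is a condensed sketch of that flow, assuming aiplatform.init(...) has been called and ir_file points to a compiled pipeline template, as in the test; the comments reflect my reading of the parameters rather than official documentation, and the display name is illustrative.

from google.cloud import aiplatform
from google.cloud.aiplatform.preview.pipelinejobschedule import pipeline_job_schedules

job = aiplatform.PipelineJob(
    display_name="training-pipeline",  # illustrative display name
    template_path=ir_file,
)
schedule = pipeline_job_schedules.PipelineJobSchedule(
    pipeline_job=job, display_name="pipeline_job_schedule_display_name"
)
schedule.create(
    cron_expression="*/5 * * * *",  # fire every 5 minutes
    max_run_count=2,                # stop after two scheduled runs
    max_concurrent_run_count=2,
)
schedule.pause()                    # state -> PAUSED
schedule.resume(catch_up=True)      # state -> ACTIVE; catch_up backfills runs missed while paused
schedule.wait()                     # block until the schedule finishes (max_run_count reached)
jobs = schedule.list_jobs(enable_simple_view=True)  # read_mask trims the PipelineJob fields returned
assert len(jobs) == 2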
22 changes: 15 additions & 7 deletions tests/unit/aiplatform/test_pipeline_job_schedules.py
@@ -18,6 +18,7 @@
 from datetime import datetime
 from importlib import reload
 import json
+from typing import Any, Dict
 from unittest import mock
 from unittest.mock import patch
 from urllib import request
@@ -48,6 +49,7 @@
 import pytest
 import yaml
 
+from google.protobuf import struct_pb2
 from google.protobuf import json_format
 
 _TEST_PROJECT = "test-project"
@@ -405,6 +407,12 @@ def mock_request_urlopen(job_spec):
     yield mock_urlopen
 
 
+def dict_to_struct(d: Dict[str, Any]) -> struct_pb2.Struct:
+    s = struct_pb2.Struct()
+    s.update(d)
+    return s
+
+
 @pytest.mark.usefixtures("google_auth_mock")
 class TestPipelineJobSchedule:
     def setup_method(self):
@@ -481,7 +489,7 @@ def test_call_schedule_service_create(
             "parent": _TEST_PARENT,
             "pipeline_job": {
                 "runtime_config": runtime_config,
-                "pipeline_spec": {"fields": pipeline_spec},
+                "pipeline_spec": dict_to_struct(pipeline_spec),
                 "service_account": _TEST_SERVICE_ACCOUNT,
                 "network": _TEST_NETWORK,
             },
@@ -565,7 +573,7 @@ def test_call_schedule_service_create_with_different_timezone(
             "parent": _TEST_PARENT,
             "pipeline_job": {
                 "runtime_config": runtime_config,
-                "pipeline_spec": {"fields": pipeline_spec},
+                "pipeline_spec": dict_to_struct(pipeline_spec),
                 "service_account": _TEST_SERVICE_ACCOUNT,
                 "network": _TEST_NETWORK,
             },
@@ -647,7 +655,7 @@ def test_call_schedule_service_create_artifact_registry(
             "parent": _TEST_PARENT,
             "pipeline_job": {
                 "runtime_config": runtime_config,
-                "pipeline_spec": {"fields": pipeline_spec},
+                "pipeline_spec": dict_to_struct(pipeline_spec),
                 "service_account": _TEST_SERVICE_ACCOUNT,
                 "network": _TEST_NETWORK,
             },
@@ -729,7 +737,7 @@ def test_call_schedule_service_create_https(
             "parent": _TEST_PARENT,
             "pipeline_job": {
                 "runtime_config": runtime_config,
-                "pipeline_spec": {"fields": pipeline_spec},
+                "pipeline_spec": dict_to_struct(pipeline_spec),
                 "service_account": _TEST_SERVICE_ACCOUNT,
                 "network": _TEST_NETWORK,
             },
@@ -810,7 +818,7 @@ def test_call_schedule_service_create_with_timeout(
             "parent": _TEST_PARENT,
             "pipeline_job": {
                 "runtime_config": runtime_config,
-                "pipeline_spec": {"fields": pipeline_spec},
+                "pipeline_spec": dict_to_struct(pipeline_spec),
                 "service_account": _TEST_SERVICE_ACCOUNT,
                 "network": _TEST_NETWORK,
             },
@@ -890,7 +898,7 @@ def test_call_schedule_service_create_with_timeout_not_explicitly_set(
             "parent": _TEST_PARENT,
             "pipeline_job": {
                 "runtime_config": runtime_config,
-                "pipeline_spec": {"fields": pipeline_spec},
+                "pipeline_spec": dict_to_struct(pipeline_spec),
                 "service_account": _TEST_SERVICE_ACCOUNT,
                 "network": _TEST_NETWORK,
             },
@@ -958,7 +966,7 @@ def test_call_pipeline_job_create_schedule(
             "parent": _TEST_PARENT,
             "pipeline_job": {
                 "runtime_config": runtime_config,
-                "pipeline_spec": {"fields": pipeline_spec},
+                "pipeline_spec": dict_to_struct(pipeline_spec),
                 "service_account": _TEST_SERVICE_ACCOUNT,
                 "network": _TEST_NETWORK,
             },
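The dict_to_struct helper added above is what lets each expected-request assertion compare against a real Struct rather than the old {"fields": ...} dict. A quick illustrative check of its behaviour outside the test suite (the spec here is made up; for plain JSON-like dicts the helper should match json_format.ParseDict):

from typing import Any, Dict

from google.protobuf import json_format, struct_pb2


def dict_to_struct(d: Dict[str, Any]) -> struct_pb2.Struct:
    s = struct_pb2.Struct()
    s.update(d)  # Struct.update recursively converts dicts, lists, and scalars to Values
    return s


spec = {"schemaVersion": "2.1.0", "pipelineInfo": {"name": "training-pipeline"}}
assert dict_to_struct(spec) == json_format.ParseDict(spec, struct_pb2.Struct())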
