Skip to content

Commit

Permalink
Fix intergration test
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudw committed Aug 19, 2024
1 parent e9b4130 commit b3c3766
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 22 deletions.
3 changes: 2 additions & 1 deletion metaflow/plugins/aip/argo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ def template_get_latest(
branch_name: Branch name to match.
flow_name: Flow name to match.
filter_func: Custom filter function that is passed template, and should return boolean value
indicating if the template can be used.
indicating if the template can be used. When writing the filter function, please be mindful that
the labels or annotations may not be present in all the template.
name_only: Whether to return only the name of the template or the full manifest. Defaults to True.
Returns:
Expand Down
46 changes: 25 additions & 21 deletions metaflow/plugins/aip/tests/flows/flow_triggering_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ def _retry_sleep(func: Callable, count=3, seconds=1, **kwargs):
class FlowTriggeringFlow(FlowSpec):
# Avoid infinite self trigger
trigger_enabled: bool = Parameter("trigger_enabled", default=True)
triggered_by: str = Parameter(name="triggered_by", default=None)

# Function as a test ID for checks and resource cleanups.
parent_workflow: str = Parameter(name="parent_workflow", default=None)

@step
def start(self):
"""Upload a downstream pipeline to be triggered"""
if self.triggered_by:
logger.info(f"This flow is triggered by run {self.triggered_by}")
if self.parent_workflow:
logger.info(f"This flow is triggered by run {self.parent_workflow}")

if self.trigger_enabled: # Upload pipeline
# for the case where generate_base64_uuid returns a string starting with '-'
Expand All @@ -68,7 +70,7 @@ def start(self):
)
for _ in range(3)
]
self.triggered_by_tag = "triggerred-by"
self.parent_tag = "parent-workflow"
self.index_tag = "template-index"

for template_index, template_name in enumerate(
Expand All @@ -81,7 +83,7 @@ def start(self):
path,
extra_args=[
"--tag",
f"{self.triggered_by_tag}:{current.run_id}",
f"{self.parent_tag}:{current.run_id}",
"--tag",
f"{self.index}:{template_index}",
],
Expand All @@ -98,29 +100,30 @@ def end(self):
if self.trigger_enabled:
argo_helper = ArgoHelper()

template_prefix = sanitize_k8s_name(TEST_TEMPLATE_NAME.lower())
# ====== Test template filtering ======
# Test latest template is returned with prefix filter
assert self.workflow_template_names[-1] == argo_helper.template_get_latest(
template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()),
template_prefix=template_prefix,
flow_name=current.flow_name,
filter_func=lambda template: template["metadata"]["labels"][
f"metaflow.org/tag_{self.triggered_by_tag}"
]
filter_func=lambda template: template["metadata"]["labels"].get(
f"metaflow.org/tag_{self.parent_tag}"
)
== current.run_id,
)

# Test filter func correctly filters
assert self.workflow_template_names[1] == argo_helper.template_get_latest(
template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()),
template_prefix=template_prefix,
flow_name=current.flow_name,
filter_func=lambda template: (
template["metadata"]["labels"][
f"metaflow.org/tag_{self.triggered_by_tag}"
]
template["metadata"]["labels"].get(
f"metaflow.org/tag_{self.parent_tag}"
)
== current.run_id
and template["metadata"]["labels"][
and template["metadata"]["labels"].get(
f"metaflow.org/tag_{self.index_tag}"
]
)
== str(1)
),
)
Expand All @@ -131,7 +134,7 @@ def end(self):
template_name=self.workflow_template_names[0],
parameters={
"trigger_enabled": False,
"triggered_by": current.run_id,
"parent_workflow": current.run_id,
},
)
logger.info(f"{run_id=}, {run_uid=}")
Expand All @@ -155,10 +158,10 @@ def end(self):
logger.info(f"Run Status of {run_id}: {status=}")

metaflow_run_id: str = get_metaflow_run_id(run_uid)
logger.info(f"Test triggered_by is passed correctly")
logger.info(f"Test parent_workflow flow is passed correctly")
metaflow_path = f"{current.flow_name}/{metaflow_run_id}/start"

_retry_sleep(self.assert_task_triggered_by, metaflow_path=metaflow_path)
_retry_sleep(self.assert_parent_workflow, metaflow_path=metaflow_path)

# ====== Clean up test templates ======
for template_name in self.workflow_template_names:
Expand All @@ -168,11 +171,12 @@ def end(self):
logger.info(f"{self.trigger_enabled=}")

@staticmethod
def assert_task_triggered_by(metaflow_path: str):
def assert_parent_workflow(metaflow_path: str):
logger.info(f"fetching start step {metaflow_path}")
start_step = Step(metaflow_path)
logger.info(f"assert {start_step.task.data.triggered_by=} == {current.run_id=}")
assert start_step.task.data.triggered_by == current.run_id
parent_workflow = start_step.task.data.parent_workflow
logger.info(f"assert {parent_workflow=} == {current.run_id=}")
assert parent_workflow == current.run_id

@staticmethod
def comiple_workflow(template_name, path, extra_args=None):
Expand Down

0 comments on commit b3c3766

Please sign in to comment.