diff --git a/metaflow/plugins/aip/argo_utils.py b/metaflow/plugins/aip/argo_utils.py index e5321da6d36..639cb2f10b0 100644 --- a/metaflow/plugins/aip/argo_utils.py +++ b/metaflow/plugins/aip/argo_utils.py @@ -123,7 +123,8 @@ def template_get_latest( branch_name: Branch name to match. flow_name: Flow name to match. filter_func: Custom filter function that is passed template, and should return boolean value - indicating if the template can be used. + indicating if the template can be used. When writing the filter function, please be mindful that + the labels or annotations may not be present in all the template. name_only: Whether to return only the name of the template or the full manifest. Defaults to True. Returns: diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index 2433efe17b2..bcb3c3a4fcd 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -46,13 +46,15 @@ def _retry_sleep(func: Callable, count=3, seconds=1, **kwargs): class FlowTriggeringFlow(FlowSpec): # Avoid infinite self trigger trigger_enabled: bool = Parameter("trigger_enabled", default=True) - triggered_by: str = Parameter(name="triggered_by", default=None) + + # Function as a test ID for checks and resource cleanups. + parent_workflow: str = Parameter(name="parent_workflow", default=None) @step def start(self): """Upload a downstream pipeline to be triggered""" - if self.triggered_by: - logger.info(f"This flow is triggered by run {self.triggered_by}") + if self.parent_workflow: + logger.info(f"This flow is triggered by run {self.parent_workflow}") if self.trigger_enabled: # Upload pipeline # for the case where generate_base64_uuid returns a string starting with '-' @@ -68,7 +70,7 @@ def start(self): ) for _ in range(3) ] - self.triggered_by_tag = "triggerred-by" + self.parent_tag = "parent-workflow" self.index_tag = "template-index" for template_index, template_name in enumerate( @@ -81,7 +83,7 @@ def start(self): path, extra_args=[ "--tag", - f"{self.triggered_by_tag}:{current.run_id}", + f"{self.parent_tag}:{current.run_id}", "--tag", f"{self.index}:{template_index}", ], @@ -98,29 +100,30 @@ def end(self): if self.trigger_enabled: argo_helper = ArgoHelper() + template_prefix = sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()) # ====== Test template filtering ====== # Test latest template is returned with prefix filter assert self.workflow_template_names[-1] == argo_helper.template_get_latest( - template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()), + template_prefix=template_prefix, flow_name=current.flow_name, - filter_func=lambda template: template["metadata"]["labels"][ - f"metaflow.org/tag_{self.triggered_by_tag}" - ] + filter_func=lambda template: template["metadata"]["labels"].get( + f"metaflow.org/tag_{self.parent_tag}" + ) == current.run_id, ) # Test filter func correctly filters assert self.workflow_template_names[1] == argo_helper.template_get_latest( - template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()), + template_prefix=template_prefix, flow_name=current.flow_name, filter_func=lambda template: ( - template["metadata"]["labels"][ - f"metaflow.org/tag_{self.triggered_by_tag}" - ] + template["metadata"]["labels"].get( + f"metaflow.org/tag_{self.parent_tag}" + ) == current.run_id - and template["metadata"]["labels"][ + and template["metadata"]["labels"].get( f"metaflow.org/tag_{self.index_tag}" - ] + ) == str(1) ), ) @@ -131,7 +134,7 @@ def end(self): template_name=self.workflow_template_names[0], parameters={ "trigger_enabled": False, - "triggered_by": current.run_id, + "parent_workflow": current.run_id, }, ) logger.info(f"{run_id=}, {run_uid=}") @@ -155,10 +158,10 @@ def end(self): logger.info(f"Run Status of {run_id}: {status=}") metaflow_run_id: str = get_metaflow_run_id(run_uid) - logger.info(f"Test triggered_by is passed correctly") + logger.info(f"Test parent_workflow flow is passed correctly") metaflow_path = f"{current.flow_name}/{metaflow_run_id}/start" - _retry_sleep(self.assert_task_triggered_by, metaflow_path=metaflow_path) + _retry_sleep(self.assert_parent_workflow, metaflow_path=metaflow_path) # ====== Clean up test templates ====== for template_name in self.workflow_template_names: @@ -168,11 +171,12 @@ def end(self): logger.info(f"{self.trigger_enabled=}") @staticmethod - def assert_task_triggered_by(metaflow_path: str): + def assert_parent_workflow(metaflow_path: str): logger.info(f"fetching start step {metaflow_path}") start_step = Step(metaflow_path) - logger.info(f"assert {start_step.task.data.triggered_by=} == {current.run_id=}") - assert start_step.task.data.triggered_by == current.run_id + parent_workflow = start_step.task.data.parent_workflow + logger.info(f"assert {parent_workflow=} == {current.run_id=}") + assert parent_workflow == current.run_id @staticmethod def comiple_workflow(template_name, path, extra_args=None):