From 751b1f4a159dc069b99ba140aef8feba8a46e062 Mon Sep 17 00:00:00 2001 From: zjgemi Date: Mon, 18 Mar 2024 18:15:37 +0800 Subject: [PATCH 1/2] fix: reuse global outputs of reused steps Signed-off-by: zjgemi --- src/dflow/workflow.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/src/dflow/workflow.py b/src/dflow/workflow.py index e94b5f5c..73c7dbfe 100644 --- a/src/dflow/workflow.py +++ b/src/dflow/workflow.py @@ -20,7 +20,7 @@ from .step import Step from .steps import Steps from .task import Task -from .utils import copy_s3, get_key, linktree, randstr, set_key +from .utils import copy_s3, force_link, get_key, linktree, randstr, set_key try: import urllib3 @@ -371,7 +371,7 @@ def wait(self, interval=1): while self.query_status() in ["Pending", "Running"]: time.sleep(interval) - def handle_reused_step(self, step): + def handle_reused_step(self, step, global_parameters, global_artifacts): outputs = {} if hasattr(step, "outputs"): if hasattr(step.outputs, "exitCode"): @@ -379,9 +379,14 @@ def handle_reused_step(self, step): if hasattr(step.outputs, "parameters"): outputs["parameters"] = [] for name, par in step.outputs.parameters.items(): - if not hasattr(step.outputs.parameters[name], - "save_as_artifact"): + if not hasattr(par, "save_as_artifact"): outputs["parameters"].append(par.recover()) + if hasattr(par, "globalName") and name != \ + "dflow_global": + global_par = par.recover() + global_par["name"] = par.globalName + global_par.pop("globalName", None) + global_parameters[par.globalName] = global_par if hasattr(step.outputs, "artifacts"): for name, art in step.outputs.artifacts.items(): group_key = step.get("inputs", {}).get( @@ -397,6 +402,11 @@ def handle_reused_step(self, step): else: self.handle_reused_artifact_with_copy( step, name, art) + if hasattr(art, "globalName"): + global_art = art.recover() + global_art["name"] = art.globalName + global_art.pop("globalName", None) + global_artifacts[art.globalName] = global_art outputs["artifacts"] = [ art.recover() for art in step.outputs.artifacts.values()] self.memoize_map["%s-%s" % (self.id, step.key)] = { @@ -470,7 +480,8 @@ def convert_to_argo(self, reuse_step=None): assert isinstance(self.context, (Context, Executor)) self = self.context.render(self) - status = None + global_parameters = {} + global_artifacts = {} if reuse_step is not None: self.reused_keys = [step.key for step in reuse_step if step.key is not None] @@ -484,7 +495,8 @@ def convert_to_argo(self, reuse_step=None): if step.key is None: continue key2id[step.key] = step.id - self.handle_reused_step(step) + self.handle_reused_step(step, global_parameters, + global_artifacts) for key, step in self.memoize_map.items(): data = {key: json.dumps(step)} @@ -506,8 +518,8 @@ def convert_to_argo(self, reuse_step=None): self.handle_template(self.entrypoint, memoize_prefix=self.id, memoize_configmap="dflow") if config["save_keys_in_global_outputs"]: - status = {"outputs": {"parameters": [ - {"name": key, "value": id} for key, id in key2id.items()]}} + for key, id in key2id.items(): + global_parameters[key] = {"name": key, "value": id} else: self.handle_template(self.entrypoint) @@ -573,7 +585,8 @@ def convert_to_argo(self, reuse_step=None): artifact_repository_ref=None if self.artifact_repo_key is None else V1alpha1ArtifactRepositoryRef(key=self.artifact_repo_key) ), - status=status) + status={"outputs": {"parameters": list(global_parameters.values()), + "artifacts": list(global_artifacts.values())}}) def deduplicate_templates(self): logger.debug("before deduplication: %s" % len(self.argo_templates)) From fbf5725e40aec464a3b5a8df9ee1e93b7097fb4b Mon Sep 17 00:00:00 2001 From: zjgemi Date: Mon, 18 Mar 2024 18:16:17 +0800 Subject: [PATCH 2/2] fix: format Signed-off-by: zjgemi --- src/dflow/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dflow/workflow.py b/src/dflow/workflow.py index 73c7dbfe..5550a461 100644 --- a/src/dflow/workflow.py +++ b/src/dflow/workflow.py @@ -20,7 +20,7 @@ from .step import Step from .steps import Steps from .task import Task -from .utils import copy_s3, force_link, get_key, linktree, randstr, set_key +from .utils import copy_s3, get_key, linktree, randstr, set_key try: import urllib3