From a66ba11e55c1c81ab40769675bd91f4199cedd2d Mon Sep 17 00:00:00 2001 From: zjgemi Date: Thu, 22 Feb 2024 14:16:28 +0800 Subject: [PATCH] fix: support wf.query() in debug mode Signed-off-by: zjgemi --- src/dflow/step.py | 5 ++++- src/dflow/workflow.py | 30 +++++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/dflow/step.py b/src/dflow/step.py index 9f36ef95..9d053d82 100644 --- a/src/dflow/step.py +++ b/src/dflow/step.py @@ -464,7 +464,10 @@ def __init__( if self.template.slices.input_parameter: name = self.template.slices.input_parameter[0] value = self.inputs.parameters[name].value - self.with_param = argo_range(argo_len(value)) + if hasattr(value, "__len__"): + self.with_param = argo_range(len(value)) + else: + self.with_param = argo_range(argo_len(value)) else: assert len(self.template.slices.input_artifact) > 0, "sliced "\ "input parameter or artifact must not be empty to infer "\ diff --git a/src/dflow/workflow.py b/src/dflow/workflow.py index b1628f50..e94b5f5c 100644 --- a/src/dflow/workflow.py +++ b/src/dflow/workflow.py @@ -847,6 +847,31 @@ def query( Returns: an ArgoWorkflow object """ + if config["mode"] == "debug": + nodes = {} + for step in self.query_step(): + step.inputs.parameters = list(step.inputs.parameters.values()) + step.inputs.artifacts = list(step.inputs.artifacts.values()) + step.outputs.parameters = list( + step.outputs.parameters.values()) + step.outputs.artifacts = list(step.outputs.artifacts.values()) + nodes[step.id] = step.recover() + outputs = self.query_global_outputs() + if outputs is not None: + outputs.parameters = list(outputs.parameters.values()) + outputs.artifacts = list(outputs.artifacts.values()) + outputs = outputs.recover() + response = { + "metadata": { + "name": self.id, + }, + "status": { + "phase": self.query_status(), + "nodes": nodes, + "outputs": outputs, + } + } + return ArgoWorkflow(response) query_params = None if fields is not None: query_params = [('fields', ",".join(fields))] @@ -955,6 +980,7 @@ def query_step( "workflow": self.id, "displayName": _name, "key": s, + "id": s, "startedAt": os.path.getmtime(stepdir), "phase": _phase, "type": _type, @@ -998,6 +1024,7 @@ def query_step( }) step = ArgoStep(step, self.id) step_list.append(step) + step_list.sort(key=lambda x: x["startedAt"]) return step_list return self.query().get_step(name=name, key=key, phase=phase, id=id, @@ -1015,7 +1042,8 @@ def query_keys_of_steps( a list of keys """ if config["mode"] == "debug": - return [step.key for step in self.query_step()] + return [step.key for step in self.query_step() + if step.key is not None] try: try: response = self.api_instance.api_client.call_api(