Skip to content

Commit

Permalink
fix: failfast=False for default in debug mode
Browse files Browse the repository at this point in the history
Signed-off-by: zjgemi <[email protected]>
  • Loading branch information
zjgemi committed Feb 22, 2024
1 parent a66ba11 commit ac2c5a9
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/dflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def split_headers(s):
"debug_s3": boolize(os.environ.get("DFLOW_DEBUG_S3", False)),
"debug_workdir": os.environ.get("DFLOW_DEBUG_WORKDIR", "."),
"debug_artifact_dir": os.environ.get("DFLOW_DEBUG_ARTIFACT_DIR", "."),
"debug_failfast": boolize(os.environ.get("DFLOW_DEBUG_FAILFAST", False)),
}


Expand Down
10 changes: 8 additions & 2 deletions src/dflow/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,7 @@ def handle_expr(val, scope):
"batch" % config["debug_batch_interval"])
time.sleep(config["debug_batch_interval"])

failed = []
for future in concurrent.futures.as_completed(futures):
j = futures.index(future)
try:
Expand All @@ -1633,12 +1634,17 @@ def handle_expr(val, scope):
self.parallel_steps[j].phase = "Failed"
if not self.continue_on_failed:
self.phase = "Failed"
raise RuntimeError("Step %s failed" %
self.parallel_steps[j])
if config["debug_failfast"]:
raise RuntimeError("Step %s failed" %
self.parallel_steps[j])
else:
failed.append(self.parallel_steps[j])
else:
self.parallel_steps[j].outputs = deepcopy(ps.outputs)
logging.info("Outputs of %s collected" %
self.parallel_steps[j])
if len(failed) > 0:
raise RuntimeError("Step %s failed" % failed)

for name, par in self.outputs.parameters.items():
par.value = []
Expand Down

0 comments on commit ac2c5a9

Please sign in to comment.