Skip to content

Commit

Permalink
Fix flux job cancellation (#428)
Browse files Browse the repository at this point in the history
Update v0.49 and v0.26 flux script adapters to adapt cancellation machinery to work with recent update to submit machinery that converts from flux JobID to native types.

Update general cancellation behaviors:
* Check for in progress steps before declaring cancelled successfully to delay until actual final states can be serialized
* Update cancel logic to mirror failure logic: mark all steps downstream of a cancelled step to also be cancelled
  • Loading branch information
jwhite242 authored Oct 30, 2023
1 parent 0442e1e commit 8487230
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
14 changes: 10 additions & 4 deletions maestrowf/datastructures/core/executiongraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def write_status(self, path):

def _check_study_completion(self):
# We cancelled, return True marking study as complete.
if self.is_canceled:
if self.is_canceled and not self.in_progress:
LOGGER.info("Cancelled -- completing study.")
return StudyStatus.CANCELLED

Expand Down Expand Up @@ -753,7 +753,7 @@ def execute_ready_steps(self):
# For the status of each currently in progress job, check its
# state.
cleanup_steps = set() # Steps that are in progress showing failed.

cancel_steps = set() # Steps that have dependencies to mark cancelled
for name, status in job_status.items():
LOGGER.debug("Checking job '%s' with status %s.", name, status)
record = self.values[name]
Expand Down Expand Up @@ -841,12 +841,18 @@ def execute_ready_steps(self):
LOGGER.info("Step '%s' was cancelled.", name)
self.in_progress.remove(name)
record.mark_end(State.CANCELLED)
cancel_steps.update(self.bfs_subtree(name)[0])

# Let's handle all the failed steps in one go.
for node in cleanup_steps:
self.failed_steps.add(node)
self.values[node].mark_end(State.FAILED)

# Handle dependent steps that need cancelling
for node in cancel_steps:
self.cancelled_steps.add(node)
self.values[node].mark_end(State.CANCELLED)

# Now that we've checked the statuses of existing jobs we need to make
# sure dependencies haven't been met.
for key in self.values.keys():
Expand All @@ -872,9 +878,9 @@ def execute_ready_steps(self):
"Unfulfilled dependencies: %s",
self._dependencies[key])

s_completed = filter(
s_completed = list(filter(
lambda x: x in self.completed_steps,
self._dependencies[key])
self._dependencies[key]))
self._dependencies[key] = \
self._dependencies[key] - set(s_completed)
LOGGER.debug(
Expand Down
8 changes: 7 additions & 1 deletion maestrowf/interfaces/script/_flux/flux0_26_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,15 @@ def cancel(cls, joblist):
"\n".join(str(j) for j in joblist),
)

# NOTE: cannot pickle JobID instances, so must store as strings and
# reconstruct for use
jobs_rpc = flux.job.list.JobList(
cls.flux_handle,
ids=[flux.job.JobID(jid) for jid in joblist])

cancel_code = CancelCode.OK
cancel_rcode = 0
for job in joblist:
for job in jobs_rpc:
try:
LOGGER.debug("Cancelling Job %s...", job)
flux.job.cancel(cls.flux_handle, int(job))
Expand Down
16 changes: 11 additions & 5 deletions maestrowf/interfaces/script/_flux/flux0_49_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


class FluxInterface_0490(FluxInterface):
# This utility class is for Flux 0.26.0
# This utility class is for Flux 0.49.0
key = "0.49.0"

flux_handle = None
Expand Down Expand Up @@ -242,14 +242,20 @@ def cancel(cls, joblist):
"\n".join(str(j) for j in joblist),
)

# NOTE: cannot pickle JobID instances, so must store as strings and
# reconstruct for use
jobs_rpc = flux.job.list.JobList(
cls.flux_handle,
ids=[flux.job.JobID(jid) for jid in joblist])

cancel_code = CancelCode.OK
cancel_rcode = 0
for job in joblist:
for job in jobs_rpc.jobs():
try:
LOGGER.debug("Cancelling Job %s...", job)
flux.job.cancel(cls.flux_handle, int(job))
LOGGER.debug("Cancelling Job %s...", str(job.id.f58))
flux.job.cancel(cls.flux_handle, int(job.id))
except Exception as exception:
LOGGER.error(str(exception))
LOGGER.error("Job %s: %s", str(job.id.f58), str(exception))
cancel_code = CancelCode.ERROR
cancel_rcode = 1

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool]
[tool.poetry]
name = "maestrowf"
version = "1.1.10dev4"
version = "1.1.10dev5"
description = "A tool to easily orchestrate general computational workflows both locally and on supercomputers."
license = "MIT License"
classifiers = [
Expand Down

0 comments on commit 8487230

Please sign in to comment.