Skip to content

Commit

Permalink
Merge pull request galaxyproject#16252 from mvdbeek/delete_jobs_when_…
Browse files Browse the repository at this point in the history
…cancelling_invocation

Delete non-terminal jobs and subworkflow invocations when cancelling invocation
  • Loading branch information
mvdbeek authored Nov 14, 2023
2 parents 4db3b94 + 2a11420 commit 706e977
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
</b-button>
</span>
</div>
<div v-else-if="!invocationSchedulingTerminal">
<div v-else-if="!invocationAndJobTerminal">
<b-alert variant="info" show>
<LoadingSpan :message="`Waiting to complete invocation ${indexStr}`" />
</b-alert>
Expand Down
16 changes: 9 additions & 7 deletions lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1529,13 +1529,14 @@ def mark_as_resubmitted(self, info=None):
self.sa_session.commit()

def change_state(self, state, info=False, flush=True, job=None):
job_supplied = job is not None
if not job_supplied:
if job is None:
job = self.get_job()
self.sa_session.refresh(job)
# Else:
# If this is a new job (e.g. initially queued) - we are in the same
# thread and no other threads are working on the job yet - so don't refresh.
else:
# job attributes may have been changed, so we can't refresh here,
# but we want to make sure that the terminal state check below works
# on the current job state value to minimize race conditions.
self.sa_session.expire(job, ["state"])

if job.state in model.Job.terminal_states:
log.warning(
Expand All @@ -1547,9 +1548,10 @@ def change_state(self, state, info=False, flush=True, job=None):
return
if info:
job.info = info
job.set_state(state)
state_changed = job.set_state(state)
self.sa_session.add(job)
job.update_output_states(self.app.application_stack.supports_skip_locked())
if state_changed:
job.update_output_states(self.app.application_stack.supports_skip_locked())
if flush:
with transaction(self.sa_session):
self.sa_session.commit()
Expand Down
48 changes: 33 additions & 15 deletions lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
Dict,
List,
Tuple,
Type,
Union,
)

from sqlalchemy.exc import OperationalError
Expand Down Expand Up @@ -101,19 +103,18 @@ def shutdown(self):


class ItemGrabber:
grab_model: Union[Type[model.Job], Type[model.WorkflowInvocation]]

def __init__(
self,
app,
grab_type="Job",
handler_assignment_method=None,
max_grab=None,
self_handler_tags=None,
handler_tags=None,
):
self.app = app
self.sa_session = app.model.context
self.grab_this = getattr(model, grab_type)
self.grab_type = grab_type
self.handler_assignment_method = handler_assignment_method
self.self_handler_tags = self_handler_tags
self.max_grab = max_grab
Expand All @@ -123,27 +124,33 @@ def __init__(
self._supports_returning = self.app.application_stack.supports_returning()

def setup_query(self):
if self.grab_model is model.Job:
grab_condition = self.grab_model.state == self.grab_model.states.NEW
elif self.grab_model is model.WorkflowInvocation:
grab_condition = self.grab_model.state.in_((self.grab_model.states.NEW, self.grab_model.states.CANCELLING))
else:
raise NotImplementedError(f"Grabbing {self.grab_model.__name__} not implemented")
subq = (
select(self.grab_this.id)
select(self.grab_model.id)
.where(
and_(
self.grab_this.table.c.handler.in_(self.self_handler_tags),
self.grab_this.table.c.state == self.grab_this.states.NEW,
self.grab_model.handler.in_(self.self_handler_tags),
grab_condition,
)
)
.order_by(self.grab_this.table.c.id)
.order_by(self.grab_model.id)
)
if self.max_grab:
subq = subq.limit(self.max_grab)
if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED:
subq = subq.with_for_update(skip_locked=True)
self._grab_query = (
self.grab_this.table.update()
.where(self.grab_this.table.c.id.in_(subq))
self.grab_model.table.update()
.where(self.grab_model.id.in_(subq))
.values(handler=self.app.config.server_name)
)
if self._supports_returning:
self._grab_query = self._grab_query.returning(self.grab_this.table.c.id)
self._grab_query = self._grab_query.returning(self.grab_model.id)
if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION:
self._grab_conn_opts["isolation_level"] = "SERIALIZABLE"
log.info(
Expand Down Expand Up @@ -183,19 +190,31 @@ def grab_unhandled_items(self):
if self._supports_returning:
rows = proxy.fetchall()
if rows:
log.debug(f"Grabbed {self.grab_type}(s): {', '.join(str(row[0]) for row in rows)}")
log.debug(
f"Grabbed {self.grab_model.__name__}(s): {', '.join(str(row[0]) for row in rows)}"
)
else:
trans.rollback()
except OperationalError as e:
# If this is a serialization failure on PostgreSQL, then e.orig is a psycopg2 TransactionRollbackError
# and should have attribute `code`. Other engines should just report the message and move on.
if int(getattr(e.orig, "pgcode", -1)) != 40001:
log.debug(
"Grabbing %s failed (serialization failures are ok): %s", self.grab_type, unicodify(e)
"Grabbing %s failed (serialization failures are ok): %s",
self.grab_model.__name__,
unicodify(e),
)
trans.rollback()


class InvocationGrabber(ItemGrabber):
grab_model = model.WorkflowInvocation


class JobGrabber(ItemGrabber):
grab_model = model.Job


class StopSignalException(Exception):
"""Exception raised when queue returns a stop signal."""

Expand Down Expand Up @@ -236,13 +255,12 @@ def __init__(self, app: MinimalManagerApp, dispatcher):
name = "JobHandlerQueue.monitor_thread"
self._init_monitor_thread(name, target=self.__monitor, config=app.config)
self.job_grabber = None
handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method(
handler_assignment_method = JobGrabber.get_grabbable_handler_assignment_method(
self.app.job_config.handler_assignment_methods
)
if handler_assignment_method:
self.job_grabber = ItemGrabber(
self.job_grabber = JobGrabber(
app=app,
grab_type="Job",
handler_assignment_method=handler_assignment_method,
max_grab=self.app.job_config.handler_max_grab,
self_handler_tags=self.app.job_config.self_handler_tags,
Expand Down
10 changes: 4 additions & 6 deletions lib/galaxy/managers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,18 +421,16 @@ def get_invocation_report(self, trans, invocation_id, **kwd):
target_format=target_format,
)

def cancel_invocation(self, trans, decoded_invocation_id):
def request_invocation_cancellation(self, trans, decoded_invocation_id: int):
workflow_invocation = self.get_invocation(trans, decoded_invocation_id)
cancelled = workflow_invocation.cancel()

if cancelled:
workflow_invocation.add_message(InvocationCancellationUserRequest(reason="user_request"))
trans.sa_session.add(workflow_invocation)
with transaction(trans.sa_session):
trans.sa_session.commit()
else:
# TODO: More specific exception?
raise exceptions.MessageException("Cannot cancel an inactive workflow invocation.")

with transaction(trans.sa_session):
trans.sa_session.commit()

return workflow_invocation

Expand Down
111 changes: 96 additions & 15 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
from galaxy.model.orm.now import now
from galaxy.model.orm.util import add_object_to_object_session
from galaxy.objectstore import ObjectStore
from galaxy.schema.invocation import InvocationCancellationUserRequest
from galaxy.schema.schema import (
DatasetCollectionPopulatedState,
DatasetState,
Expand Down Expand Up @@ -1366,7 +1367,10 @@ class Job(Base, JobLike, UsesCreateAndUpdateTime, Dictifiable, Serializable):

states = JobState

# states that are not expected to change, except through admin action or re-scheduling
terminal_states = [states.OK, states.ERROR, states.DELETED]
# deleting state should not turn back into any of the non-ready states
finished_states = terminal_states + [states.DELETING]
#: job states where the job hasn't finished and the model may still change
non_ready_states = [
states.NEW,
Expand All @@ -1391,13 +1395,7 @@ def running(self):

@property
def finished(self):
states = self.states
return self.state in [
states.OK,
states.ERROR,
states.DELETING,
states.DELETED,
]
return self.state in self.finished_states

def io_dicts(self, exclude_implicit_outputs=False) -> IoDicts:
inp_data: Dict[str, Optional[DatasetInstance]] = {da.name: da.dataset for da in self.input_datasets}
Expand Down Expand Up @@ -1634,12 +1632,32 @@ def all_entry_points_configured(self):
all_configured = ep.configured and all_configured
return all_configured

def set_state(self, state):
def set_state(self, state: JobState) -> bool:
"""
Save state history
Save state history. Returns True if state has changed, else False.
"""
self.state = state
self.state_history.append(JobStateHistory(self))
if self.state == state:
# Nothing changed, no action needed
return False
session = object_session(self)
if session and self.id and state not in Job.finished_states:
# generate statement that will not revert DELETING or DELETED back to anything non-terminal
rval = session.execute(
update(Job.table)
.where(Job.id == self.id, ~Job.state.in_((Job.states.DELETING, Job.states.DELETED)))
.values(state=state)
)
if rval.rowcount == 1:
# Need to expire state since we just updated it, but ORM doesn't know about it.
session.expire(self, ["state"])
self.state_history.append(JobStateHistory(self))
return True
else:
return False
else:
self.state = state
self.state_history.append(JobStateHistory(self))
return True

def get_param_values(self, app, ignore_errors=False):
"""
Expand Down Expand Up @@ -8153,6 +8171,7 @@ class states(str, Enum):
READY = "ready" # Workflow ready for another iteration of scheduling.
SCHEDULED = "scheduled" # Workflow has been scheduled.
CANCELLED = "cancelled"
CANCELLING = "cancelling" # invocation scheduler will cancel job in next iteration
FAILED = "failed"

non_terminal_states = [states.NEW, states.READY]
Expand Down Expand Up @@ -8197,12 +8216,73 @@ def active(self):
states = WorkflowInvocation.states
return self.state in [states.NEW, states.READY]

def cancel(self):
if not self.active:
return False
def set_state(self, state: "WorkflowInvocation.states"):
session = object_session(self)
priority_states = (WorkflowInvocation.states.CANCELLING, WorkflowInvocation.states.CANCELLED)
if session and self.id and state not in priority_states:
# generate statement that will not revert CANCELLING or CANCELLED back to anything non-terminal
session.execute(
update(WorkflowInvocation.table)
.where(
WorkflowInvocation.id == self.id,
or_(~WorkflowInvocation.state.in_(priority_states), WorkflowInvocation.state.is_(None)),
)
.values(state=state)
)
else:
self.state = WorkflowInvocation.states.CANCELLED
# Not bound to a session, or setting cancelling/cancelled
self.state = state

def cancel(self):
if self.state not in [WorkflowInvocation.states.CANCELLING, WorkflowInvocation.states.CANCELLED]:
# No use cancelling workflow again, for all others we may still want to be able to cancel
# remaining tool and workflow steps
self.state = WorkflowInvocation.states.CANCELLING
return True
return False

def cancel_invocation_steps(self):
sa_session = object_session(self)
assert sa_session
job_subq = (
select(Job.id)
.join(WorkflowInvocationStep)
.filter(WorkflowInvocationStep.workflow_invocation_id == self.id)
.filter(~Job.state.in_(Job.finished_states))
.with_for_update()
.scalar_subquery()
)
sa_session.execute(update(Job.table).where(Job.id == job_subq).values({"state": Job.states.DELETING}))

job_collection_subq = (
select(Job.id)
.join(ImplicitCollectionJobsJobAssociation)
.join(ImplicitCollectionJobs)
.join(
WorkflowInvocationStep, WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id
)
.filter(WorkflowInvocationStep.workflow_invocation_id == self.id)
.filter(~Job.state.in_(Job.finished_states))
.with_for_update()
.subquery()
)

sa_session.execute(
update(Job.table)
.where(Job.table.c.id.in_(job_collection_subq.element))
.values({"state": Job.states.DELETING})
)

for invocation in self.subworkflow_invocations:
subworkflow_invocation = invocation.subworkflow_invocation
cancelled = subworkflow_invocation.cancel()
if cancelled:
subworkflow_invocation.add_message(InvocationCancellationUserRequest(reason="user_request"))
sa_session.add(subworkflow_invocation)
sa_session.commit()

def mark_cancelled(self):
self.state = WorkflowInvocation.states.CANCELLED

def fail(self):
self.state = WorkflowInvocation.states.FAILED
Expand Down Expand Up @@ -8252,6 +8332,7 @@ def poll_active_workflow_ids(engine, scheduler=None, handler=None):
or_(
WorkflowInvocation.state == WorkflowInvocation.states.NEW,
WorkflowInvocation.state == WorkflowInvocation.states.READY,
WorkflowInvocation.state == WorkflowInvocation.states.CANCELLING,
),
]
if scheduler is not None:
Expand Down
4 changes: 3 additions & 1 deletion lib/galaxy/webapps/galaxy/api/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,9 @@ def cancel_invocation(self, trans: ProvidesUserContext, invocation_id, **kwd):
:raises: exceptions.MessageException, exceptions.ObjectNotFound
"""
decoded_workflow_invocation_id = self.decode_id(invocation_id)
workflow_invocation = self.workflow_manager.cancel_invocation(trans, decoded_workflow_invocation_id)
workflow_invocation = self.workflow_manager.request_invocation_cancellation(
trans, decoded_workflow_invocation_id
)
return self.__encode_invocation(workflow_invocation, **kwd)

@expose_api
Expand Down
6 changes: 4 additions & 2 deletions lib/galaxy/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def invoke(self) -> Dict[int, Any]:
log.debug(
f"Workflow invocation [{workflow_invocation.id}] exceeded maximum number of seconds allowed for scheduling [{maximum_duration}], failing."
)
workflow_invocation.state = model.WorkflowInvocation.states.FAILED
workflow_invocation.set_state(model.WorkflowInvocation.states.FAILED)
# All jobs ran successfully, so we can save now
self.trans.sa_session.add(workflow_invocation)

Expand Down Expand Up @@ -264,7 +264,7 @@ def invoke(self) -> Dict[int, Any]:
state = model.WorkflowInvocation.states.READY
else:
state = model.WorkflowInvocation.states.SCHEDULED
workflow_invocation.state = state
workflow_invocation.set_state(state)

# All jobs ran successfully, so we can save now
self.trans.sa_session.add(workflow_invocation)
Expand Down Expand Up @@ -640,6 +640,8 @@ def subworkflow_invoker(
when_values=None,
) -> WorkflowInvoker:
subworkflow_invocation = self._subworkflow_invocation(step)
subworkflow_invocation.handler = self.workflow_invocation.handler
subworkflow_invocation.scheduler = self.workflow_invocation.scheduler
workflow_run_config = workflow_request_to_run_config(subworkflow_invocation, use_cached_job)
subworkflow_progress = self.subworkflow_progress(
subworkflow_invocation,
Expand Down
Loading

0 comments on commit 706e977

Please sign in to comment.