From 0c67ecd0ccbb49ab892f7469d503ece73c244d28 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Tue, 10 Oct 2023 22:09:55 -0400 Subject: [PATCH] Refactor ItemGrabber into JobGrabber and InvocationGrabber --- lib/galaxy/jobs/handler.py | 44 +++++++++++++++-------- lib/galaxy/workflow/scheduling_manager.py | 7 ++-- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 028023784dad..8b342b771a1c 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -13,6 +13,8 @@ Dict, List, Tuple, + Type, + Union, ) from sqlalchemy.exc import OperationalError @@ -101,10 +103,11 @@ def shutdown(self): class ItemGrabber: + grab_model: Union[Type[model.Job], Type[model.WorkflowInvocation]] + def __init__( self, app, - grab_type="Job", handler_assignment_method=None, max_grab=None, self_handler_tags=None, @@ -112,8 +115,6 @@ def __init__( ): self.app = app self.sa_session = app.model.context - self.grab_this = getattr(model, grab_type) - self.grab_type = grab_type self.handler_assignment_method = handler_assignment_method self.self_handler_tags = self_handler_tags self.max_grab = max_grab @@ -123,27 +124,35 @@ def __init__( self._supports_returning = self.app.application_stack.supports_returning() def setup_query(self): + if self.grab_model is model.Job: + grab_condition = self.grab_model.table.c.state == self.grab_model.states.NEW + elif self.grab_model is model.WorkflowInvocation: + grab_condition = self.grab_model.table.c.state.in_( + (self.grab_model.states.NEW, self.grab_model.states.CANCELLING) + ) + else: + raise NotImplementedError(f"Grabbing {self.grab_model} not implemented") subq = ( - select(self.grab_this.id) + select(self.grab_model.id) .where( and_( - self.grab_this.table.c.handler.in_(self.self_handler_tags), - self.grab_this.table.c.state == self.grab_this.states.NEW, + self.grab_model.table.c.handler.in_(self.self_handler_tags), + grab_condition, ) ) - .order_by(self.grab_this.table.c.id) + .order_by(self.grab_model.table.c.id) ) if self.max_grab: subq = subq.limit(self.max_grab) if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_SKIP_LOCKED: subq = subq.with_for_update(skip_locked=True) self._grab_query = ( - self.grab_this.table.update() - .where(self.grab_this.table.c.id.in_(subq)) + self.grab_model.table.update() + .where(self.grab_model.table.c.id.in_(subq)) .values(handler=self.app.config.server_name) ) if self._supports_returning: - self._grab_query = self._grab_query.returning(self.grab_this.table.c.id) + self._grab_query = self._grab_query.returning(self.grab_model.table.c.id) if self.handler_assignment_method == HANDLER_ASSIGNMENT_METHODS.DB_TRANSACTION_ISOLATION: self._grab_conn_opts["isolation_level"] = "SERIALIZABLE" log.info( @@ -183,7 +192,7 @@ def grab_unhandled_items(self): if self._supports_returning: rows = proxy.fetchall() if rows: - log.debug(f"Grabbed {self.grab_type}(s): {', '.join(str(row[0]) for row in rows)}") + log.debug(f"Grabbed {type(self.grab_model)}(s): {', '.join(str(row[0]) for row in rows)}") else: trans.rollback() except OperationalError as e: @@ -191,11 +200,19 @@ def grab_unhandled_items(self): # and should have attribute `code`. Other engines should just report the message and move on. if int(getattr(e.orig, "pgcode", -1)) != 40001: log.debug( - "Grabbing %s failed (serialization failures are ok): %s", self.grab_type, unicodify(e) + "Grabbing %s failed (serialization failures are ok): %s", self.grab_model, unicodify(e) ) trans.rollback() +class InvocationGrabber(ItemGrabber): + grab_model = model.WorkflowInvocation + + +class JobGrabber(ItemGrabber): + grab_model = model.Job + + class StopSignalException(Exception): """Exception raised when queue returns a stop signal.""" @@ -240,9 +257,8 @@ def __init__(self, app: MinimalManagerApp, dispatcher): self.app.job_config.handler_assignment_methods ) if handler_assignment_method: - self.job_grabber = ItemGrabber( + self.job_grabber = JobGrabber( app=app, - grab_type="Job", handler_assignment_method=handler_assignment_method, max_grab=self.app.job_config.handler_max_grab, self_handler_tags=self.app.job_config.self_handler_tags, diff --git a/lib/galaxy/workflow/scheduling_manager.py b/lib/galaxy/workflow/scheduling_manager.py index 32fceaee7cf1..b8851def3e7a 100644 --- a/lib/galaxy/workflow/scheduling_manager.py +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -4,7 +4,7 @@ import galaxy.workflow.schedulers from galaxy import model from galaxy.exceptions import HandlerAssignmentError -from galaxy.jobs.handler import ItemGrabber +from galaxy.jobs.handler import InvocationGrabber from galaxy.model.base import transaction from galaxy.util import plugin_config from galaxy.util.custom_logging import get_logger @@ -285,13 +285,12 @@ def __init__(self, app, workflow_scheduling_manager): self.invocation_grabber = None self_handler_tags = set(self.app.job_config.self_handler_tags) self_handler_tags.add(self.workflow_scheduling_manager.default_handler_id) - handler_assignment_method = ItemGrabber.get_grabbable_handler_assignment_method( + handler_assignment_method = InvocationGrabber.get_grabbable_handler_assignment_method( self.workflow_scheduling_manager.handler_assignment_methods ) if handler_assignment_method: - self.invocation_grabber = ItemGrabber( + self.invocation_grabber = InvocationGrabber( app=app, - grab_type="WorkflowInvocation", handler_assignment_method=handler_assignment_method, max_grab=self.workflow_scheduling_manager.handler_max_grab, self_handler_tags=self_handler_tags,