From f1359199e4f9e16f3ad15c3b5e9d53f8471820d0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 16 Aug 2024 19:12:10 +0200 Subject: [PATCH] Remove monitoring queue tag switch monitoring db pre-router (#3587) The main goals of this PR is to make _migrate_logs_to_internal much more clearly a message forwarder, rather than a message interpreter. This follows on from PR #2168 which introduces _dispatch_to_internal to dispatches messages based on their tag rather than on the queue the message was received on, and is part of an ongoing series to simplify the queue and routing structure inside the monitoring router and database code. Further PRs in preparation (in draft PR #3315) contain further simplifications building on this PR. After this PR: * the database manager will respond to a STOP message on any incoming queue, vs previously only on the priority queue. This is a consequence of treating the queues all the same now. * the database manager will not perform such strong validation of message structure based on message tag at this point. That's part of expecting the code to forward messages, not inspect them, with later inspecting code being the place to care abou structure. This only affects behaviour when invalid messages are sent. Related PRs and context: #3567 changes the monitoring router to be more of a router and to not inspect and modify certain in-transit messages. There is a long slow project to regularise queues: PR #2117 makes resource info messages look like other message so they can be dispatched alongside other message types. The priority queue was initially (as I understand it) introduced to attempt to address a race condition of message order arrival vs SQL database key constraints. The priority queue is an attempt to force certain messages to be processed before others (not in the priority queue). However a subsequent commit in 2019, 0a4b68555ce1946e46b96a13f9003e0733252ec6, introduces a more robust approach because this priority queue approach does not work and so is not needed. --- parsl/monitoring/db_manager.py | 43 ++++++++++------------------------ 1 file changed, 13 insertions(+), 30 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 853bc4c3c7..053c98d598 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -316,7 +316,7 @@ def start(self, self._kill_event = threading.Event() self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - priority_queue, 'priority', self._kill_event,), + priority_queue, self._kill_event,), name="Monitoring-migrate-priority", daemon=True, ) @@ -324,7 +324,7 @@ def start(self, self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - node_queue, 'node', self._kill_event,), + node_queue, self._kill_event,), name="Monitoring-migrate-node", daemon=True, ) @@ -332,7 +332,7 @@ def start(self, self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - block_queue, 'block', self._kill_event,), + block_queue, self._kill_event,), name="Monitoring-migrate-block", daemon=True, ) @@ -340,7 +340,7 @@ def start(self, self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( - resource_queue, 'resource', self._kill_event,), + resource_queue, self._kill_event,), name="Monitoring-migrate-resource", daemon=True, ) @@ -577,43 +577,26 @@ def start(self, raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log") @wrap_with_logs(target="database_manager") - def _migrate_logs_to_internal(self, logs_queue: queue.Queue, queue_tag: str, kill_event: threading.Event) -> None: - logger.info("Starting processing for queue {}".format(queue_tag)) + def _migrate_logs_to_internal(self, logs_queue: queue.Queue, kill_event: threading.Event) -> None: + logger.info("Starting _migrate_logs_to_internal") while not kill_event.is_set() or logs_queue.qsize() != 0: - logger.debug("""Checking STOP conditions for {} threads: {}, {}""" - .format(queue_tag, kill_event.is_set(), logs_queue.qsize() != 0)) + logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s", + kill_event.is_set(), logs_queue.qsize() != 0) try: x, addr = logs_queue.get(timeout=0.1) except queue.Empty: continue else: - if queue_tag == 'priority' and x == 'STOP': + if x == 'STOP': self.close() - elif queue_tag == 'priority': # implicitly not 'STOP' - assert isinstance(x, tuple) - assert len(x) == 2 - assert x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO], \ - "_migrate_logs_to_internal can only migrate WORKFLOW_,TASK_INFO message from priority queue, got x[0] == {}".format(x[0]) - self._dispatch_to_internal(x) - elif queue_tag == 'resource': - assert isinstance(x, tuple), "_migrate_logs_to_internal was expecting a tuple, got {}".format(x) - assert x[0] == MessageType.RESOURCE_INFO, ( - "_migrate_logs_to_internal can only migrate RESOURCE_INFO message from resource queue, " - "got tag {}, message {}".format(x[0], x) - ) - self._dispatch_to_internal(x) - elif queue_tag == 'node': - assert len(x) == 2, "expected message tuple to have exactly two elements" - assert x[0] == MessageType.NODE_INFO, "_migrate_logs_to_internal can only migrate NODE_INFO messages from node queue" - - self._dispatch_to_internal(x) - elif queue_tag == "block": - self._dispatch_to_internal(x) else: - logger.error(f"Discarding because unknown queue tag '{queue_tag}', message: {x}") + self._dispatch_to_internal(x) def _dispatch_to_internal(self, x: Tuple) -> None: + assert isinstance(x, tuple) + assert len(x) == 2, "expected message tuple to have exactly two elements" + if x[0] in [MessageType.WORKFLOW_INFO, MessageType.TASK_INFO]: self.pending_priority_queue.put(cast(Any, x)) elif x[0] == MessageType.RESOURCE_INFO: