From ffbf3d3559a00f8d55c297cf1c6f81ef27cc289e Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 14 Jun 2024 13:08:10 +0000 Subject: [PATCH] make some parameters be included as self attributes this is to avoid wiring them around multiple method calls when self is already there these are for use by upcoming threads they are all multiprocessing objects, so should all be thread-safe, so it should be fine to make them accessible as self attributes from multiple threads --- parsl/monitoring/router.py | 46 ++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 70b4862295..b787545c17 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -32,7 +32,12 @@ def __init__(self, logdir: str = ".", run_id: str, logging_level: int = logging.INFO, - atexit_timeout: int = 3 # in seconds + atexit_timeout: int = 3, # in seconds + priority_msgs: "queue.Queue[AddressedMonitoringMessage]", + node_msgs: "queue.Queue[AddressedMonitoringMessage]", + block_msgs: "queue.Queue[AddressedMonitoringMessage]", + resource_msgs: "queue.Queue[AddressedMonitoringMessage]", + exit_event: Event, ): """ Initializes a monitoring configuration class. @@ -52,6 +57,7 @@ def __init__(self, atexit_timeout : float, optional The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. + TODO: documentation of new parameters """ os.makedirs(logdir, exist_ok=True) self.logger = set_file_logger("{}/monitoring_router.log".format(logdir), @@ -93,19 +99,20 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - def start(self, - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", - exit_event: Event) -> None: + self.priority_msgs = priority_msgs + self.node_msgs = node_msgs + self.block_msgs = block_msgs + self.resource_msgs = resource_msgs + self.exit_event = exit_event + + def start(self) -> None: try: - while not exit_event.is_set(): + while not self.exit_event.is_set(): try: data, addr = self.udp_sock.recvfrom(2048) resource_msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - resource_msgs.put((resource_msg, addr)) + self.resource_msgs.put((resource_msg, addr)) except socket.timeout: pass @@ -125,15 +132,15 @@ def start(self, if msg[0] == MessageType.NODE_INFO: msg[1]['run_id'] = self.run_id - node_msgs.put(msg_0) + self.node_msgs.put(msg_0) elif msg[0] == MessageType.RESOURCE_INFO: - resource_msgs.put(msg_0) + self.resource_msgs.put(msg_0) elif msg[0] == MessageType.BLOCK_INFO: - block_msgs.put(msg_0) + self.block_msgs.put(msg_0) elif msg[0] == MessageType.TASK_INFO: - priority_msgs.put(msg_0) + self.priority_msgs.put(msg_0) elif msg[0] == MessageType.WORKFLOW_INFO: - priority_msgs.put(msg_0) + self.priority_msgs.put(msg_0) else: # There is a type: ignore here because if msg[0] # is of the correct type, this code is unreachable, @@ -158,7 +165,7 @@ def start(self, data, addr = self.udp_sock.recvfrom(2048) msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - resource_msgs.put((msg, addr)) + self.resource_msgs.put((msg, addr)) last_msg_received_time = time.time() except socket.timeout: pass @@ -191,7 +198,12 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - run_id=run_id) + run_id=run_id, + priority_msgs=priority_msgs, + node_msgs=node_msgs, + block_msgs=block_msgs, + resource_msgs=resource_msgs, + exit_event=exit_event) except Exception as e: logger.error("MonitoringRouter construction failed.", exc_info=True) comm_q.put(f"Monitoring router construction failed: {e}") @@ -200,7 +212,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", router.logger.info("Starting MonitoringRouter in router_starter") try: - router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event) + router.start() except Exception as e: router.logger.exception("router.start exception") exception_q.put(('Hub', str(e)))