From c83eca85dd42a5230fc2b4cc0d7154cd6028e7bb Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 3 May 2024 09:42:23 +0000 Subject: [PATCH] Rename submit-side monitoring radio for clarification A Parsl executor is configured with a submit-side monitoring radio sender, which is used by the BlockProviderExecutor to send block status messages to the monitoring subsystem. Parsl executors also have a notion of a remote monitoring radio, used by remote workers to sending monitoring messages. This can be confusing when both of these radio senders are referred to in the same piece of code, as happened in ongoing monitoring plugin development in draft PR #3315. This PR is intended to make this sitution much less ambiguous by avoiding the mention of a monitoring radio in executor code without qualifying whether it is a submit-side or remote-worker-side radio definition. A future PR from the #3315 stack will introduce other monitoring radio references with the remote prefix, replacing the current radio_mode and related attributes. --- parsl/dataflow/dflow.py | 2 +- parsl/executors/base.py | 14 +++++++------- parsl/executors/status_handling.py | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index a62a2261d0..88ef063230 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1180,7 +1180,7 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: if self.monitoring: executor.hub_address = self.monitoring.hub_address executor.hub_zmq_port = self.monitoring.hub_zmq_port - executor.monitoring_radio = self.monitoring.radio + executor.submit_monitoring_radio = self.monitoring.radio if hasattr(executor, 'provider'): if hasattr(executor.provider, 'script_dir'): executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts') diff --git a/parsl/executors/base.py b/parsl/executors/base.py index 941f392e9f..a112b9eb00 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -52,13 +52,13 @@ def __init__( *, hub_address: Optional[str] = None, hub_zmq_port: Optional[int] = None, - monitoring_radio: Optional[MonitoringRadioSender] = None, + submit_monitoring_radio: Optional[MonitoringRadioSender] = None, run_dir: str = ".", run_id: Optional[str] = None, ): self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port - self.monitoring_radio = monitoring_radio + self.submit_monitoring_radio = submit_monitoring_radio self.run_dir = os.path.abspath(run_dir) self.run_id = run_id @@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None: self._hub_zmq_port = value @property - def monitoring_radio(self) -> Optional[MonitoringRadioSender]: + def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]: """Local radio for sending monitoring messages """ - return self._monitoring_radio + return self._submit_monitoring_radio - @monitoring_radio.setter - def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None: - self._monitoring_radio = value + @submit_monitoring_radio.setter + def submit_monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None: + self._submit_monitoring_radio = value diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 1e4ea3c0b4..0f7ed90592 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -269,10 +269,10 @@ def workers_per_node(self) -> Union[int, float]: def send_monitoring_info(self, status: Dict) -> None: # Send monitoring info for HTEX when monitoring enabled - if self.monitoring_radio: + if self.submit_monitoring_radio: msg = self.create_monitoring_info(status) logger.debug("Sending block monitoring message: %r", msg) - self.monitoring_radio.send((MessageType.BLOCK_INFO, msg)) + self.submit_monitoring_radio.send((MessageType.BLOCK_INFO, msg)) def create_monitoring_info(self, status: Dict[str, JobStatus]) -> Sequence[object]: """Create a monitoring message for each block based on the poll status.