diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 3ecabd11fe..87ebb88c0b 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -113,14 +113,10 @@ def __init__(self, config: Config) -> None: self.monitoring: Optional[MonitoringHub] self.monitoring = config.monitoring - # hub address and port for interchange to connect - self.hub_address = None # type: Optional[str] - self.hub_zmq_port = None # type: Optional[int] if self.monitoring: if self.monitoring.logdir is None: self.monitoring.logdir = self.run_dir - self.hub_address = self.monitoring.hub_address - self.hub_zmq_port = self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) + self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None @@ -1181,9 +1177,9 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: for executor in executors: executor.run_id = self.run_id executor.run_dir = self.run_dir - executor.hub_address = self.hub_address - executor.hub_zmq_port = self.hub_zmq_port if self.monitoring: + executor.hub_address = self.monitoring.hub_address + executor.hub_zmq_port = self.monitoring.hub_zmq_port executor.monitoring_radio = self.monitoring.radio if hasattr(executor, 'provider'): if hasattr(executor.provider, 'script_dir'): diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 8e4770a32a..9133e35296 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -105,7 +105,7 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval - def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> int: + def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: logger.debug("Starting MonitoringHub") @@ -207,7 +207,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat logger.info("Monitoring Hub initialized") - return zmq_port + self.hub_zmq_port = zmq_port # TODO: tighten the Any message format def send(self, mtype: MessageType, message: Any) -> None: diff --git a/parsl/tests/test_monitoring/test_fuzz_zmq.py b/parsl/tests/test_monitoring/test_fuzz_zmq.py index 36f048efb3..3f50385564 100644 --- a/parsl/tests/test_monitoring/test_fuzz_zmq.py +++ b/parsl/tests/test_monitoring/test_fuzz_zmq.py @@ -44,8 +44,8 @@ def test_row_counts(): # the latter is what i'm most suspicious of in my present investigation # dig out the interchange port... - hub_address = parsl.dfk().hub_address - hub_zmq_port = parsl.dfk().hub_zmq_port + hub_address = parsl.dfk().monitoring.hub_address + hub_zmq_port = parsl.dfk().monitoring.hub_zmq_port # this will send a string to a new socket connection with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: