Skip to content

Commit

Permalink
Don't copy monitoring address/port parameters into the DFK.
Browse files Browse the repository at this point in the history
Prior to this PR, monitoring hub address and ZMQ port were stored as
attributes of the DFK. The address also existed as an attribute on
dfk.monitoring, and the ZMQ port was returned by dfk.monitoring.start

Afte this PR, those values are not added to the DFK, but instead are
accessed via dfk.monitoring.

These two attributes are now only set on a new executor when monitoring
is enabled, rather than always being intialised by the DFK. Default values
now come from the executor __init__ method, which is a more usual style
in Python for providing default values. See PR #3361

This is part of ongoing work to introduce more pluggable monitoring
network connectivity - see PR #3315
  • Loading branch information
benclifford committed Jul 18, 2024
1 parent be9b116 commit 5e1322e
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 11 deletions.
10 changes: 3 additions & 7 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,10 @@ def __init__(self, config: Config) -> None:
self.monitoring: Optional[MonitoringHub]
self.monitoring = config.monitoring

# hub address and port for interchange to connect
self.hub_address = None # type: Optional[str]
self.hub_zmq_port = None # type: Optional[int]
if self.monitoring:
if self.monitoring.logdir is None:
self.monitoring.logdir = self.run_dir
self.hub_address = self.monitoring.hub_address
self.hub_zmq_port = self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir)
self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir)

self.time_began = datetime.datetime.now()
self.time_completed: Optional[datetime.datetime] = None
Expand Down Expand Up @@ -1181,9 +1177,9 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
for executor in executors:
executor.run_id = self.run_id
executor.run_dir = self.run_dir
executor.hub_address = self.hub_address
executor.hub_zmq_port = self.hub_zmq_port
if self.monitoring:
executor.hub_address = self.monitoring.hub_address
executor.hub_zmq_port = self.monitoring.hub_zmq_port
executor.monitoring_radio = self.monitoring.radio
if hasattr(executor, 'provider'):
if hasattr(executor.provider, 'script_dir'):
Expand Down
4 changes: 2 additions & 2 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __init__(self,
self.resource_monitoring_enabled = resource_monitoring_enabled
self.resource_monitoring_interval = resource_monitoring_interval

def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> int:
def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:

logger.debug("Starting MonitoringHub")

Expand Down Expand Up @@ -207,7 +207,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat

logger.info("Monitoring Hub initialized")

return zmq_port
self.hub_zmq_port = zmq_port

# TODO: tighten the Any message format
def send(self, mtype: MessageType, message: Any) -> None:
Expand Down
4 changes: 2 additions & 2 deletions parsl/tests/test_monitoring/test_fuzz_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ def test_row_counts():
# the latter is what i'm most suspicious of in my present investigation

# dig out the interchange port...
hub_address = parsl.dfk().hub_address
hub_zmq_port = parsl.dfk().hub_zmq_port
hub_address = parsl.dfk().monitoring.hub_address
hub_zmq_port = parsl.dfk().monitoring.hub_zmq_port

# this will send a string to a new socket connection
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
Expand Down

0 comments on commit 5e1322e

Please sign in to comment.