From 23d9c4f5bf39999d2e0abbda10b57ca2c6c3bc3e Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sat, 20 Apr 2024 11:58:25 +0000 Subject: [PATCH] Make UDP radio receiver shutdown period configurable .. and use that new parameter in stderr/out tests which don't make use of the UDP radio. --- parsl/monitoring/monitoring.py | 10 +++++++++- parsl/monitoring/router.py | 10 +++++++--- parsl/tests/test_monitoring/test_stdouterr.py | 1 + 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index a76e2cf487..d23f1a4700 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -48,7 +48,8 @@ def __init__(self, logdir: Optional[str] = None, monitoring_debug: bool = False, resource_monitoring_enabled: bool = True, - resource_monitoring_interval: float = 30): # in seconds + resource_monitoring_interval: float = 30, # in seconds + udp_atexit_timeout: float = 3): """ Parameters ---------- @@ -87,6 +88,10 @@ def __init__(self, If set to 0, only start and end information will be logged, and no periodic monitoring will be made. Default: 30 seconds + udp_atexit_timeout : float + The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK + workflow message is received. + """ if _db_manager_excepts: @@ -106,6 +111,8 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval + self.udp_atexit_timeout = udp_atexit_timeout + def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: logger.debug("Starting MonitoringHub") @@ -166,6 +173,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No "zmq_port_range": self.hub_port_range, "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, + "udp_atexit_timeout": self.udp_atexit_timeout }, name="Monitoring-Router-Process", daemon=True, diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 343410e3a4..d397d8e6af 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -33,7 +33,7 @@ def __init__(self, monitoring_hub_address: str = "127.0.0.1", logdir: str = ".", logging_level: int = logging.INFO, - atexit_timeout: int = 3, # in seconds + atexit_timeout: float = 3, # in seconds priority_msgs: mpq.Queue, node_msgs: mpq.Queue, block_msgs: mpq.Queue, @@ -56,7 +56,8 @@ def __init__(self, logging_level : int Logging level as defined in the logging module. Default: logging.INFO atexit_timeout : float, optional - The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. + The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK + workflow message is received. *_msgs : Queue Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag. @@ -217,6 +218,8 @@ def router_starter(*, udp_port: Optional[int], zmq_port_range: Tuple[int, int], + udp_atexit_timeout: float, + logdir: str, logging_level: int) -> None: setproctitle("parsl: monitoring router") @@ -230,7 +233,8 @@ def router_starter(*, node_msgs=node_msgs, block_msgs=block_msgs, resource_msgs=resource_msgs, - exit_event=exit_event) + exit_event=exit_event, + atexit_timeout=udp_atexit_timeout) except Exception as e: logger.error("MonitoringRouter construction failed.", exc_info=True) comm_q.put(f"Monitoring router construction failed: {e}") diff --git a/parsl/tests/test_monitoring/test_stdouterr.py b/parsl/tests/test_monitoring/test_stdouterr.py index d1817164c0..d306a6babf 100644 --- a/parsl/tests/test_monitoring/test_stdouterr.py +++ b/parsl/tests/test_monitoring/test_stdouterr.py @@ -38,6 +38,7 @@ def fresh_config(run_dir): monitoring=MonitoringHub( hub_address="localhost", hub_port=55055, + udp_atexit_timeout=0 ) )