Skip to content

Commit

Permalink
Make UDP radio receiver shutdown period configurable
Browse files Browse the repository at this point in the history
.. and use that new parameter in stderr/out tests which don't make use of
the UDP radio.
  • Loading branch information
benclifford committed Sep 5, 2024
1 parent 04ba3a6 commit e836b9c
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
10 changes: 9 additions & 1 deletion parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def __init__(self,
logdir: Optional[str] = None,
monitoring_debug: bool = False,
resource_monitoring_enabled: bool = True,
resource_monitoring_interval: float = 30): # in seconds
resource_monitoring_interval: float = 30, # in seconds
udp_atexit_timeout: float = 3):
"""
Parameters
----------
Expand Down Expand Up @@ -87,6 +88,10 @@ def __init__(self,
If set to 0, only start and end information will be logged, and no periodic monitoring will
be made.
Default: 30 seconds
udp_atexit_timeout : float
The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK
workflow message is received.
"""

if _db_manager_excepts:
Expand All @@ -106,6 +111,8 @@ def __init__(self,
self.resource_monitoring_enabled = resource_monitoring_enabled
self.resource_monitoring_interval = resource_monitoring_interval

self.udp_atexit_timeout = udp_atexit_timeout

def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:

logger.debug("Starting MonitoringHub")
Expand Down Expand Up @@ -154,6 +161,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
"zmq_port_range": self.hub_port_range,
"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"udp_atexit_timeout": self.udp_atexit_timeout
},
name="Monitoring-Router-Process",
daemon=True,
Expand Down
11 changes: 8 additions & 3 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self,
monitoring_hub_address: str = "127.0.0.1",
logdir: str = ".",
logging_level: int = logging.INFO,
atexit_timeout: int = 3, # in seconds
atexit_timeout: float = 3, # in seconds
resource_msgs: mpq.Queue,
exit_event: Event,
):
Expand All @@ -52,7 +52,9 @@ def __init__(self,
logging_level : int
Logging level as defined in the logging module. Default: logging.INFO
atexit_timeout : float, optional
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK
workflow message is received.
resource_msgs : multiprocessing.Queue
A multiprocessing queue to receive messages to be routed onwards to the database process
Expand Down Expand Up @@ -190,6 +192,8 @@ def router_starter(*,
udp_port: Optional[int],
zmq_port_range: Tuple[int, int],

udp_atexit_timeout: float,

logdir: str,
logging_level: int) -> None:
setproctitle("parsl: monitoring router")
Expand All @@ -200,7 +204,8 @@ def router_starter(*,
logdir=logdir,
logging_level=logging_level,
resource_msgs=resource_msgs,
exit_event=exit_event)
exit_event=exit_event,
atexit_timeout=udp_atexit_timeout)
except Exception as e:
logger.error("MonitoringRouter construction failed.", exc_info=True)
comm_q.put(f"Monitoring router construction failed: {e}")
Expand Down
1 change: 1 addition & 0 deletions parsl/tests/test_monitoring/test_stdouterr.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def fresh_config(run_dir):
monitoring=MonitoringHub(
hub_address="localhost",
hub_port=55055,
udp_atexit_timeout=0
)
)

Expand Down

0 comments on commit e836b9c

Please sign in to comment.