Skip to content

Commit

Permalink
Make UDP radio receiver shutdown period configurable
Browse files Browse the repository at this point in the history
.. and use that new parameter in stderr/out tests which don't make use of
the UDP radio.
  • Loading branch information
benclifford committed Aug 16, 2024
1 parent 92c9621 commit 782efa6
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 4 deletions.
10 changes: 9 additions & 1 deletion parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def __init__(self,
logdir: Optional[str] = None,
monitoring_debug: bool = False,
resource_monitoring_enabled: bool = True,
resource_monitoring_interval: float = 30): # in seconds
resource_monitoring_interval: float = 30, # in seconds
udp_atexit_timeout: float = 3):
"""
Parameters
----------
Expand Down Expand Up @@ -87,6 +88,10 @@ def __init__(self,
If set to 0, only start and end information will be logged, and no periodic monitoring will
be made.
Default: 30 seconds
udp_atexit_timeout : float
The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK
workflow message is received.
"""

if _db_manager_excepts:
Expand All @@ -106,6 +111,8 @@ def __init__(self,
self.resource_monitoring_enabled = resource_monitoring_enabled
self.resource_monitoring_interval = resource_monitoring_interval

self.udp_atexit_timeout = udp_atexit_timeout

def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:

logger.debug("Starting MonitoringHub")
Expand Down Expand Up @@ -166,6 +173,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
"zmq_port_range": self.hub_port_range,
"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"udp_atexit_timeout": self.udp_atexit_timeout
},
name="Monitoring-Router-Process",
daemon=True,
Expand Down
10 changes: 7 additions & 3 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self,
monitoring_hub_address: str = "127.0.0.1",
logdir: str = ".",
logging_level: int = logging.INFO,
atexit_timeout: int = 3, # in seconds
atexit_timeout: float = 3, # in seconds
priority_msgs: mpq.Queue,
node_msgs: mpq.Queue,
block_msgs: mpq.Queue,
Expand All @@ -56,7 +56,8 @@ def __init__(self,
logging_level : int
Logging level as defined in the logging module. Default: logging.INFO
atexit_timeout : float, optional
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
The amount of time in seconds to wait for more UDP messages at shutdown, after the last DFK
workflow message is received.
*_msgs : Queue
Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag.
Expand Down Expand Up @@ -217,6 +218,8 @@ def router_starter(*,
udp_port: Optional[int],
zmq_port_range: Tuple[int, int],

udp_atexit_timeout: float,

logdir: str,
logging_level: int) -> None:
setproctitle("parsl: monitoring router")
Expand All @@ -230,7 +233,8 @@ def router_starter(*,
node_msgs=node_msgs,
block_msgs=block_msgs,
resource_msgs=resource_msgs,
exit_event=exit_event)
exit_event=exit_event,
atexit_timeout=udp_atexit_timeout)
except Exception as e:
logger.error("MonitoringRouter construction failed.", exc_info=True)
comm_q.put(f"Monitoring router construction failed: {e}")
Expand Down
1 change: 1 addition & 0 deletions parsl/tests/test_monitoring/test_stdouterr.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def fresh_config(run_dir):
monitoring=MonitoringHub(
hub_address="localhost",
hub_port=55055,
udp_atexit_timeout=0
)
)

Expand Down

0 comments on commit 782efa6

Please sign in to comment.