From 96f1e630f71ac4cc09475eafc1cd0efbad227cb5 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sun, 21 Apr 2024 14:02:52 +0000 Subject: [PATCH] count fds --- parsl/dataflow/dflow.py | 10 ++++++++-- parsl/log_utils.py | 6 +++--- parsl/monitoring/monitoring.py | 6 +++--- parsl/monitoring/router.py | 6 +++--- parsl/tests/conftest.py | 9 +++++++++ 5 files changed, 26 insertions(+), 11 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 344173c4b1..ac875308d5 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -93,7 +93,7 @@ def __init__(self, config: Config) -> None: self.run_dir = make_rundir(config.run_dir) if config.initialize_logging: - parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG) + _, self.logging_handler = parsl.set_file_logger("{}/parsl.log".format(self.run_dir), level=logging.DEBUG) logger.info("Starting DataFlowKernel with config\n{}".format(config)) @@ -1321,7 +1321,13 @@ def cleanup(self) -> None: else: logger.debug("Cleaning up non-default DFK - not unregistering") - logger.info("DFK cleanup complete") + # TODO: do this in parsl/logutils.py + logger.info("DFK cleanup complete - removing parsl.log handler") + logger_to_remove = logging.getLogger("parsl") + logger_to_remove.removeHandler(self.logging_handler) + self.logging_handler.close() + + logger.info("handler closed - is this going to break things?") def checkpoint(self, tasks: Optional[Sequence[TaskRecord]] = None) -> str: """Checkpoint the dfk incrementally to a checkpoint file. diff --git a/parsl/log_utils.py b/parsl/log_utils.py index ff25249b1b..b5d483b8e6 100644 --- a/parsl/log_utils.py +++ b/parsl/log_utils.py @@ -12,7 +12,7 @@ """ import io import logging -from typing import Optional +from typing import Optional, Tuple import typeguard @@ -65,7 +65,7 @@ def set_stream_logger(name: str = 'parsl', def set_file_logger(filename: str, name: str = 'parsl', level: int = logging.DEBUG, - format_string: Optional[str] = None) -> logging.Logger: + format_string: Optional[str] = None) -> Tuple[logging.Logger, logging.FileHandler]: """Add a file log handler. Args: @@ -93,4 +93,4 @@ def set_file_logger(filename: str, futures_logger = logging.getLogger("concurrent.futures") futures_logger.addHandler(handler) - return logger + return (logger, handler) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index d23f1a4700..5ee9282d09 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -288,9 +288,9 @@ def close(self) -> None: @wrap_with_logs def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None: - logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir), - name="monitoring_filesystem_radio", - level=logging.INFO) + logger, _ = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir), + name="monitoring_filesystem_radio", + level=logging.INFO) logger.info("Starting filesystem radio receiver") setproctitle("parsl: monitoring filesystem receiver") diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index d397d8e6af..f87b86c7b4 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -65,9 +65,9 @@ def __init__(self, An event that the main Parsl process will set to signal that the monitoring router should shut down. """ os.makedirs(logdir, exist_ok=True) - self.logger = set_file_logger("{}/monitoring_router.log".format(logdir), - name="monitoring_router", - level=logging_level) + self.logger, _ = set_file_logger("{}/monitoring_router.log".format(logdir), + name="monitoring_router", + level=logging_level) self.logger.debug("Monitoring router starting") self.hub_address = hub_address diff --git a/parsl/tests/conftest.py b/parsl/tests/conftest.py index 2e227904fa..5cf9446906 100644 --- a/parsl/tests/conftest.py +++ b/parsl/tests/conftest.py @@ -217,6 +217,9 @@ def load_dfk_session(request, pytestconfig, tmpd_cwd_session): logger.error(f"BENC: end open fds: {end_fds}") assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate()) + end_fds = this_process.num_fds() + logger.error(f"BENC: end open fds: {end_fds} (vs {start_fds} at start)") + assert start_fds == end_fds, "number of open fds changed across test run" else: yield @@ -281,6 +284,12 @@ def load_dfk_local_module(request, pytestconfig, tmpd_cwd_session): logger.error(f"BENC: end threads: {threading.active_count()}") assert threading.active_count() == 1, "test left threads running: " + repr(threading.enumerate()) + end_fds = this_process.num_fds() + logger.error(f"BENC: open fds END: {end_fds}") + if end_fds > start_fds: + logger.error(f"Open files (not all fds, though?): {this_process.open_files()!r}") + os.system(f"ls -l /proc/{os.getpid()}/fd") + pytest.fail("BENC: number of open fds increased across test") else: yield