From 9798260c06da16f7d1a75dc2859c513d13992dc3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 23 Jul 2024 19:28:44 +0200 Subject: [PATCH] Move FluxExecutor ZMQ into thread and explicitly clean it up (#3517) Prior to this PR, there were frequent hangs in CI at cleanup of the ZMQ objects used by the FluxExecutor. See issue #3484 for some more information. This PR attempts to remove some dangerous behaviour there: i) creation of ZMQ context and socket is moved into the thread which makes use of them - before this PR, the socket was created on the main thread and passed into the submission thread which uses it. This removes a thread-safety hazard, because a socket cannot be safely moved between threads. ii) ZMQ context and socket are more explicitly closed (using with-blocks) rather than leaving that to the garbage collector. In the hung tests, the ZMQ context was being garbage collected in the main thread, which is documented as being unsafe while sockets belonging to another thread (the submission thread) are still open. On my laptop I could see a hang in around 50% of test runs before this PR. After this PR, I have run about 100 iterations of the flux tests without seeing any hangs. 
--- parsl/executors/flux/executor.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/parsl/executors/flux/executor.py b/parsl/executors/flux/executor.py index c4926abb68..f1b981f7e0 100644 --- a/parsl/executors/flux/executor.py +++ b/parsl/executors/flux/executor.py @@ -200,7 +200,6 @@ def __init__( raise EnvironmentError("Cannot find Flux installation in PATH") self.flux_path = os.path.abspath(flux_path) self._task_id_counter = itertools.count() - self._socket = zmq.Context().socket(zmq.REP) # Assumes a launch command cannot be None or empty self.launch_cmd = launch_cmd or self.DEFAULT_LAUNCH_CMD self._submission_queue: queue.Queue = queue.Queue() @@ -213,7 +212,6 @@ def __init__( args=( self._submission_queue, self._stop_event, - self._socket, self.working_dir, self.flux_executor_kwargs, self.provider, @@ -306,11 +304,13 @@ def _submit_wrapper( If an exception is thrown, error out all submitted tasks. """ - try: - _submit_flux_jobs(submission_queue, stop_event, *args, **kwargs) - except Exception as exc: - _error_out_jobs(submission_queue, stop_event, exc) - raise + with zmq.Context() as ctx: + with ctx.socket(zmq.REP) as socket: + try: + _submit_flux_jobs(submission_queue, stop_event, socket, *args, **kwargs) + except Exception as exc: + _error_out_jobs(submission_queue, stop_event, exc) + raise def _error_out_jobs(