Skip to content

Commit

Permalink
Move FluxExecutor ZMQ into thread and explicitly clean it up (#3517)
Browse files Browse the repository at this point in the history
Prior to this PR, there were frequent hangs in CI at cleanup of the ZMQ objects used by the FluxExecutor. See issue #3484 for some more information.

This PR attempts to remove some dangerous behaviour there:

i) creation of ZMQ context and socket is moved into the thread which makes use of them - before this PR, the socket was created on the main thread and passed into the submission thread which uses it. This removes some thread safety issues where a socket cannot be safely moved between threads.

ii) ZMQ context and socket are more explicitly closed (using with-blocks) rather than leaving that to the garbage collector. In the hung tests, the ZMQ context was being garbage collected in the main thread, which is documented as being unsafe when sockets are open belonging to another thread (the submission thread)

On my laptop I could see a hang around 50% of test runs before this PR. After this PR, I have run about 100 iterations of the flux tests without seeing any hangs.
  • Loading branch information
benclifford authored Jul 23, 2024
1 parent 2b1594c commit 9798260
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions parsl/executors/flux/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ def __init__(
raise EnvironmentError("Cannot find Flux installation in PATH")
self.flux_path = os.path.abspath(flux_path)
self._task_id_counter = itertools.count()
self._socket = zmq.Context().socket(zmq.REP)
# Assumes a launch command cannot be None or empty
self.launch_cmd = launch_cmd or self.DEFAULT_LAUNCH_CMD
self._submission_queue: queue.Queue = queue.Queue()
Expand All @@ -213,7 +212,6 @@ def __init__(
args=(
self._submission_queue,
self._stop_event,
self._socket,
self.working_dir,
self.flux_executor_kwargs,
self.provider,
Expand Down Expand Up @@ -306,11 +304,13 @@ def _submit_wrapper(
If an exception is thrown, error out all submitted tasks.
"""
try:
_submit_flux_jobs(submission_queue, stop_event, *args, **kwargs)
except Exception as exc:
_error_out_jobs(submission_queue, stop_event, exc)
raise
with zmq.Context() as ctx:
with ctx.socket(zmq.REP) as socket:
try:
_submit_flux_jobs(submission_queue, stop_event, socket, *args, **kwargs)
except Exception as exc:
_error_out_jobs(submission_queue, stop_event, exc)
raise


def _error_out_jobs(
Expand Down

0 comments on commit 9798260

Please sign in to comment.