Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make result queue poll for shutdown, and tidy up at shutdown #3709

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ def __init__(self,
interchange_launch_cmd = DEFAULT_INTERCHANGE_LAUNCH_CMD
self.interchange_launch_cmd = interchange_launch_cmd

self._result_queue_thread_exit = threading.Event()
self._result_queue_thread: Optional[threading.Thread] = None

radio_mode = "htex"
enable_mpi_mode: bool = False
mpi_launcher: str = "mpiexec"
Expand Down Expand Up @@ -455,9 +458,11 @@ def _result_queue_worker(self):
"""
logger.debug("Result queue worker starting")

while not self.bad_state_is_set:
while not self.bad_state_is_set and not self._result_queue_thread_exit.is_set():
try:
msgs = self.incoming_q.get()
msgs = self.incoming_q.get(timeout_ms=self.poll_period)
if msgs is None: # timeout
continue
Comment on lines -460 to +465
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good. This exact procedure has been on my mind for quite awhile, so thank you for this!

I'm thinking at some point we can go farther and simply block completely, but I haven't fully gotten through the shutdown logic for this route yet. But "on my mind," at some point.


except IOError as e:
logger.exception("Caught broken queue with exception code {}: {}".format(e.errno, e))
Expand Down Expand Up @@ -515,6 +520,8 @@ def _result_queue_worker(self):
else:
raise BadMessage("Message received with unknown type {}".format(msg['type']))

logger.info("Closing result ZMQ pipe")
self.incoming_q.close()
logger.info("Result queue worker finished")

def _start_local_interchange_process(self) -> None:
Expand Down Expand Up @@ -817,6 +824,8 @@ def shutdown(self, timeout: float = 10.0):

logger.info("Attempting HighThroughputExecutor shutdown")

logger.info("Terminating interchange and result queue thread")
self._result_queue_thread_exit.set()
self.interchange_proc.terminate()
try:
self.interchange_proc.wait(timeout=timeout)
Expand All @@ -841,6 +850,10 @@ def shutdown(self, timeout: float = 10.0):
logger.info("Closing command client")
self.command_client.close()

logger.info("Waiting for result queue thread exit")
if self._result_queue_thread:
self._result_queue_thread.join()

logger.info("Finished HighThroughputExecutor shutdown attempt")

def get_usage_information(self):
Expand Down
17 changes: 13 additions & 4 deletions parsl/executors/high_throughput/zmq_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,21 @@ def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
self.port = self.results_receiver.bind_to_random_port(tcp_url(ip_address),
min_port=port_range[0],
max_port=port_range[1])
self.poller = zmq.Poller()
self.poller.register(self.results_receiver, zmq.POLLIN)

def get(self):
def get(self, timeout_ms=None):
"""Get a message from the queue, returning None if timeout expires
without a message. timeout is measured in milliseconds.
"""
logger.debug("Waiting for ResultsIncoming message")
m = self.results_receiver.recv_multipart()
logger.debug("Received ResultsIncoming message")
return m
socks = dict(self.poller.poll(timeout=timeout_ms))
if self.results_receiver in socks and socks[self.results_receiver] == zmq.POLLIN:
m = self.results_receiver.recv_multipart()
logger.debug("Received ResultsIncoming message")
return m
else:
return None

def close(self):
self.results_receiver.close()
Expand Down