From 2b0af733699a6e58a0658aca0f6e406166107106 Mon Sep 17 00:00:00 2001 From: Aymen Alsaadi <27039262+AymenFJA@users.noreply.github.com> Date: Sat, 14 Dec 2024 12:45:04 -0500 Subject: [PATCH] Make radical bulk collector exit more explicitly (#3718) In addition to the work happening [here](https://github.com/radical-cybertools/radical.pilot/pull/3269) to address this issue https://github.com/Parsl/parsl/issues/3708. This PR ensures the cleanup of threads generated by RPEX. - Bug fix (partially https://github.com/Parsl/parsl/issues/3708) --------- Co-authored-by: Ben Clifford --- parsl/executors/radical/executor.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/parsl/executors/radical/executor.py b/parsl/executors/radical/executor.py index 93b4b38bbd..aa191f49e5 100644 --- a/parsl/executors/radical/executor.py +++ b/parsl/executors/radical/executor.py @@ -133,6 +133,7 @@ def __init__(self, self.resource = resource self._uid = RPEX.lower() self.bulk_mode = bulk_mode + self._terminate = mt.Event() self.working_dir = working_dir self.pilot_kwargs = rpex_pilot_kwargs self.future_tasks: Dict[str, Future] = {} @@ -532,7 +533,7 @@ def _bulk_collector(self): bulk = list() - while True: + while not self._terminate.is_set(): now = time.time() # time of last submission @@ -552,6 +553,9 @@ def _bulk_collector(self): if len(bulk) >= self._max_bulk_size: break + if self._terminate.is_set(): + break + if bulk: logger.debug('submit bulk: %d', len(bulk)) self.tmgr.submit_tasks(bulk) @@ -588,6 +592,13 @@ def submit(self, func, resource_specification, *args, **kwargs): def shutdown(self, hub=True, targets='all', block=False): """Shutdown the executor, including all RADICAL-Pilot components.""" logger.info("RadicalPilotExecutor is terminating...") + + self._terminate.set() + + # ensure we are in the bulk submssion mode + if self.bulk_mode: + self._bulk_thread.join() + self.session.close(download=True) logger.info("RadicalPilotExecutor is terminated.")