From d540f1565134027d2c267c5cdbad4f17f12729c9 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 9 Nov 2024 16:27:10 +0100 Subject: [PATCH 01/13] Update validate_number_of_cores() --- executorlib/interactive/executor.py | 22 ++++++++++++++++---- executorlib/interactive/shared.py | 31 ++++++++++++++++++++++------ executorlib/standalone/inputcheck.py | 18 +++++++++------- 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index cd5ce42c..6e55ae5a 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -197,7 +197,6 @@ def create_executor( of the individual function. init_function (None): optional function to preset arguments for functions which are submitted later """ - max_cores = validate_number_of_cores(max_cores=max_cores, max_workers=max_workers) check_init_function(block_allocation=block_allocation, init_function=init_function) if flux_executor is not None and backend != "flux": backend = "flux" @@ -218,13 +217,18 @@ def create_executor( if block_allocation: resource_dict["init_function"] = init_function return InteractiveExecutor( - max_workers=int(max_cores / cores_per_worker), + max_workers=validate_number_of_cores( + max_cores=max_cores, + max_workers=max_workers, + set_local_cores=False, + ), executor_kwargs=resource_dict, spawner=FluxPythonSpawner, ) else: return InteractiveStepExecutor( max_cores=max_cores, + max_workers=max_workers, executor_kwargs=resource_dict, spawner=FluxPythonSpawner, ) @@ -234,13 +238,18 @@ def create_executor( if block_allocation: resource_dict["init_function"] = init_function return InteractiveExecutor( - max_workers=int(max_cores / cores_per_worker), + max_workers=validate_number_of_cores( + max_cores=max_cores, + max_workers=max_workers, + set_local_cores=False, + ), executor_kwargs=resource_dict, spawner=SrunSpawner, ) else: return InteractiveStepExecutor( max_cores=max_cores, + max_workers=max_workers, executor_kwargs=resource_dict, spawner=SrunSpawner, ) @@ -258,13 +267,18 @@ def create_executor( if block_allocation: resource_dict["init_function"] = init_function return InteractiveExecutor( - max_workers=int(max_cores / cores_per_worker), + max_workers=validate_number_of_cores( + max_cores=max_cores, + max_workers=max_workers, + set_local_cores=True, + ), executor_kwargs=resource_dict, spawner=MpiExecSpawner, ) else: return InteractiveStepExecutor( max_cores=max_cores, + max_workers=max_workers, executor_kwargs=resource_dict, spawner=MpiExecSpawner, ) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index aebca15d..dc8014bf 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -179,7 +179,8 @@ class InteractiveStepExecutor(ExecutorBase): def __init__( self, - max_cores: int = 1, + max_cores: Optional[int] = None, + max_workers: Optional[int] = None, executor_kwargs: dict = {}, spawner: BaseSpawner = MpiExecSpawner, ): @@ -187,6 +188,7 @@ def __init__( executor_kwargs["future_queue"] = self._future_queue executor_kwargs["spawner"] = spawner executor_kwargs["max_cores"] = max_cores + executor_kwargs["max_workers"] = max_workers self._set_process( RaisingThread( target=execute_separate_tasks, @@ -256,7 +258,8 @@ def execute_parallel_tasks( def execute_separate_tasks( future_queue: queue.Queue, spawner: BaseSpawner = MpiExecSpawner, - max_cores: int = 1, + max_cores: Optional[int] = None, + max_workers: Optional[int] = None, hostname_localhost: Optional[bool] = None, **kwargs, ): @@ -267,6 +270,9 @@ def execute_separate_tasks( future_queue (queue.Queue): task queue of dictionary objects which are submitted to the parallel process spawner (BaseSpawner): Interface to start process on selected compute resources max_cores (int): defines the number cores which can be used in parallel + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of + cores which can be used in parallel - just like the max_cores parameter. Using max_cores is + recommended, as computers have a limited number of compute cores. hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -296,6 +302,7 @@ def execute_separate_tasks( spawner=spawner, executor_kwargs=kwargs, max_cores=max_cores, + max_workers=max_workers, hostname_localhost=hostname_localhost, ) qtask_lst.append(qtask) @@ -389,7 +396,7 @@ def _get_backend_path( def _wait_for_free_slots( - active_task_dict: dict, cores_requested: int, max_cores: int + active_task_dict: dict, cores_requested: int, max_cores: Optional[int] = None, max_workers: Optional[int] = None, ) -> dict: """ Wait for available computing resources to become available. @@ -398,12 +405,19 @@ def _wait_for_free_slots( active_task_dict (dict): Dictionary containing the future objects and the number of cores they require cores_requested (int): Number of cores required for executing the next task max_cores (int): Maximum number cores which can be used + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of + cores which can be used in parallel - just like the max_cores parameter. Using max_cores is + recommended, as computers have a limited number of compute cores. Returns: dict: Dictionary containing the future objects and the number of cores they require """ - while sum(active_task_dict.values()) + cores_requested > max_cores: - active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} + if max_cores is not None: + while sum(active_task_dict.values()) + cores_requested > max_cores: + active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} + elif max_workers is not None and max_cores is None: + while len(active_task_dict.values()) + 1 > max_workers: + active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} return active_task_dict @@ -490,7 +504,8 @@ def _submit_function_to_separate_process( qtask: queue.Queue, spawner: BaseSpawner, executor_kwargs: dict, - max_cores: int = 1, + max_cores: Optional[int] = None, + max_workers: Optional[int] = None, hostname_localhost: Optional[bool] = None, ): """ @@ -503,6 +518,9 @@ def _submit_function_to_separate_process( spawner (BaseSpawner): Interface to start process on selected compute resources executor_kwargs (dict): keyword parameters used to initialize the Executor max_cores (int): defines the number cores which can be used in parallel + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of + cores which can be used in parallel - just like the max_cores parameter. Using max_cores is + recommended, as computers have a limited number of compute cores. hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And @@ -525,6 +543,7 @@ def _submit_function_to_separate_process( active_task_dict=active_task_dict, cores_requested=resource_dict["cores"], max_cores=max_cores, + max_workers=max_workers, ) active_task_dict[task_dict["future"]] = resource_dict["cores"] task_kwargs = executor_kwargs.copy() diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index 76f6c823..64429882 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -170,14 +170,18 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None: def validate_number_of_cores( - max_cores: Optional[int], max_workers: Optional[int] + max_cores: Optional[int] = None, max_workers: Optional[int] = None, set_local_cores: bool = False, ) -> int: """ Validate the number of cores and return the appropriate value. """ - if max_workers is None and max_cores is None: - return multiprocessing.cpu_count() - elif max_workers is not None and max_cores is None: - return max_workers - else: - return max_cores + if max_cores is None and max_workers is None: + if not set_local_cores: + raise ValueError( + "Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined." + ) + else: + max_workers = multiprocessing.cpu_count() + elif max_cores is not None and max_workers is None: + max_workers = int(max_cores / cores_per_worker) + return max_workers From c859e07d43ea4f561c4314e493debe7ff68e9115 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 9 Nov 2024 15:28:08 +0000 Subject: [PATCH 02/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/interactive/shared.py | 13 ++++++++++--- executorlib/standalone/inputcheck.py | 4 +++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index dc8014bf..727aa58f 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -396,7 +396,10 @@ def _get_backend_path( def _wait_for_free_slots( - active_task_dict: dict, cores_requested: int, max_cores: Optional[int] = None, max_workers: Optional[int] = None, + active_task_dict: dict, + cores_requested: int, + max_cores: Optional[int] = None, + max_workers: Optional[int] = None, ) -> dict: """ Wait for available computing resources to become available. @@ -414,10 +417,14 @@ def _wait_for_free_slots( """ if max_cores is not None: while sum(active_task_dict.values()) + cores_requested > max_cores: - active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} + active_task_dict = { + k: v for k, v in active_task_dict.items() if not k.done() + } elif max_workers is not None and max_cores is None: while len(active_task_dict.values()) + 1 > max_workers: - active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()} + active_task_dict = { + k: v for k, v in active_task_dict.items() if not k.done() + } return active_task_dict diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index 64429882..d776cf6b 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -170,7 +170,9 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None: def validate_number_of_cores( - max_cores: Optional[int] = None, max_workers: Optional[int] = None, set_local_cores: bool = False, + max_cores: Optional[int] = None, + max_workers: Optional[int] = None, + set_local_cores: bool = False, ) -> int: """ Validate the number of cores and return the appropriate value. From 28e214aa8ceef799f2d092f2ad993f42c09b22db Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 9 Nov 2024 16:34:21 +0100 Subject: [PATCH 03/13] fixes --- executorlib/interactive/executor.py | 3 +++ executorlib/standalone/inputcheck.py | 1 + tests/test_shared_input_check.py | 11 ++++++----- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index 6e55ae5a..9dee5abd 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -220,6 +220,7 @@ def create_executor( max_workers=validate_number_of_cores( max_cores=max_cores, max_workers=max_workers, + cores_per_worker=cores_per_worker, set_local_cores=False, ), executor_kwargs=resource_dict, @@ -241,6 +242,7 @@ def create_executor( max_workers=validate_number_of_cores( max_cores=max_cores, max_workers=max_workers, + cores_per_worker=cores_per_worker, set_local_cores=False, ), executor_kwargs=resource_dict, @@ -270,6 +272,7 @@ def create_executor( max_workers=validate_number_of_cores( max_cores=max_cores, max_workers=max_workers, + cores_per_worker=cores_per_worker, set_local_cores=True, ), executor_kwargs=resource_dict, diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index d776cf6b..040bbe51 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -172,6 +172,7 @@ def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None: def validate_number_of_cores( max_cores: Optional[int] = None, max_workers: Optional[int] = None, + cores_per_worker: Optional[int] = None, set_local_cores: bool = False, ) -> int: """ diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py index 44f5e599..21cc089d 100644 --- a/tests/test_shared_input_check.py +++ b/tests/test_shared_input_check.py @@ -98,12 +98,13 @@ def test_check_pysqa_config_directory(self): check_pysqa_config_directory(pysqa_config_directory="path/to/config") def test_validate_number_of_cores(self): + with self.assertRaises(ValueError): + validate_number_of_cores(max_cores=None, max_workers=None, cores_per_worker=None) + with self.assertRaises(TypeError): + validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=None) self.assertIsInstance( - validate_number_of_cores(max_cores=None, max_workers=None), int - ) - self.assertIsInstance( - validate_number_of_cores(max_cores=1, max_workers=None), int + validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=1), int ) self.assertIsInstance( - validate_number_of_cores(max_cores=None, max_workers=1), int + validate_number_of_cores(max_cores=None, max_workers=1, cores_per_worker=None), int ) From a345de5896443a2c125eeb5ed19572f7a8634638 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 9 Nov 2024 15:34:29 +0000 Subject: [PATCH 04/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_shared_input_check.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py index 21cc089d..5e3b0766 100644 --- a/tests/test_shared_input_check.py +++ b/tests/test_shared_input_check.py @@ -99,12 +99,20 @@ def test_check_pysqa_config_directory(self): def test_validate_number_of_cores(self): with self.assertRaises(ValueError): - validate_number_of_cores(max_cores=None, max_workers=None, cores_per_worker=None) + validate_number_of_cores( + max_cores=None, max_workers=None, cores_per_worker=None + ) with self.assertRaises(TypeError): - validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=None) + validate_number_of_cores( + max_cores=1, max_workers=None, cores_per_worker=None + ) self.assertIsInstance( - validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=1), int + validate_number_of_cores(max_cores=1, max_workers=None, cores_per_worker=1), + int, ) self.assertIsInstance( - validate_number_of_cores(max_cores=None, max_workers=1, cores_per_worker=None), int + validate_number_of_cores( + max_cores=None, max_workers=1, cores_per_worker=None + ), + int, ) From 5d4671592a16a4032f05e1dc19855009fbd3368a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 10 Nov 2024 08:43:53 +0100 Subject: [PATCH 05/13] fix test --- tests/test_executor_backend_mpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 9a4309b2..29a8d58d 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -91,7 +91,7 @@ def tearDown(self): ) def test_meta_executor_parallel_cache(self): with Executor( - max_workers=2, + max_cores=2, resource_dict={"cores": 2}, backend="local", block_allocation=True, From 9efd8a5f99d024406f93ee153de7f71f2065f635 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 10 Nov 2024 10:37:12 +0100 Subject: [PATCH 06/13] If the cache is empty execute the task again --- executorlib/interactive/shared.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index 3eb79986..35611a4b 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -632,7 +632,16 @@ def _execute_task_with_cache( data_dict["output"] = future.result() dump(file_name=file_name, data_dict=data_dict) else: - _, result = get_output(file_name=file_name) - future = task_dict["future"] - future.set_result(result) - future_queue.task_done() + exe_flag, result = get_output(file_name=file_name) + if exe_flag: + future = task_dict["future"] + future.set_result(result) + future_queue.task_done() + else: + _execute_task( + interface=interface, + task_dict=task_dict, + future_queue=future_queue, + ) + data_dict["output"] = future.result() + dump(file_name=file_name, data_dict=data_dict) From 81183ed86e96df25acdc4efd50612ed5ed95fb5c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 10 Nov 2024 17:26:03 +0100 Subject: [PATCH 07/13] Fix for the situation when the same task is executed twice (parallel workers) --- executorlib/standalone/hdf.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py index 9e8c8798..669c2a09 100644 --- a/executorlib/standalone/hdf.py +++ b/executorlib/standalone/hdf.py @@ -22,10 +22,13 @@ def dump(file_name: str, data_dict: dict) -> None: with h5py.File(file_name, "a") as fname: for data_key, data_value in data_dict.items(): if data_key in group_dict.keys(): - fname.create_dataset( - name="/" + group_dict[data_key], - data=np.void(cloudpickle.dumps(data_value)), - ) + try: + fname.create_dataset( + name="/" + group_dict[data_key], + data=np.void(cloudpickle.dumps(data_value)), + ) + except ValueError: + pass def load(file_name: str) -> dict: From cdba297e31c552c71f8d5ab7a517ee6ae77d7108 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 10 Nov 2024 17:26:09 +0100 Subject: [PATCH 08/13] Add tests --- tests/test_executor_backend_mpi.py | 31 ++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 9a002136..954c4ca4 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -14,6 +14,11 @@ def calc(i): return i +def calc_sleep(i): + time.sleep(i) + return i + + def mpi_funct(i): from mpi4py import MPI @@ -92,6 +97,32 @@ class TestExecutorBackendCache(unittest.TestCase): def tearDown(self): shutil.rmtree("./cache") + def test_executor_cache_bypass(self): + with Executor(max_workers=2, backend="local", block_allocation=True, cache_directory="./cache") as exe: + cloudpickle_register(ind=1) + time_1 = time.time() + fs_1 = exe.submit(calc_sleep, 1) + fs_2 = exe.submit(calc_sleep, 1) + self.assertEqual(fs_1.result(), 1) + self.assertTrue(fs_1.done()) + time_2 = time.time() + self.assertEqual(fs_2.result(), 1) + self.assertTrue(fs_2.done()) + time_3 = time.time() + self.assertTrue(time_2 - time_1 > 1) + self.assertTrue(time_3 - time_1 > 1) + time_4 = time.time() + fs_3 = exe.submit(calc_sleep, 1) + fs_4 = exe.submit(calc_sleep, 1) + self.assertEqual(fs_3.result(), 1) + self.assertTrue(fs_3.done()) + time_5 = time.time() + self.assertEqual(fs_4.result(), 1) + self.assertTrue(fs_4.done()) + time_6 = time.time() + self.assertTrue(time_5 - time_4 < 1) + self.assertTrue(time_6 - time_4 < 1) + @unittest.skipIf( skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." ) From e34b5de8a5be2a4cd546e0f9967ac71cb9f55c8b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 10 Nov 2024 16:26:17 +0000 Subject: [PATCH 09/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_executor_backend_mpi.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 954c4ca4..bc60c83b 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -98,7 +98,12 @@ def tearDown(self): shutil.rmtree("./cache") def test_executor_cache_bypass(self): - with Executor(max_workers=2, backend="local", block_allocation=True, cache_directory="./cache") as exe: + with Executor( + max_workers=2, + backend="local", + block_allocation=True, + cache_directory="./cache", + ) as exe: cloudpickle_register(ind=1) time_1 = time.time() fs_1 = exe.submit(calc_sleep, 1) From a6079bf8af3af5061658afb11564f71400f35555 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 10 Nov 2024 22:26:32 +0100 Subject: [PATCH 10/13] one more fix --- tests/test_executor_backend_mpi.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index bc60c83b..f04745d4 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -1,11 +1,17 @@ import importlib.util import shutil -import time +import timefix import unittest from executorlib import Executor from executorlib.standalone.serialize import cloudpickle_register +try: + import h5py + + skip_h5py_test = False +except ImportError: + skip_h5py_test = True skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None @@ -97,6 +103,9 @@ class TestExecutorBackendCache(unittest.TestCase): def tearDown(self): shutil.rmtree("./cache") + @unittest.skipIf( + skip_h5py_test, "h5py is not installed, so the h5py tests are skipped." + ) def test_executor_cache_bypass(self): with Executor( max_workers=2, From 7f151ccbd661f4a7dd47a6044fc982348e8b7140 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 10 Nov 2024 22:29:09 +0100 Subject: [PATCH 11/13] clean up --- tests/test_executor_backend_mpi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index f04745d4..aa4a4b90 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -1,6 +1,6 @@ import importlib.util import shutil -import timefix +import time import unittest from executorlib import Executor From 0f614d5bccab53ce1295eef7c633c87e48704c99 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 10 Nov 2024 22:47:52 +0100 Subject: [PATCH 12/13] Add missing test --- tests/test_local_executor.py | 50 ++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index 29c5e72b..33a1fadb 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -1,7 +1,9 @@ +import os from concurrent.futures import CancelledError, Future import importlib.util from queue import Queue from time import sleep +import shutil import unittest import numpy as np @@ -16,6 +18,12 @@ from executorlib.standalone.interactive.backend import call_funct from executorlib.standalone.serialize import cloudpickle_register +try: + import h5py + + skip_h5py_test = False +except ImportError: + skip_h5py_test = True skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None @@ -32,6 +40,11 @@ def echo_funct(i): return i +def calc_sleep(i): + sleep(i) + return i + + def get_global(memory=None): return memory @@ -473,3 +486,40 @@ def test_execute_task_parallel(self): ) self.assertEqual(f.result(), [np.array(4), np.array(4)]) q.join() + + +class TestFuturePoolCache(unittest.TestCase): + def tearDown(self): + shutil.rmtree("./cache") + + @unittest.skipIf( + skip_h5py_test, "h5py is not installed, so the h5py tests are skipped." + ) + def test_execute_task_cache(self): + f1 = Future() + f2 = Future() + q1 = Queue() + q2 = Queue() + q1.put({"fn": calc_sleep, "args": (), "kwargs": {"i": 1}, "future": f1}) + q1.put({"shutdown": True, "wait": True}) + q2.put({"fn": calc_sleep, "args": (), "kwargs": {"i": 1}, "future": f2}) + q2.put({"shutdown": True, "wait": True}) + cloudpickle_register(ind=1) + execute_parallel_tasks( + future_queue=q1, + cores=1, + openmpi_oversubscribe=False, + spawner=MpiExecSpawner, + cache_directory="./cache", + ) + execute_parallel_tasks( + future_queue=q2, + cores=1, + openmpi_oversubscribe=False, + spawner=MpiExecSpawner, + cache_directory="./cache", + ) + self.assertEqual(f1.result(), 1) + self.assertEqual(f2.result(), 1) + q1.join() + q2.join() From 382f86ef7baf3fa6603fedf16f5c72fde49dd3d5 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sun, 10 Nov 2024 22:53:36 +0100 Subject: [PATCH 13/13] Update test_local_executor.py --- tests/test_local_executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index 33a1fadb..71aa6991 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -512,6 +512,7 @@ def test_execute_task_cache(self): spawner=MpiExecSpawner, cache_directory="./cache", ) + sleep(0.5) execute_parallel_tasks( future_queue=q2, cores=1,