diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index 29c5e72b..33a1fadb 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -1,7 +1,9 @@ +import os from concurrent.futures import CancelledError, Future import importlib.util from queue import Queue from time import sleep +import shutil import unittest import numpy as np @@ -16,6 +18,12 @@ from executorlib.standalone.interactive.backend import call_funct from executorlib.standalone.serialize import cloudpickle_register +try: + import h5py + + skip_h5py_test = False +except ImportError: + skip_h5py_test = True skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None @@ -32,6 +40,11 @@ def echo_funct(i): return i +def calc_sleep(i): + sleep(i) + return i + + def get_global(memory=None): return memory @@ -473,3 +486,40 @@ def test_execute_task_parallel(self): ) self.assertEqual(f.result(), [np.array(4), np.array(4)]) q.join() + + +class TestFuturePoolCache(unittest.TestCase): + def tearDown(self): + shutil.rmtree("./cache") + + @unittest.skipIf( + skip_h5py_test, "h5py is not installed, so the h5py tests are skipped." + ) + def test_execute_task_cache(self): + f1 = Future() + f2 = Future() + q1 = Queue() + q2 = Queue() + q1.put({"fn": calc_sleep, "args": (), "kwargs": {"i": 1}, "future": f1}) + q1.put({"shutdown": True, "wait": True}) + q2.put({"fn": calc_sleep, "args": (), "kwargs": {"i": 1}, "future": f2}) + q2.put({"shutdown": True, "wait": True}) + cloudpickle_register(ind=1) + execute_parallel_tasks( + future_queue=q1, + cores=1, + openmpi_oversubscribe=False, + spawner=MpiExecSpawner, + cache_directory="./cache", + ) + execute_parallel_tasks( + future_queue=q2, + cores=1, + openmpi_oversubscribe=False, + spawner=MpiExecSpawner, + cache_directory="./cache", + ) + self.assertEqual(f1.result(), 1) + self.assertEqual(f2.result(), 1) + q1.join() + q2.join()