diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index 3337653a..6da76e57 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -29,7 +29,6 @@ class Executor:
                            cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                            recommended, as computers have a limited number of compute cores.
         backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
-        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         max_cores (int): defines the number cores which can be used in parallel
         resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                               - cores_per_worker (int): number of MPI cores to be used for each function call
@@ -85,7 +84,6 @@ def __init__(
         self,
         max_workers: Optional[int] = None,
         backend: str = "local",
-        cache_directory: Optional[str] = None,
         max_cores: Optional[int] = None,
         resource_dict: Optional[dict] = None,
         flux_executor=None,
@@ -106,7 +104,6 @@ def __new__(
         cls,
         max_workers: Optional[int] = None,
         backend: str = "local",
-        cache_directory: Optional[str] = None,
         max_cores: Optional[int] = None,
         resource_dict: Optional[dict] = None,
         flux_executor=None,
@@ -133,7 +130,6 @@ def __new__(
                                number of cores which can be used in parallel - just like the max_cores parameter. Using
                                max_cores is recommended, as computers have a limited number of compute cores.
             backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
-            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
             max_cores (int): defines the number cores which can be used in parallel
             resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                                   - cores (int): number of MPI cores to be used for each function call
@@ -186,7 +182,6 @@ def __new__(
                 max_workers=max_workers,
                 backend=backend,
                 max_cores=max_cores,
-                cache_directory=cache_directory,
                 resource_dict=resource_dict,
                 flux_executor=flux_executor,
                 flux_executor_pmi_mode=flux_executor_pmi_mode,
@@ -202,7 +197,6 @@ def __new__(
             return ExecutorWithDependencies(
                 max_workers=max_workers,
                 backend=backend,
-                cache_directory=cache_directory,
                 max_cores=max_cores,
                 resource_dict=resource_dict,
                 flux_executor=flux_executor,
@@ -221,7 +215,6 @@ def __new__(
             return create_executor(
                 max_workers=max_workers,
                 backend=backend,
-                cache_directory=cache_directory,
                 max_cores=max_cores,
                 resource_dict=resource_dict,
                 flux_executor=flux_executor,
diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py
index ea42bad8..d91c7f69 100644
--- a/executorlib/cache/executor.py
+++ b/executorlib/cache/executor.py
@@ -26,7 +26,6 @@ class FileExecutor(ExecutorBase):
 
     def __init__(
         self,
-        cache_directory: str = "cache",
         resource_dict: Optional[dict] = None,
         execute_function: callable = execute_with_pysqa,
         terminate_function: Optional[callable] = None,
@@ -38,7 +37,6 @@ def __init__(
         Initialize the FileExecutor.
 
         Args:
-            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
             resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                               - cores (int): number of MPI cores to be used for each function call
                               - cwd (str/None): current working directory where the parallel python task is executed
@@ -52,6 +50,7 @@ def __init__(
         default_resource_dict = {
             "cores": 1,
             "cwd": None,
+            "cache": "cache",
         }
         if resource_dict is None:
             resource_dict = {}
@@ -60,7 +59,7 @@ def __init__(
         )
         if execute_function == execute_in_subprocess and terminate_function is None:
             terminate_function = terminate_subprocess
-        cache_directory_path = os.path.abspath(cache_directory)
+        cache_directory_path = os.path.abspath(resource_dict.pop("cache"))
         os.makedirs(cache_directory_path, exist_ok=True)
         self._set_process(
             RaisingThread(
@@ -83,7 +82,6 @@ def create_file_executor(
     max_workers: int = 1,
     backend: str = "pysqa_flux",
     max_cores: int = 1,
-    cache_directory: Optional[str] = None,
     resource_dict: Optional[dict] = None,
     flux_executor=None,
     flux_executor_pmi_mode: Optional[str] = None,
@@ -94,8 +92,6 @@ def create_file_executor(
     init_function: Optional[callable] = None,
     disable_dependencies: bool = False,
 ):
-    if cache_directory is None:
-        cache_directory = "executorlib_cache"
     if block_allocation:
         raise ValueError(
             "The option block_allocation is not available with the pysqa based backend."
@@ -110,7 +106,6 @@ def create_file_executor(
     check_executor(executor=flux_executor)
     check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
     return FileExecutor(
-        cache_directory=cache_directory,
         resource_dict=resource_dict,
         pysqa_config_directory=pysqa_config_directory,
         backend=backend.split("pysqa_")[-1],
diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py
index 067dec50..1d9fe504 100644
--- a/executorlib/interactive/executor.py
+++ b/executorlib/interactive/executor.py
@@ -150,7 +150,6 @@ def create_executor(
     max_workers: Optional[int] = None,
     backend: str = "local",
     max_cores: Optional[int] = None,
-    cache_directory: Optional[str] = None,
     resource_dict: Optional[dict] = None,
     flux_executor=None,
     flux_executor_pmi_mode: Optional[str] = None,
@@ -173,7 +172,6 @@ def create_executor(
                            recommended, as computers have a limited number of compute cores.
        backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
        max_cores (int): defines the number cores which can be used in parallel
-       cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
                              - cores (int): number of MPI cores to be used for each function call
                              - threads_per_core (int): number of OpenMP threads to be used for each function call
@@ -202,7 +200,6 @@ def create_executor(
         backend = "flux"
     check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
     cores_per_worker = resource_dict["cores"]
-    resource_dict["cache_directory"] = cache_directory
     resource_dict["hostname_localhost"] = hostname_localhost
     if backend == "flux":
         check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py
index 7fdc95f9..109dd988 100644
--- a/executorlib/interactive/shared.py
+++ b/executorlib/interactive/shared.py
@@ -203,7 +203,6 @@ def execute_parallel_tasks(
     spawner: BaseSpawner = MpiExecSpawner,
     hostname_localhost: Optional[bool] = None,
     init_function: Optional[Callable] = None,
-    cache_directory: Optional[str] = None,
     **kwargs,
 ) -> None:
     """
@@ -221,7 +220,6 @@ def execute_parallel_tasks(
                                       this look up for security reasons. So on MacOS it is required to set this
                                       option to true
         init_function (callable): optional function to preset arguments for functions which are submitted later
-        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
     """
     interface = interface_bootup(
         command_lst=_get_backend_path(
@@ -242,7 +240,8 @@ def execute_parallel_tasks(
             future_queue.join()
             break
         elif "fn" in task_dict.keys() and "future" in task_dict.keys():
-            if cache_directory is None:
+            resource_dict = task_dict.get("resource_dict", {})
+            if "cache" not in resource_dict:
                 _execute_task(
                     interface=interface, task_dict=task_dict, future_queue=future_queue
                 )
@@ -251,7 +250,7 @@
                     interface=interface,
                     task_dict=task_dict,
                     future_queue=future_queue,
-                    cache_directory=cache_directory,
+                    cache_directory=resource_dict["cache"],
                 )
 
 
diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py
index 995e876a..88ffec9d 100644
--- a/tests/test_cache_executor_pysqa_flux.py
+++ b/tests/test_cache_executor_pysqa_flux.py
@@ -34,9 +34,8 @@ class TestCacheExecutorPysqa(unittest.TestCase):
     def test_executor(self):
         with Executor(
             backend="pysqa_flux",
-            resource_dict={"cores": 2, "cwd": "cache"},
+            resource_dict={"cores": 2, "cwd": "cache", "cache": "cache"},
             block_allocation=False,
-            cache_directory="cache",
         ) as exe:
             cloudpickle_register(ind=1)
             fs1 = exe.submit(mpi_funct, 1)
diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py
index 4876cbfc..8f682fd7 100644
--- a/tests/test_executor_backend_mpi.py
+++ b/tests/test_executor_backend_mpi.py
@@ -98,10 +98,9 @@ def tearDown(self):
     def test_meta_executor_parallel_cache(self):
         with Executor(
             max_workers=2,
-            resource_dict={"cores": 2},
+            resource_dict={"cores": 2, "cache": "./cache"},
             backend="local",
             block_allocation=True,
-            cache_directory="./cache",
         ) as exe:
             cloudpickle_register(ind=1)
             time_1 = time.time()
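
Usage note: after this change the cache location travels inside resource_dict under the "cache" key instead of the removed cache_directory argument. A minimal sketch of the new call pattern, mirroring the updated test_executor_backend_mpi.py above; the add() helper is a hypothetical placeholder, and any picklable function would work:

from executorlib import Executor


def add(a, b):
    # hypothetical task, used only to illustrate the call signature
    return a + b


# Previously: Executor(..., cache_directory="./cache")
# Now: the cache directory is an entry of resource_dict.
with Executor(
    max_workers=2,
    resource_dict={"cores": 1, "cache": "./cache"},
    backend="local",
    block_allocation=True,
) as exe:
    future = exe.submit(add, 1, 2)
    print(future.result())  # prints 3 once the cached result is available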