Skip to content

Commit

Permalink
Move cache into the resource dict
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Nov 14, 2024
1 parent a4c7c7b commit 04ee989
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 25 deletions.
7 changes: 0 additions & 7 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class Executor:
cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
recommended, as computers have a limited number of compute cores.
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
max_cores (int): defines the number of cores which can be used in parallel
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores_per_worker (int): number of MPI cores to be used for each function call
Expand Down Expand Up @@ -85,7 +84,6 @@ def __init__(
self,
max_workers: Optional[int] = None,
backend: str = "local",
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
Expand All @@ -106,7 +104,6 @@ def __new__(
cls,
max_workers: Optional[int] = None,
backend: str = "local",
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
Expand All @@ -133,7 +130,6 @@ def __new__(
number of cores which can be used in parallel - just like the max_cores parameter. Using
max_cores is recommended, as computers have a limited number of compute cores.
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
max_cores (int): defines the number of cores which can be used in parallel
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
Expand Down Expand Up @@ -186,7 +182,6 @@ def __new__(
max_workers=max_workers,
backend=backend,
max_cores=max_cores,
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
Expand All @@ -202,7 +197,6 @@ def __new__(
return ExecutorWithDependencies(
max_workers=max_workers,
backend=backend,
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
flux_executor=flux_executor,
Expand All @@ -221,7 +215,6 @@ def __new__(
return create_executor(
max_workers=max_workers,
backend=backend,
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
flux_executor=flux_executor,
Expand Down
9 changes: 2 additions & 7 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
class FileExecutor(ExecutorBase):
def __init__(
self,
cache_directory: str = "cache",
resource_dict: Optional[dict] = None,
execute_function: callable = execute_with_pysqa,
terminate_function: Optional[callable] = None,
Expand All @@ -38,7 +37,6 @@ def __init__(
Initialize the FileExecutor.
Args:
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- cwd (str/None): current working directory where the parallel python task is executed
Expand All @@ -52,6 +50,7 @@ def __init__(
default_resource_dict = {
"cores": 1,
"cwd": None,
"cache": "cache",
}
if resource_dict is None:
resource_dict = {}
Expand All @@ -60,7 +59,7 @@ def __init__(
)
if execute_function == execute_in_subprocess and terminate_function is None:
terminate_function = terminate_subprocess
cache_directory_path = os.path.abspath(cache_directory)
cache_directory_path = os.path.abspath(resource_dict.pop("cache"))
os.makedirs(cache_directory_path, exist_ok=True)
self._set_process(
RaisingThread(
Expand All @@ -83,7 +82,6 @@ def create_file_executor(
max_workers: int = 1,
backend: str = "pysqa_flux",
max_cores: int = 1,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
Expand All @@ -94,8 +92,6 @@ def create_file_executor(
init_function: Optional[callable] = None,
disable_dependencies: bool = False,
):
if cache_directory is None:
cache_directory = "executorlib_cache"
if block_allocation:
raise ValueError(
"The option block_allocation is not available with the pysqa based backend."
Expand All @@ -110,7 +106,6 @@ def create_file_executor(
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
backend=backend.split("pysqa_")[-1],
Expand Down
3 changes: 0 additions & 3 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def create_executor(
max_workers: Optional[int] = None,
backend: str = "local",
max_cores: Optional[int] = None,
cache_directory: Optional[str] = None,
resource_dict: Optional[dict] = None,
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
Expand All @@ -173,7 +172,6 @@ def create_executor(
recommended, as computers have a limited number of compute cores.
backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
max_cores (int): defines the number of cores which can be used in parallel
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
Expand Down Expand Up @@ -202,7 +200,6 @@ def create_executor(
backend = "flux"
check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
cores_per_worker = resource_dict["cores"]
resource_dict["cache_directory"] = cache_directory
resource_dict["hostname_localhost"] = hostname_localhost
if backend == "flux":
check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
Expand Down
7 changes: 3 additions & 4 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def execute_parallel_tasks(
spawner: BaseSpawner = MpiExecSpawner,
hostname_localhost: Optional[bool] = None,
init_function: Optional[Callable] = None,
cache_directory: Optional[str] = None,
**kwargs,
) -> None:
"""
Expand All @@ -221,7 +220,6 @@ def execute_parallel_tasks(
this look up for security reasons. So on MacOS it is required to set this
option to true
init_function (callable): optional function to preset arguments for functions which are submitted later
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
"""
interface = interface_bootup(
command_lst=_get_backend_path(
Expand All @@ -242,7 +240,8 @@ def execute_parallel_tasks(
future_queue.join()
break
elif "fn" in task_dict.keys() and "future" in task_dict.keys():
if cache_directory is None:
resource_dict = task_dict.get("resource_dict", {})
if "cache" in resource_dict:
_execute_task(
interface=interface, task_dict=task_dict, future_queue=future_queue
)
Expand All @@ -251,7 +250,7 @@ def execute_parallel_tasks(
interface=interface,
task_dict=task_dict,
future_queue=future_queue,
cache_directory=cache_directory,
cache_directory=resource_dict["cache"],
)


Expand Down
3 changes: 1 addition & 2 deletions tests/test_cache_executor_pysqa_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ class TestCacheExecutorPysqa(unittest.TestCase):
def test_executor(self):
with Executor(
backend="pysqa_flux",
resource_dict={"cores": 2, "cwd": "cache"},
resource_dict={"cores": 2, "cwd": "cache", "cache": "cache"},
block_allocation=False,
cache_directory="cache",
) as exe:
cloudpickle_register(ind=1)
fs1 = exe.submit(mpi_funct, 1)
Expand Down
3 changes: 1 addition & 2 deletions tests/test_executor_backend_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,9 @@ def tearDown(self):
def test_meta_executor_parallel_cache(self):
with Executor(
max_workers=2,
resource_dict={"cores": 2},
resource_dict={"cores": 2, "cache": "./cache"},
backend="local",
block_allocation=True,
cache_directory="./cache",
) as exe:
cloudpickle_register(ind=1)
time_1 = time.time()
Expand Down

0 comments on commit 04ee989

Please sign in to comment.