
Commit

Implement resource_dict for file executor (#456)
* Implement resource_dict for file executor

* Use pysqa as default backend for FileExecutor

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
jan-janssen and pre-commit-ci[bot] authored Oct 28, 2024
1 parent ceda4c0 commit de646f9
Showing 8 changed files with 64 additions and 47 deletions.
4 changes: 2 additions & 2 deletions executorlib/__init__.py
@@ -138,9 +138,9 @@ def __new__(
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
max_cores (int): defines the number of cores which can be used in parallel
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores_per_worker (int): number of MPI cores to be used for each function call
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_worker (int): number of GPUs per worker - defaults to 0
- gpus_per_core (int): number of GPUs per core - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
and SLURM only) - default False
28 changes: 21 additions & 7 deletions executorlib/cache/executor.py
@@ -9,14 +9,19 @@
)
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.standalone.cache.queue import execute_with_pysqa
except ImportError:
# If pysqa is not available fall back to executing tasks in a subprocess
execute_with_pysqa = execute_in_subprocess


class FileExecutor(ExecutorBase):
def __init__(
self,
cache_directory: str = "cache",
cores_per_worker: int = 1,
cwd: Optional[str] = None,
execute_function: callable = execute_in_subprocess,
resource_dict: Optional[dict] = None,
execute_function: callable = execute_with_pysqa,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
@@ -26,14 +31,24 @@ def __init__(
Args:
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- cwd (str/None): current working directory where the parallel python task is executed
execute_function (callable, optional): The function to execute tasks. Defaults to execute_with_pysqa.
cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
terminate_function (callable, optional): The function to terminate the tasks.
cwd (str, optional): current working directory where the parallel python task is executed
config_directory (str, optional): path to the config directory.
backend (str, optional): name of the backend used to spawn tasks.
"""
super().__init__()
default_resource_dict = {
"cores": 1,
"cwd": None,
}
if resource_dict is None:
resource_dict = {}
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
if execute_function == execute_in_subprocess and terminate_function is None:
terminate_function = terminate_subprocess
cache_directory_path = os.path.abspath(cache_directory)
@@ -45,8 +60,7 @@ def __init__(
"future_queue": self._future_queue,
"execute_function": execute_function,
"cache_directory": cache_directory_path,
"cores_per_worker": cores_per_worker,
"cwd": cwd,
"resource_dict": resource_dict,
"terminate_function": terminate_function,
"config_directory": config_directory,
"backend": backend,
23 changes: 11 additions & 12 deletions executorlib/cache/shared.py
@@ -50,8 +50,7 @@ def execute_tasks_h5(
future_queue: queue.Queue,
cache_directory: str,
execute_function: callable,
cores_per_worker: int = 1,
cwd: Optional[str] = None,
resource_dict: dict,
terminate_function: Optional[callable] = None,
config_directory: Optional[str] = None,
backend: Optional[str] = None,
@@ -62,9 +61,10 @@
Args:
future_queue (queue.Queue): The queue containing the tasks.
cache_directory (str): The directory to store the HDF5 files.
cores_per_worker (int): The number of cores per worker.
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores (int): number of MPI cores to be used for each function call
- cwd (str/None): current working directory where the parallel python task is executed
execute_function (callable): The function to execute the tasks.
cwd (str/None): current working directory where the parallel python task is executed
terminate_function (callable): The function to terminate the tasks.
config_directory (str, optional): path to the config directory.
backend (str, optional): name of the backend used to spawn tasks.
@@ -97,16 +97,15 @@ def execute_tasks_h5(
memory_dict=memory_dict,
file_name_dict=file_name_dict,
)
resource_dict = task_dict["resource_dict"].copy()
if "cores" not in resource_dict:
resource_dict["cores"] = cores_per_worker
if "cwd" not in resource_dict:
resource_dict["cwd"] = cwd
task_resource_dict = task_dict["resource_dict"].copy()
task_resource_dict.update(
{k: v for k, v in resource_dict.items() if k not in task_resource_dict}
)
task_key, data_dict = serialize_funct_h5(
fn=task_dict["fn"],
fn_args=task_args,
fn_kwargs=task_kwargs,
resource_dict=resource_dict,
resource_dict=task_resource_dict,
)
if task_key not in memory_dict.keys():
if task_key + ".h5out" not in os.listdir(cache_directory):
@@ -115,12 +114,12 @@
process_dict[task_key] = execute_function(
command=_get_execute_command(
file_name=file_name,
cores=cores_per_worker,
cores=task_resource_dict["cores"],
),
task_dependent_lst=[
process_dict[k] for k in future_wait_key_lst
],
resource_dict=resource_dict,
resource_dict=task_resource_dict,
config_directory=config_directory,
backend=backend,
)
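
The merge above gives per-task resources precedence over the executor-wide resource_dict: keys the task sets itself win, and executor-level values only fill the gaps. A self-contained illustration of that update idiom, with hypothetical values:

    # Executor-level defaults only fill in keys the task did not set itself.
    resource_dict = {"cores": 2, "cwd": "/scratch/job"}  # executor-wide (hypothetical)
    task_resource_dict = {"cores": 4}                    # per-task override

    task_resource_dict.update(
        {k: v for k, v in resource_dict.items() if k not in task_resource_dict}
    )
    assert task_resource_dict == {"cores": 4, "cwd": "/scratch/job"}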
4 changes: 2 additions & 2 deletions executorlib/interactive/executor.py
@@ -175,9 +175,9 @@ def create_executor(
max_cores (int): defines the number of cores which can be used in parallel
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
- cores_per_worker (int): number of MPI cores to be used for each function call
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_worker (int): number of GPUs per worker - defaults to 0
- gpus_per_core (int): number of GPUs per core - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
- openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
SLURM only) - default False
14 changes: 9 additions & 5 deletions tests/test_cache_executor_mpi.py
@@ -3,13 +3,15 @@
import shutil
import unittest

from executorlib.standalone.cache.spawner import execute_in_subprocess


try:
from executorlib import FileExecutor

skip_h5io_test = False
skip_h5py_test = False
except ImportError:
skip_h5io_test = True
skip_h5py_test = True


skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
@@ -24,12 +26,14 @@ def mpi_funct(i):


@unittest.skipIf(
skip_h5io_test or skip_mpi4py_test,
"h5io or mpi4py are not installed, so the h5io and mpi4py tests are skipped.",
skip_h5py_test or skip_mpi4py_test,
"h5py or mpi4py are not installed, so the h5py and mpi4py tests are skipped.",
)
class TestCacheExecutorMPI(unittest.TestCase):
def test_executor(self):
with FileExecutor(cores_per_worker=2) as exe:
with FileExecutor(
resource_dict={"cores": 2}, execute_function=execute_in_subprocess
) as exe:
fs1 = exe.submit(mpi_funct, 1)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
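
The body of mpi_funct is collapsed in this view; a reconstruction consistent with the asserted result [(1, 2, 0), (1, 2, 1)] would return the input alongside the MPI communicator size and rank. This is a guess at the hidden helper, not the commit's literal code:

    def mpi_funct(i):
        from mpi4py import MPI

        size = MPI.COMM_WORLD.Get_size()
        rank = MPI.COMM_WORLD.Get_rank()
        return i, size, rank  # with resource_dict={"cores": 2}: [(1, 2, 0), (1, 2, 1)]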
4 changes: 1 addition & 3 deletions tests/test_cache_executor_pysqa_flux.py
@@ -6,7 +6,6 @@
try:
import flux.job
from executorlib import FileExecutor
from executorlib.standalone.cache.queue import execute_with_pysqa

skip_flux_test = "FLUX_URI" not in os.environ
pmi = os.environ.get("PYMPIPOOL_PMIX", None)
@@ -32,8 +31,7 @@ def mpi_funct(i):
class TestCacheExecutorPysqa(unittest.TestCase):
def test_executor(self):
with FileExecutor(
cores_per_worker=2,
execute_function=execute_with_pysqa,
resource_dict={"cores": 2},
backend="flux",
) as exe:
fs1 = exe.submit(mpi_funct, 1)
28 changes: 15 additions & 13 deletions tests/test_cache_executor_serial.py
@@ -4,19 +4,19 @@
import shutil
import unittest

from executorlib.standalone.cache.spawner import (
execute_in_subprocess,
terminate_subprocess,
)
from executorlib.standalone.thread import RaisingThread

try:
from executorlib import FileExecutor
from executorlib.cache.shared import execute_tasks_h5
from executorlib.standalone.cache.spawner import (
execute_in_subprocess,
terminate_subprocess,
)

skip_h5io_test = False
skip_h5py_test = False
except ImportError:
skip_h5io_test = True
skip_h5py_test = True


def my_funct(a, b):
@@ -28,18 +28,18 @@ def list_files_in_working_directory():


@unittest.skipIf(
skip_h5io_test, "h5io is not installed, so the h5io tests are skipped."
skip_h5py_test, "h5py is not installed, so the h5py tests are skipped."
)
class TestCacheExecutorSerial(unittest.TestCase):
def test_executor_mixed(self):
with FileExecutor() as exe:
with FileExecutor(execute_function=execute_in_subprocess) as exe:
fs1 = exe.submit(my_funct, 1, b=2)
self.assertFalse(fs1.done())
self.assertEqual(fs1.result(), 3)
self.assertTrue(fs1.done())

def test_executor_dependence_mixed(self):
with FileExecutor() as exe:
with FileExecutor(execute_function=execute_in_subprocess) as exe:
fs1 = exe.submit(my_funct, 1, b=2)
fs2 = exe.submit(my_funct, 1, b=fs1)
self.assertFalse(fs2.done())
@@ -48,7 +48,9 @@ def test_executor_dependence_mixed(self):

def test_executor_working_directory(self):
cwd = os.path.join(os.path.dirname(__file__), "executables")
with FileExecutor(cwd=cwd) as exe:
with FileExecutor(
resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess
) as exe:
fs1 = exe.submit(list_files_in_working_directory)
self.assertEqual(fs1.result(), os.listdir(cwd))

@@ -72,7 +74,7 @@ def test_executor_function(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"resource_dict": {"cores": 1, "cwd": None},
"terminate_function": terminate_subprocess,
},
)
@@ -113,7 +115,7 @@ def test_executor_function_dependence_kwargs(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"resource_dict": {"cores": 1, "cwd": None},
"terminate_function": terminate_subprocess,
},
)
@@ -154,7 +156,7 @@ def test_executor_function_dependence_args(self):
"future_queue": q,
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"resource_dict": {"cores": 1, "cwd": None},
"terminate_function": terminate_subprocess,
},
)
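
These lower-level tests drive execute_tasks_h5 directly on a RaisingThread; after this commit they pass the bundled resource_dict instead of cores_per_worker and cwd. A condensed sketch of that scaffolding, assembled from the kwargs shown above:

    import queue

    from executorlib.cache.shared import execute_tasks_h5
    from executorlib.standalone.cache.spawner import (
        execute_in_subprocess,
        terminate_subprocess,
    )
    from executorlib.standalone.thread import RaisingThread

    q = queue.Queue()
    process = RaisingThread(
        target=execute_tasks_h5,
        kwargs={
            "future_queue": q,
            "cache_directory": "cache",
            "execute_function": execute_in_subprocess,
            "resource_dict": {"cores": 1, "cwd": None},  # replaces cores_per_worker=1
            "terminate_function": terminate_subprocess,
        },
    )
    process.start()
    # ...submit task dicts to q, resolve the futures, then shut the thread down.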
6 changes: 3 additions & 3 deletions tests/test_cache_hdf.py
@@ -6,17 +6,17 @@
try:
from executorlib.standalone.hdf import dump, load

skip_h5io_test = False
skip_h5py_test = False
except ImportError:
skip_h5io_test = True
skip_h5py_test = True


def my_funct(a, b):
return a + b


@unittest.skipIf(
skip_h5io_test, "h5io is not installed, so the h5io tests are skipped."
skip_h5py_test, "h5py is not installed, so the h5io tests are skipped."
)
class TestSharedFunctions(unittest.TestCase):
def test_hdf_mixed(self):
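
The HDF5 round trip exercised here relies on dump and load from executorlib.standalone.hdf; only the skip-flag rename from h5io to h5py is part of this commit. A rough sketch of the round trip, assuming dump(file_name, data_dict) and load(file_name) -> dict signatures:

    from executorlib.standalone.hdf import dump, load

    # Signatures and returned keys assumed from the test context, not shown in this diff.
    dump(file_name="test.h5", data_dict={"fn": my_funct, "args": (1,), "kwargs": {"b": 2}})
    data_dict = load(file_name="test.h5")
    print(data_dict["fn"](*data_dict["args"], **data_dict["kwargs"]))  # 3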
