Add option to specify resources during submit() call rather than on the executor level #293

Merged: 21 commits, Apr 18, 2024
2 changes: 1 addition & 1 deletion .github/workflows/unittest-flux.yml
@@ -55,7 +55,7 @@ jobs:
timeout-minutes: 5
run: >
flux start
coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest tests/test_flux_executor.py;
coverage run -a --omit="pympipool/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
env:
OMPI_MCA_plm: 'isolated'
OMPI_MCA_rmaps_base_oversubscribe: 'yes'
114 changes: 80 additions & 34 deletions pympipool/__init__.py
@@ -2,14 +2,14 @@
import shutil
from typing import Optional
from ._version import get_versions
from pympipool.mpi.executor import PyMPIExecutor
from pympipool.mpi.executor import PyMPIExecutor, PyMPIStepExecutor
from pympipool.shared.interface import SLURM_COMMAND
from pympipool.shell.executor import SubprocessExecutor
from pympipool.shell.interactive import ShellExecutor
from pympipool.slurm.executor import PySlurmExecutor
from pympipool.slurm.executor import PySlurmExecutor, PySlurmStepExecutor

try: # The PyFluxExecutor requires flux-core to be installed.
from pympipool.flux.executor import PyFluxExecutor
from pympipool.flux.executor import PyFluxExecutor, PyFluxStepExecutor

flux_installed = "FLUX_URI" in os.environ
except ImportError:
@@ -32,12 +32,11 @@ class Executor:
an interactive Jupyter notebook.

Args:
max_workers (int): defines the number of workers which can execute functions in parallel
max_cores (int): defines the number of cores which can be used in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
@@ -46,6 +45,13 @@
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true
backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto"
is selected (the default) the available backend is determined automatically.
block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
requirements, pympipool supports block allocation. In this case all resources have
to be defined on the executor, rather than during the submission of the individual
function.
init_function (None): optional function to preset arguments for functions which are submitted later

Examples:
```
@@ -70,32 +76,34 @@

def __init__(
self,
max_workers: int = 1,
max_cores: int = 1,
cores_per_worker: int = 1,
threads_per_core: int = 1,
gpus_per_worker: int = 0,
oversubscribe: bool = False,
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
executor=None,
hostname_localhost: bool = False,
backend="auto",
block_allocation: bool = True,
init_function: Optional[callable] = None,
):
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
pass

def __new__(
cls,
max_workers: int = 1,
max_cores: int = 1,
cores_per_worker: int = 1,
threads_per_core: int = 1,
gpus_per_worker: int = 0,
oversubscribe: bool = False,
init_function: Optional[callable] = None,
cwd: Optional[str] = None,
executor=None,
hostname_localhost: bool = False,
backend: str = "auto",
block_allocation: bool = False,
init_function: Optional[callable] = None,
):
"""
Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor,
@@ -106,12 +114,11 @@ def __new__(
requires the SLURM workload manager to be installed on the system.

Args:
max_workers (int): defines the number of workers which can execute functions in parallel
max_cores (int): defines the number of cores which can be used in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
init_function (None): optional function to preset arguments for functions which are submitted later
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
@@ -122,8 +129,15 @@
option to true
backend (str): Switch between the different backends "flux", "mpi" or "slurm". Alternatively, when "auto"
is selected (the default) the available backend is determined automatically.
block_allocation (boolean): To accelerate the submission of a series of python functions with the same
resource requirements, pympipool supports block allocation. In this case all
resources have to be defined on the executor, rather than during the submission
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later

"""
if not block_allocation and init_function is not None:
raise ValueError("The init_function argument is only supported with block_allocation=True.")
if backend not in ["auto", "mpi", "slurm", "flux"]:
raise ValueError(
'The currently implemented backends are ["flux", "mpi", "slurm"]. '
@@ -137,23 +151,47 @@
"Oversubscribing is not supported for the pympipool.flux.PyFLuxExecutor backend."
"Please use oversubscribe=False instead of oversubscribe=True."
)
return PyFluxExecutor(
max_workers=max_workers,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
if block_allocation:
return PyFluxExecutor(
max_workers=int(max_cores / cores_per_worker),
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
else:
return PyFluxStepExecutor(
max_cores=max_cores,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
elif backend == "slurm" or (backend == "auto" and slurm_installed):
return PySlurmExecutor(
max_workers=max_workers,
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
if block_allocation:
return PySlurmExecutor(
max_workers=int(max_cores / cores_per_worker),
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
oversubscribe=oversubscribe,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
else:
return PySlurmStepExecutor(
max_cores=max_cores,
cores_per_worker=cores_per_worker,
threads_per_core=threads_per_core,
gpus_per_worker=gpus_per_worker,
oversubscribe=oversubscribe,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
else: # backend="mpi"
if threads_per_core != 1:
raise TypeError(
@@ -169,10 +207,18 @@
+ str(gpus_per_worker)
+ "."
)
return PyMPIExecutor(
max_workers=max_workers,
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
if block_allocation:
return PyMPIExecutor(
max_workers=int(max_cores / cores_per_worker),
cores_per_worker=cores_per_worker,
init_function=init_function,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
else:
return PyMPIStepExecutor(
max_cores=max_cores,
cores_per_worker=cores_per_worker,
cwd=cwd,
hostname_localhost=hostname_localhost,
)
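
Taken together, the new `block_allocation` switch decides whether `pympipool.Executor` wires up a block-allocation executor (resources fixed per worker, with `max_workers = int(max_cores / cores_per_worker)`) or a step executor (resources requested per call). The following sketch is not part of the PR; it assumes the `mpi` backend is available and that the step executors accept a `resource_dict` with a `"cores"` key in `submit()`, as shown in their docstring examples further down.

```
from pympipool import Executor

def calc(i):
    # runs inside the worker process(es); the MPI info identifies the ranks
    from mpi4py import MPI
    return i, MPI.COMM_WORLD.Get_size(), MPI.COMM_WORLD.Get_rank()

# block_allocation=True: resources are fixed on the executor level,
# every submission reuses workers with cores_per_worker cores each.
with Executor(max_cores=2, cores_per_worker=2, backend="mpi", block_allocation=True) as exe:
    print(exe.submit(calc, 1).result())

# block_allocation=False: resources are requested per submit() call
# via the (assumed) resource_dict argument of the step executors.
with Executor(max_cores=2, backend="mpi", block_allocation=False) as exe:
    fs = exe.submit(calc, 2, resource_dict={"cores": 2})
    print(fs.result())
```

This is the pattern the PR title refers to: the resource request moves from the executor constructor into the individual `submit()` call. If the `resource_dict` assumption does not hold for the top-level `Executor`, the same pattern applies directly to the step executor classes shown below.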
2 changes: 1 addition & 1 deletion pympipool/flux/__init__.py
@@ -1 +1 @@
from pympipool.flux.executor import PyFluxExecutor
from pympipool.flux.executor import PyFluxExecutor, PyFluxStepExecutor
77 changes: 76 additions & 1 deletion pympipool/flux/executor.py
@@ -4,8 +4,10 @@
import flux.job

from pympipool.shared.executorbase import (
ExecutorBroker,
execute_parallel_tasks,
execute_separate_tasks,
ExecutorBroker,
ExecutorSteps,
)
from pympipool.shared.interface import BaseInterface
from pympipool.shared.thread import RaisingThread
@@ -90,6 +92,79 @@ def __init__(
)


class PyFluxStepExecutor(ExecutorSteps):
"""
The pympipool.flux.PyFluxStepExecutor leverages the flux framework to distribute python tasks within a queuing
system allocation. In analogy to the pympipool.slurm.PySlurmExecutor it provides the option to specify the number
of threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
worker.

Args:
max_cores (int): defines the number of cores which can be used in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
threads_per_core (int): number of OpenMP threads to be used for each function call
gpus_per_worker (int): number of GPUs per worker - defaults to 0
cwd (str/None): current working directory where the parallel python task is executed
executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
in principle any computer should be able to resolve that their own hostname
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true

Examples:

>>> import numpy as np
>>> from pympipool.flux import PyFluxStepExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> with PyFluxStepExecutor(max_cores=2) as p:
>>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
>>> print(fs.result())

[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

"""

def __init__(
self,
max_cores: int = 1,
cores_per_worker: int = 1,
threads_per_core: int = 1,
gpus_per_worker: int = 0,
cwd: Optional[str] = None,
executor: Optional[flux.job.FluxExecutor] = None,
hostname_localhost: Optional[bool] = False,
):
super().__init__()
self._set_process(
RaisingThread(
target=execute_separate_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores_per_worker,
"interface_class": FluxPythonInterface,
"max_cores": max_cores,
"hostname_localhost": hostname_localhost,
# Interface Arguments
"threads_per_core": threads_per_core,
"gpus_per_core": int(gpus_per_worker / cores_per_worker),
"cwd": cwd,
"executor": executor,
},
)
)


class FluxPythonInterface(BaseInterface):
def __init__(
self,
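
As a usage sketch for the class above (not part of the PR): it assumes the process runs inside a flux allocation, e.g. under `flux start`, with at least 4 cores available, and that a `resource_dict` with a `"cores"` key is the per-call resource interface, as in the docstring example.

```
from pympipool.flux import PyFluxStepExecutor

def identify(tag):
    # report the MPI world size seen by this particular submission
    from mpi4py import MPI
    return tag, MPI.COMM_WORLD.Get_size()

# assumes an active flux allocation with at least 4 cores
with PyFluxStepExecutor(max_cores=4) as exe:
    fs_serial = exe.submit(identify, "serial", resource_dict={"cores": 1})
    fs_parallel = exe.submit(identify, "parallel", resource_dict={"cores": 4})
    print(fs_serial.result())
    print(fs_parallel.result())
```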
2 changes: 1 addition & 1 deletion pympipool/mpi/__init__.py
@@ -1 +1 @@
from pympipool.mpi.executor import PyMPIExecutor
from pympipool.mpi.executor import PyMPIExecutor, PyMPIStepExecutor
69 changes: 69 additions & 0 deletions pympipool/mpi/executor.py
@@ -2,7 +2,9 @@

from pympipool.shared.executorbase import (
execute_parallel_tasks,
execute_separate_tasks,
ExecutorBroker,
ExecutorSteps,
)
from pympipool.shared.interface import MpiExecInterface
from pympipool.shared.thread import RaisingThread
@@ -80,3 +82,70 @@ def __init__(
for _ in range(max_workers)
],
)


class PyMPIStepExecutor(ExecutorSteps):
"""
The pympipool.mpi.PyMPIStepExecutor leverages the message passing interface MPI to distribute python tasks within an
MPI allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.mpi.PyMPIStepExecutor can be executed
in a serial python process and does not require the python script to be executed with MPI. Consequently, it is
primarily an abstraction of its functionality to improve the usability, in particular when used in combination with
Jupyter notebooks.

Args:
max_cores (int): defines the number of cores which can be used in parallel
cores_per_worker (int): number of MPI cores to be used for each function call
oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False
cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate to an
Executor running on a different compute node within the same allocation. And
in principle any computer should be able to resolve that their own hostname
points to the same address as localhost. Still MacOS >= 12 seems to disable
this look up for security reasons. So on MacOS it is required to set this
option to true

Examples:

>>> import numpy as np
>>> from pympipool.mpi import PyMPIStepExecutor
>>>
>>> def calc(i, j, k):
>>> from mpi4py import MPI
>>> size = MPI.COMM_WORLD.Get_size()
>>> rank = MPI.COMM_WORLD.Get_rank()
>>> return np.array([i, j, k]), size, rank
>>>
>>> with PyMPIStepExecutor(max_cores=2) as p:
>>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
>>> print(fs.result())

[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]

"""

def __init__(
self,
max_cores: int = 1,
cores_per_worker: int = 1,
oversubscribe: bool = False,
cwd: Optional[str] = None,
hostname_localhost: bool = False,
):
super().__init__()
self._set_process(
RaisingThread(
target=execute_separate_tasks,
kwargs={
# Executor Arguments
"future_queue": self._future_queue,
"cores": cores_per_worker,
"interface_class": MpiExecInterface,
"max_cores": max_cores,
"hostname_localhost": hostname_localhost,
# Interface Arguments
"cwd": cwd,
"oversubscribe": oversubscribe,
},
)
)
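
A minimal sketch of per-call resource specification with the class above (not from the PR): it assumes the futures returned by `submit()` are standard `concurrent.futures.Future` objects and that `resource_dict={"cores": ...}` is the supported per-call resource key, as in the docstring example.

```
from concurrent.futures import as_completed

from pympipool.mpi import PyMPIStepExecutor

def rank_info(tag):
    # each call reports its own MPI rank and communicator size
    from mpi4py import MPI
    return tag, MPI.COMM_WORLD.Get_rank(), MPI.COMM_WORLD.Get_size()

with PyMPIStepExecutor(max_cores=2) as exe:
    # each submit() requests its own core count instead of relying on a
    # fixed cores_per_worker defined on the executor
    futures = [
        exe.submit(rank_info, "serial", resource_dict={"cores": 1}),
        exe.submit(rank_info, "mpi", resource_dict={"cores": 2}),
    ]
    for fs in as_completed(futures):
        print(fs.result())
```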