Skip to content

Commit

Permalink
Add an MPIExecutor (#3423)
Browse files Browse the repository at this point in the history
Add MPIExecutor -- a wrapper class over HTEx which fixes or removes options irrelevant when enable_mpi_mode=True.
  • Loading branch information
WardLT authored May 17, 2024
1 parent 562194d commit b214714
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 361 deletions.
1 change: 1 addition & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ Executors
parsl.executors.status_handling.BlockProviderExecutor
parsl.executors.ThreadPoolExecutor
parsl.executors.HighThroughputExecutor
parsl.executors.MPIExecutor
parsl.executors.WorkQueueExecutor
parsl.executors.taskvine.TaskVineExecutor
parsl.executors.FluxExecutor
Expand Down
372 changes: 88 additions & 284 deletions docs/userguide/mpi_apps.rst

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions parsl/executors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from parsl.executors.threads import ThreadPoolExecutor
from parsl.executors.workqueue.executor import WorkQueueExecutor
from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_executor import MPIExecutor
from parsl.executors.flux.executor import FluxExecutor

__all__ = ['ThreadPoolExecutor',
'HighThroughputExecutor',
'MPIExecutor',
'WorkQueueExecutor',
'FluxExecutor']
157 changes: 80 additions & 77 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,47 +62,7 @@
"--mpi-launcher={mpi_launcher} "
"--available-accelerators {accelerators}")


class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageInformation):
"""Executor designed for cluster-scale
The HighThroughputExecutor system has the following components:
1. The HighThroughputExecutor instance which is run as part of the Parsl script.
2. The Interchange which acts as a load-balancing proxy between workers and Parsl
3. The multiprocessing based worker pool which coordinates task execution over several
cores on a node.
4. ZeroMQ pipes connect the HighThroughputExecutor, Interchange and the process_worker_pool
Here is a diagram
.. code:: python
| Data | Executor | Interchange | External Process(es)
| Flow | | |
Task | Kernel | | |
+----->|-------->|------------>|->outgoing_q---|-> process_worker_pool
| | | | batching | | |
Parsl<---Fut-| | | load-balancing| result exception
^ | | | watchdogs | | |
| | | Q_mngmnt | | V V
| | | Thread<--|-incoming_q<---|--- +---------+
| | | | | |
| | | | | |
+----update_fut-----+
Each of the workers in each process_worker_pool has access to its local rank through
an environmental variable, ``PARSL_WORKER_RANK``. The local rank is unique for each process
and is an integer in the range from 0 to the number of workers per in the pool minus 1.
The workers also have access to the ID of the worker pool as ``PARSL_WORKER_POOL_ID``
and the size of the worker pool as ``PARSL_WORKER_COUNT``.
Parameters
----------
provider : :class:`~parsl.providers.base.ExecutionProvider`
GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider`
Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`,
:class:`~parsl.providers.cobalt.cobalt.Cobalt`,
:class:`~parsl.providers.condor.condor.Condor`,
Expand Down Expand Up @@ -148,39 +108,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
worker_debug : Bool
Enables worker debug logging.
cores_per_worker : float
cores to be assigned to each worker. Oversubscription is possible
by setting cores_per_worker < 1.0. Default=1
mem_per_worker : float
GB of memory required per worker. If this option is specified, the node manager
will check the available memory at startup and limit the number of workers such that
the there's sufficient memory for each worker. Default: None
max_workers : int
Deprecated. Please use max_workers_per_node instead.
max_workers_per_node : int
Caps the number of workers launched per node. Default: None
cpu_affinity: string
Whether or how each worker process sets thread affinity. Options include "none" to forgo
any CPU affinity configuration, "block" to assign adjacent cores to workers
(ex: assign 0-1 to worker 0, 2-3 to worker 1), and
"alternating" to assign cores to workers in round-robin
(ex: assign 0,2 to worker 0, 1,3 to worker 1).
The "block-reverse" option assigns adjacent cores to workers, but assigns
the CPUs with large indices to low index workers (ex: assign 2-3 to worker 1, 0,1 to worker 2)
available_accelerators: int | list
Accelerators available for workers to use. Each worker will be pinned to exactly one of the provided
accelerators, and no more workers will be launched than the number of accelerators.
Either provide the list of accelerator names or the number available. If a number is provided,
Parsl will create names as integers starting with 0.
default: empty list
prefetch_capacity : int
Number of tasks that could be prefetched over available worker capacity.
When there are a few tasks (<100) or when tasks are long running, this option should
Expand Down Expand Up @@ -214,6 +141,85 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
worker_logdir_root : string
In case of a remote file system, specify the path to where logs will be kept.
encrypted : bool
Flag to enable/disable encryption (CurveZMQ). Default is False.
""" # Documentation for params used by both HTEx and MPIEx


class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageInformation):
__doc__ = f"""Executor designed for cluster-scale
The HighThroughputExecutor system has the following components:
1. The HighThroughputExecutor instance which is run as part of the Parsl script.
2. The Interchange which acts as a load-balancing proxy between workers and Parsl
3. The multiprocessing based worker pool which coordinates task execution over several
cores on a node.
4. ZeroMQ pipes connect the HighThroughputExecutor, Interchange and the process_worker_pool
Here is a diagram
.. code:: python
| Data | Executor | Interchange | External Process(es)
| Flow | | |
Task | Kernel | | |
+----->|-------->|------------>|->outgoing_q---|-> process_worker_pool
| | | | batching | | |
Parsl<---Fut-| | | load-balancing| result exception
^ | | | watchdogs | | |
| | | Q_mngmnt | | V V
| | | Thread<--|-incoming_q<---|--- +---------+
| | | | | |
| | | | | |
+----update_fut-----+
Each of the workers in each process_worker_pool has access to its local rank through
an environmental variable, ``PARSL_WORKER_RANK``. The local rank is unique for each process
and is an integer in the range from 0 to the number of workers per in the pool minus 1.
The workers also have access to the ID of the worker pool as ``PARSL_WORKER_POOL_ID``
and the size of the worker pool as ``PARSL_WORKER_COUNT``.
Parameters
----------
{GENERAL_HTEX_PARAM_DOCS}
cores_per_worker : float
cores to be assigned to each worker. Oversubscription is possible
by setting cores_per_worker < 1.0. Default=1
mem_per_worker : float
GB of memory required per worker. If this option is specified, the node manager
will check the available memory at startup and limit the number of workers such that
the there's sufficient memory for each worker. Default: None
max_workers : int
Deprecated. Please use max_workers_per_node instead.
max_workers_per_node : int
Caps the number of workers launched per node. Default: None
cpu_affinity: string
Whether or how each worker process sets thread affinity. Options include "none" to forgo
any CPU affinity configuration, "block" to assign adjacent cores to workers
(ex: assign 0-1 to worker 0, 2-3 to worker 1), and
"alternating" to assign cores to workers in round-robin
(ex: assign 0,2 to worker 0, 1,3 to worker 1).
The "block-reverse" option assigns adjacent cores to workers, but assigns
the CPUs with large indices to low index workers (ex: assign 2-3 to worker 1, 0,1 to worker 2)
available_accelerators: int | list
Accelerators available for workers to use. Each worker will be pinned to exactly one of the provided
accelerators, and no more workers will be launched than the number of accelerators.
Either provide the list of accelerator names or the number available. If a number is provided,
Parsl will create names as integers starting with 0.
default: empty list
enable_mpi_mode: bool
If enabled, MPI launch prefixes will be composed for the batch scheduler based on
the nodes available in each batch job and the resource_specification dict passed
Expand All @@ -224,9 +230,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
This field is only used if enable_mpi_mode is set. Select one from the
list of supported MPI launchers = ("srun", "aprun", "mpiexec").
default: "mpiexec"
encrypted : bool
Flag to enable/disable encryption (CurveZMQ). Default is False.
"""

@typeguard.typechecked
Expand Down
85 changes: 85 additions & 0 deletions parsl/executors/high_throughput/mpi_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""A simplified interface for HTEx when running in MPI mode"""
from typing import Optional, Tuple, List, Union, Callable, Dict

import typeguard

from parsl.data_provider.staging import Staging
from parsl.executors.high_throughput.executor import HighThroughputExecutor, GENERAL_HTEX_PARAM_DOCS
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import JobStatus
from parsl.providers import LocalProvider
from parsl.providers.base import ExecutionProvider


class MPIExecutor(HighThroughputExecutor):
__doc__ = f"""A version of :class:`~parsl.HighThroughputExecutor` tuned for executing multi-node (e.g., MPI) tasks.
The Provider _must_ use the :class:`~parsl.launchers.SimpleLauncher`,
which places a single pool of workers on the first node of a block.
Each worker can then make system calls which use an MPI launcher (e.g., ``mpirun``, ``srun``)
to spawn multi-node tasks.
Specify the maximum number of multi-node tasks to run at once using ``max_workers_per_block``.
The maximum number should be smaller than the ``nodes_per_block`` in the Provider.
Parameters
----------
max_workers_per_block: int
Maximum number of MPI applications to run at once per block
{GENERAL_HTEX_PARAM_DOCS}
"""

@typeguard.typechecked
def __init__(self,
label: str = 'MPIExecutor',
provider: ExecutionProvider = LocalProvider(),
launch_cmd: Optional[str] = None,
address: Optional[str] = None,
worker_ports: Optional[Tuple[int, int]] = None,
worker_port_range: Optional[Tuple[int, int]] = (54000, 55000),
interchange_port_range: Optional[Tuple[int, int]] = (55000, 56000),
storage_access: Optional[List[Staging]] = None,
working_dir: Optional[str] = None,
worker_debug: bool = False,
max_workers_per_block: int = 1,
prefetch_capacity: int = 0,
heartbeat_threshold: int = 120,
heartbeat_period: int = 30,
drain_period: Optional[int] = None,
poll_period: int = 10,
address_probe_timeout: Optional[int] = None,
worker_logdir_root: Optional[str] = None,
mpi_launcher: str = "mpiexec",
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
super().__init__(
# Hard-coded settings
cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers
enable_mpi_mode=True,
max_workers_per_node=max_workers_per_block,

# Everything else
label=label,
provider=provider,
launch_cmd=launch_cmd,
address=address,
worker_ports=worker_ports,
worker_port_range=worker_port_range,
interchange_port_range=interchange_port_range,
storage_access=storage_access,
working_dir=working_dir,
worker_debug=worker_debug,
prefetch_capacity=prefetch_capacity,
heartbeat_threshold=heartbeat_threshold,
heartbeat_period=heartbeat_period,
drain_period=drain_period,
poll_period=poll_period,
address_probe_timeout=address_probe_timeout,
worker_logdir_root=worker_logdir_root,
mpi_launcher=mpi_launcher,
block_error_handler=block_error_handler,
encrypted=encrypted
)

self.max_workers_per_block = max_workers_per_block
3 changes: 3 additions & 0 deletions parsl/executors/high_throughput/mpi_resource_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,11 @@ def get_result(self, block: bool, timeout: float):
"""Return result and relinquish provisioned nodes"""
result_pkl = self.pending_result_q.get(block, timeout=timeout)
result_dict = pickle.loads(result_pkl)
# TODO (wardlt): If the task did not request nodes, it won't be in `self._map_tasks_to_nodes`.
# Causes Parsl to hang. See Issue #3427
if result_dict["type"] == "result":
task_id = result_dict["task_id"]
assert task_id in self._map_tasks_to_nodes, "You are about to experience issue #3427"
nodes_to_reallocate = self._map_tasks_to_nodes[task_id]
self._return_nodes(nodes_to_reallocate)
self._schedule_backlog_tasks()
Expand Down
64 changes: 64 additions & 0 deletions parsl/tests/test_mpi_apps/test_mpiex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Tests for the wrapper class"""
from inspect import signature
from pathlib import Path

import pytest

import parsl
from .test_mpi_mode_enabled import get_env_vars
from parsl import HighThroughputExecutor, Config
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider
from parsl.executors.high_throughput.mpi_executor import MPIExecutor

cwd = Path(__file__).parent.absolute()
pbs_nodefile = cwd.joinpath("mocks", "pbs_nodefile")


def local_config():
return Config(
executors=[
MPIExecutor(
max_workers_per_block=1,
provider=LocalProvider(
worker_init=f"export PBS_NODEFILE={pbs_nodefile}",
launcher=SimpleLauncher()
)
)
]
)


@pytest.mark.local
def test_docstring():
"""Ensure the old kwargs are copied over into the new class"""
assert 'label' in MPIExecutor.__doc__
assert 'max_workers_per_block' in MPIExecutor.__doc__
assert 'available_accelerators' not in MPIExecutor.__doc__


@pytest.mark.local
def test_init():
"""Ensure all relevant kwargs are copied over from HTEx"""

new_kwargs = {'max_workers_per_block'}
excluded_kwargs = {'available_accelerators', 'enable_mpi_mode', 'cores_per_worker', 'max_workers_per_node',
'mem_per_worker', 'cpu_affinity', 'max_workers'}

# Get the kwargs from both HTEx and MPIEx
htex_kwargs = set(signature(HighThroughputExecutor.__init__).parameters)
mpix_kwargs = set(signature(MPIExecutor.__init__).parameters)

assert mpix_kwargs.difference(htex_kwargs) == new_kwargs
assert len(mpix_kwargs.intersection(excluded_kwargs)) == 0
assert mpix_kwargs.union(excluded_kwargs).difference(new_kwargs) == htex_kwargs


@pytest.mark.local
def test_get_env():
future = get_env_vars(parsl_resource_specification={
"num_nodes": 3,
"ranks_per_node": 5,
})
env_vars = future.result()
assert env_vars['PARSL_NUM_RANKS'] == '15'

0 comments on commit b214714

Please sign in to comment.