Add an MPIExecutor #3423

Merged: 18 commits, May 17, 2024
Changes from all commits
1 change: 1 addition & 0 deletions docs/reference.rst
@@ -75,6 +75,7 @@ Executors
parsl.executors.status_handling.BlockProviderExecutor
parsl.executors.ThreadPoolExecutor
parsl.executors.HighThroughputExecutor
parsl.executors.MPIExecutor
parsl.executors.WorkQueueExecutor
parsl.executors.taskvine.TaskVineExecutor
parsl.executors.FluxExecutor
372 changes: 88 additions & 284 deletions docs/userguide/mpi_apps.rst

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions parsl/executors/__init__.py
@@ -1,9 +1,11 @@
from parsl.executors.threads import ThreadPoolExecutor
from parsl.executors.workqueue.executor import WorkQueueExecutor
from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_executor import MPIExecutor
from parsl.executors.flux.executor import FluxExecutor

__all__ = ['ThreadPoolExecutor',
'HighThroughputExecutor',
'MPIExecutor',
'WorkQueueExecutor',
'FluxExecutor']
157 changes: 80 additions & 77 deletions parsl/executors/high_throughput/executor.py
@@ -62,47 +62,7 @@
"--mpi-launcher={mpi_launcher} "
"--available-accelerators {accelerators}")


class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageInformation):
"""Executor designed for cluster-scale

The HighThroughputExecutor system has the following components:
1. The HighThroughputExecutor instance which is run as part of the Parsl script.
2. The Interchange which acts as a load-balancing proxy between workers and Parsl
3. The multiprocessing based worker pool which coordinates task execution over several
cores on a node.
4. ZeroMQ pipes connect the HighThroughputExecutor, Interchange and the process_worker_pool

Here is a diagram

.. code:: python


| Data | Executor | Interchange | External Process(es)
| Flow | | |
Task | Kernel | | |
+----->|-------->|------------>|->outgoing_q---|-> process_worker_pool
| | | | batching | | |
Parsl<---Fut-| | | load-balancing| result exception
^ | | | watchdogs | | |
| | | Q_mngmnt | | V V
| | | Thread<--|-incoming_q<---|--- +---------+
| | | | | |
| | | | | |
+----update_fut-----+


Each of the workers in each process_worker_pool has access to its local rank through
an environmental variable, ``PARSL_WORKER_RANK``. The local rank is unique for each process
and is an integer in the range from 0 to the number of workers in the pool minus 1.
The workers also have access to the ID of the worker pool as ``PARSL_WORKER_POOL_ID``
and the size of the worker pool as ``PARSL_WORKER_COUNT``.


Parameters
----------

provider : :class:`~parsl.providers.base.ExecutionProvider`
GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider`
Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`,
:class:`~parsl.providers.cobalt.cobalt.Cobalt`,
:class:`~parsl.providers.condor.condor.Condor`,
@@ -148,39 +108,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
worker_debug : Bool
Enables worker debug logging.

cores_per_worker : float
cores to be assigned to each worker. Oversubscription is possible
by setting cores_per_worker < 1.0. Default=1

mem_per_worker : float
GB of memory required per worker. If this option is specified, the node manager
will check the available memory at startup and limit the number of workers such that
there's sufficient memory for each worker. Default: None

max_workers : int
Deprecated. Please use max_workers_per_node instead.

max_workers_per_node : int
Caps the number of workers launched per node. Default: None

cpu_affinity: string
Whether or how each worker process sets thread affinity. Options include "none" to forgo
any CPU affinity configuration, "block" to assign adjacent cores to workers
(ex: assign 0-1 to worker 0, 2-3 to worker 1), and
"alternating" to assign cores to workers in round-robin
(ex: assign 0,2 to worker 0, 1,3 to worker 1).
The "block-reverse" option assigns adjacent cores to workers, but assigns
the CPUs with large indices to low index workers (ex: assign 2-3 to worker 1, 0,1 to worker 2)

available_accelerators: int | list
Accelerators available for workers to use. Each worker will be pinned to exactly one of the provided
accelerators, and no more workers will be launched than the number of accelerators.

Either provide the list of accelerator names or the number available. If a number is provided,
Parsl will create names as integers starting with 0.

default: empty list

prefetch_capacity : int
Number of tasks that could be prefetched over available worker capacity.
When there are a few tasks (<100) or when tasks are long running, this option should
@@ -214,6 +141,85 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
worker_logdir_root : string
In case of a remote file system, specify the path to where logs will be kept.

encrypted : bool
Flag to enable/disable encryption (CurveZMQ). Default is False.
""" # Documentation for params used by both HTEx and MPIEx


class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageInformation):
__doc__ = f"""Executor designed for cluster-scale

The HighThroughputExecutor system has the following components:
1. The HighThroughputExecutor instance which is run as part of the Parsl script.
2. The Interchange which acts as a load-balancing proxy between workers and Parsl
3. The multiprocessing based worker pool which coordinates task execution over several
cores on a node.
4. ZeroMQ pipes connect the HighThroughputExecutor, Interchange and the process_worker_pool

Here is a diagram

.. code:: python


| Data | Executor | Interchange | External Process(es)
| Flow | | |
Task | Kernel | | |
+----->|-------->|------------>|->outgoing_q---|-> process_worker_pool
| | | | batching | | |
Parsl<---Fut-| | | load-balancing| result exception
^ | | | watchdogs | | |
| | | Q_mngmnt | | V V
| | | Thread<--|-incoming_q<---|--- +---------+
| | | | | |
| | | | | |
+----update_fut-----+


Each of the workers in each process_worker_pool has access to its local rank through
an environmental variable, ``PARSL_WORKER_RANK``. The local rank is unique for each process
and is an integer in the range from 0 to the number of workers in the pool minus 1.
The workers also have access to the ID of the worker pool as ``PARSL_WORKER_POOL_ID``
and the size of the worker pool as ``PARSL_WORKER_COUNT``.


Parameters
----------

{GENERAL_HTEX_PARAM_DOCS}

cores_per_worker : float
cores to be assigned to each worker. Oversubscription is possible
by setting cores_per_worker < 1.0. Default=1

mem_per_worker : float
GB of memory required per worker. If this option is specified, the node manager
will check the available memory at startup and limit the number of workers such that
there's sufficient memory for each worker. Default: None

max_workers : int
Deprecated. Please use max_workers_per_node instead.

max_workers_per_node : int
Caps the number of workers launched per node. Default: None

cpu_affinity: string
Whether or how each worker process sets thread affinity. Options include "none" to forgo
any CPU affinity configuration, "block" to assign adjacent cores to workers
(ex: assign 0-1 to worker 0, 2-3 to worker 1), and
"alternating" to assign cores to workers in round-robin
(ex: assign 0,2 to worker 0, 1,3 to worker 1).
The "block-reverse" option assigns adjacent cores to workers, but assigns
the CPUs with large indices to low index workers (ex: assign 2-3 to worker 1, 0,1 to worker 2)

available_accelerators: int | list
Accelerators available for workers to use. Each worker will be pinned to exactly one of the provided
accelerators, and no more workers will be launched than the number of accelerators.

Either provide the list of accelerator names or the number available. If a number is provided,
Parsl will create names as integers starting with 0.

default: empty list

enable_mpi_mode: bool
If enabled, MPI launch prefixes will be composed for the batch scheduler based on
the nodes available in each batch job and the resource_specification dict passed
@@ -224,9 +230,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
This field is only used if enable_mpi_mode is set. Select one from the
list of supported MPI launchers = ("srun", "aprun", "mpiexec").
default: "mpiexec"

encrypted : bool
Flag to enable/disable encryption (CurveZMQ). Default is False.
"""

@typeguard.typechecked
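The change above factors the parameter documentation shared by both executors into a module-level GENERAL_HTEX_PARAM_DOCS string and interpolates it into each class's __doc__ with an f-string. A minimal sketch of that pattern, with illustrative names rather than Parsl's actual docstring text:

SHARED_PARAM_DOCS = """provider : ExecutionProvider
        Provider to access computation resources.

    worker_debug : bool
        Enables worker debug logging.
"""  # Parameter docs shared by the base and derived executors


class BaseExecutor:
    __doc__ = f"""Executor exposing both shared and base-only parameters.

    Parameters
    ----------
    {SHARED_PARAM_DOCS}
    cores_per_worker : float
        Cores assigned to each worker.
    """


class SimplifiedExecutor(BaseExecutor):
    __doc__ = f"""Simplified executor that reuses only the shared parameter docs.

    Parameters
    ----------
    {SHARED_PARAM_DOCS}
    """


# Both docstrings now document `provider` without duplicating its text:
assert "provider" in BaseExecutor.__doc__ and "provider" in SimplifiedExecutor.__doc__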
85 changes: 85 additions & 0 deletions parsl/executors/high_throughput/mpi_executor.py
@@ -0,0 +1,85 @@
"""A simplified interface for HTEx when running in MPI mode"""
from typing import Optional, Tuple, List, Union, Callable, Dict

import typeguard

from parsl.data_provider.staging import Staging
from parsl.executors.high_throughput.executor import HighThroughputExecutor, GENERAL_HTEX_PARAM_DOCS
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import JobStatus
from parsl.providers import LocalProvider
from parsl.providers.base import ExecutionProvider


class MPIExecutor(HighThroughputExecutor):
__doc__ = f"""A version of :class:`~parsl.HighThroughputExecutor` tuned for executing multi-node (e.g., MPI) tasks.

The Provider _must_ use the :class:`~parsl.launchers.SimpleLauncher`,
which places a single pool of workers on the first node of a block.
Each worker can then make system calls which use an MPI launcher (e.g., ``mpirun``, ``srun``)
to spawn multi-node tasks.

Specify the maximum number of multi-node tasks to run at once using ``max_workers_per_block``.
The maximum number should be smaller than the ``nodes_per_block`` in the Provider.
Contributor:

@WardLT: Just looking at this again. Should it be "The value should be less than or equal to the nodes_per_block"?

Contributor Author (@WardLT):

Yes, it should be less than or equal to


Parameters
----------
max_workers_per_block: int
Maximum number of MPI applications to run at once per block

{GENERAL_HTEX_PARAM_DOCS}
"""

@typeguard.typechecked
def __init__(self,
label: str = 'MPIExecutor',
provider: ExecutionProvider = LocalProvider(),
launch_cmd: Optional[str] = None,
address: Optional[str] = None,
worker_ports: Optional[Tuple[int, int]] = None,
worker_port_range: Optional[Tuple[int, int]] = (54000, 55000),
interchange_port_range: Optional[Tuple[int, int]] = (55000, 56000),
storage_access: Optional[List[Staging]] = None,
working_dir: Optional[str] = None,
worker_debug: bool = False,
max_workers_per_block: int = 1,
prefetch_capacity: int = 0,
heartbeat_threshold: int = 120,
heartbeat_period: int = 30,
Comment on lines +47 to +48
Contributor:

Do we need to enumerate all of the base class's (HighThroughputExecutor) keyword arguments if we aren't modifying the defaults? My concern is that if we update the base class, we might forget to update MPIExecutor. A safer approach might be to list only the new and updated keyword arguments explicitly and pass any remaining **kwargs to the base class. Thoughts?

Contributor Author (@WardLT), May 9, 2024:

Using **kwargs was my first thought too, but I wanted to try enumerating them to give better IDE support.

I made a unit test that will fail if the signature of the base class changes. Does that seem sufficient?

Contributor:

Sounds reasonable to me!

(I really wish IDEs supported better inspection of classes...)

drain_period: Optional[int] = None,
poll_period: int = 10,
address_probe_timeout: Optional[int] = None,
worker_logdir_root: Optional[str] = None,
mpi_launcher: str = "mpiexec",
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
super().__init__(
# Hard-coded settings
cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers
enable_mpi_mode=True,
max_workers_per_node=max_workers_per_block,

# Everything else
label=label,
provider=provider,
launch_cmd=launch_cmd,
address=address,
worker_ports=worker_ports,
worker_port_range=worker_port_range,
interchange_port_range=interchange_port_range,
storage_access=storage_access,
working_dir=working_dir,
worker_debug=worker_debug,
prefetch_capacity=prefetch_capacity,
heartbeat_threshold=heartbeat_threshold,
heartbeat_period=heartbeat_period,
drain_period=drain_period,
poll_period=poll_period,
address_probe_timeout=address_probe_timeout,
worker_logdir_root=worker_logdir_root,
mpi_launcher=mpi_launcher,
block_error_handler=block_error_handler,
encrypted=encrypted
)

self.max_workers_per_block = max_workers_per_block
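
For orientation, here is a sketch of how this executor might be configured on a Slurm cluster. The SlurmProvider settings (partition name, node counts, walltime) are assumptions for illustration only; the PR's own test below uses a LocalProvider instead.

from parsl.config import Config
from parsl.executors import MPIExecutor
from parsl.launchers import SimpleLauncher
from parsl.providers import SlurmProvider

# Sketch only: partition, node counts and walltime are made-up values.
config = Config(
    executors=[
        MPIExecutor(
            max_workers_per_block=2,        # run at most 2 multi-node tasks at once per block
            provider=SlurmProvider(
                partition="compute",        # assumed partition name
                nodes_per_block=4,          # should be >= max_workers_per_block
                launcher=SimpleLauncher(),  # required: one worker pool on the first node of the block
                walltime="01:00:00",
            ),
        )
    ]
)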
3 changes: 3 additions & 0 deletions parsl/executors/high_throughput/mpi_resource_management.py
@@ -208,8 +208,11 @@ def get_result(self, block: bool, timeout: float):
"""Return result and relinquish provisioned nodes"""
result_pkl = self.pending_result_q.get(block, timeout=timeout)
result_dict = pickle.loads(result_pkl)
# TODO (wardlt): If the task did not request nodes, it won't be in `self._map_tasks_to_nodes`.
# Causes Parsl to hang. See Issue #3427
if result_dict["type"] == "result":
task_id = result_dict["task_id"]
assert task_id in self._map_tasks_to_nodes, "You are about to experience issue #3427"
nodes_to_reallocate = self._map_tasks_to_nodes[task_id]
self._return_nodes(nodes_to_reallocate)
self._schedule_backlog_tasks()
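For context on the TODO and assertion added above, one defensive alternative (a sketch only, not what this PR implements) would skip the node bookkeeping when a task has no entry in _map_tasks_to_nodes rather than asserting:

# Sketch of an alternative guard (not part of this PR; see issue #3427).
# A task that never requested nodes has no entry in _map_tasks_to_nodes,
# so look it up defensively instead of asserting.
if result_dict["type"] == "result":
    task_id = result_dict["task_id"]
    nodes_to_reallocate = self._map_tasks_to_nodes.pop(task_id, None)
    if nodes_to_reallocate is not None:
        self._return_nodes(nodes_to_reallocate)
        self._schedule_backlog_tasks()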
64 changes: 64 additions & 0 deletions parsl/tests/test_mpi_apps/test_mpiex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Tests for the wrapper class"""
from inspect import signature
from pathlib import Path

import pytest

import parsl
from .test_mpi_mode_enabled import get_env_vars
from parsl import HighThroughputExecutor, Config
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider
from parsl.executors.high_throughput.mpi_executor import MPIExecutor

cwd = Path(__file__).parent.absolute()
pbs_nodefile = cwd.joinpath("mocks", "pbs_nodefile")


def local_config():
return Config(
executors=[
MPIExecutor(
max_workers_per_block=1,
provider=LocalProvider(
worker_init=f"export PBS_NODEFILE={pbs_nodefile}",
launcher=SimpleLauncher()
)
)
]
)


@pytest.mark.local
def test_docstring():
"""Ensure the old kwargs are copied over into the new class"""
assert 'label' in MPIExecutor.__doc__
assert 'max_workers_per_block' in MPIExecutor.__doc__
assert 'available_accelerators' not in MPIExecutor.__doc__


@pytest.mark.local
def test_init():
"""Ensure all relevant kwargs are copied over from HTEx"""

new_kwargs = {'max_workers_per_block'}
excluded_kwargs = {'available_accelerators', 'enable_mpi_mode', 'cores_per_worker', 'max_workers_per_node',
'mem_per_worker', 'cpu_affinity', 'max_workers'}

# Get the kwargs from both HTEx and MPIEx
htex_kwargs = set(signature(HighThroughputExecutor.__init__).parameters)
mpix_kwargs = set(signature(MPIExecutor.__init__).parameters)

assert mpix_kwargs.difference(htex_kwargs) == new_kwargs
assert len(mpix_kwargs.intersection(excluded_kwargs)) == 0
assert mpix_kwargs.union(excluded_kwargs).difference(new_kwargs) == htex_kwargs


@pytest.mark.local
def test_get_env():
future = get_env_vars(parsl_resource_specification={
"num_nodes": 3,
"ranks_per_node": 5,
})
env_vars = future.result()
assert env_vars['PARSL_NUM_RANKS'] == '15'
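
The test_get_env test above relies on the resource specification being exposed to the worker as environment variables such as PARSL_NUM_RANKS (3 nodes * 5 ranks per node = 15). A sketch of how an application might consume that variable; the app body and the hello_mpi binary are illustrative assumptions, not part of this PR:

from parsl import bash_app

# Sketch only: assumes parsl.load(...) was already called with an MPIExecutor config,
# and that an MPI binary named ./hello_mpi exists.
@bash_app
def run_mpi_app(parsl_resource_specification=None):
    # PARSL_NUM_RANKS is derived from the resource specification by the MPI-mode worker.
    return 'mpiexec -n $PARSL_NUM_RANKS ./hello_mpi'


future = run_mpi_app(parsl_resource_specification={
    "num_nodes": 3,
    "ranks_per_node": 5,
})
future.result()  # wait for the multi-node task to complete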