From b2147140e415fa3bd0eb8118bfdc0e0fe53ccdad Mon Sep 17 00:00:00 2001 From: Logan Ward Date: Fri, 17 May 2024 08:04:11 -0400 Subject: [PATCH] Add an MPIExecutor (#3423) Add MPIExecutor -- a wrapper class over HTEx which fixes or removes options irrelevant when enable_mpi_mode=True. --- docs/reference.rst | 1 + docs/userguide/mpi_apps.rst | 372 +++++------------- parsl/executors/__init__.py | 2 + parsl/executors/high_throughput/executor.py | 157 ++++---- .../executors/high_throughput/mpi_executor.py | 85 ++++ .../mpi_resource_management.py | 3 + parsl/tests/test_mpi_apps/test_mpiex.py | 64 +++ 7 files changed, 323 insertions(+), 361 deletions(-) create mode 100644 parsl/executors/high_throughput/mpi_executor.py create mode 100644 parsl/tests/test_mpi_apps/test_mpiex.py diff --git a/docs/reference.rst b/docs/reference.rst index 04731bdcbd..4c5573a59c 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -75,6 +75,7 @@ Executors parsl.executors.status_handling.BlockProviderExecutor parsl.executors.ThreadPoolExecutor parsl.executors.HighThroughputExecutor + parsl.executors.MPIExecutor parsl.executors.WorkQueueExecutor parsl.executors.taskvine.TaskVineExecutor parsl.executors.FluxExecutor diff --git a/docs/userguide/mpi_apps.rst b/docs/userguide/mpi_apps.rst index c9664f2aef..a40c03e004 100644 --- a/docs/userguide/mpi_apps.rst +++ b/docs/userguide/mpi_apps.rst @@ -1,173 +1,72 @@ -MPI Apps -======== +MPI and Multi-node Apps +======================= -MPI applications run multiple copies of a program that complete a single task by -coordinating using messages passed within or across nodes. -Starting MPI application requires invoking a "launcher" code (e.g., ``mpiexec``) from one node -with options that define how the copies of a program should be distributed to others. -Parsl simplifies this by composing the "launcher" command from the resources specified at the time -each app is invoked. - -The broad strokes of a complete solution involves the following components: - -1. Configuring the :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` with: - ``enable_mpi_mode=True`` -2. Specify an MPI Launcher from one of the supported launchers ("aprun", "srun", "mpiexec") for the - :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` with: ``mpi_launcher="srun"`` -3. Specify the provider that matches your cluster, (eg. user ``SlurmProvider`` for Slurm clusters) -4. Set the non-mpi launcher to :class:`~parsl.launchers.SimpleLauncher` -5. Specify resources required by the application via ``resource_specification`` as shown below: - - -.. code-block:: python - - # Define HighThroughputExecutor(enable_mpi_mode=True, mpi_launcher="mpiexec", ...) - - @bash_app - def lammps_mpi_application(infile: File, parsl_resource_specification: Dict): - # PARSL_MPI_PREFIX will resolve to `mpiexec -n 4 -ppn 2 -hosts NODE001,NODE002` - return f"$PARSL_MPI_PREFIX lmp_mpi -in {infile.filepath}" - - # Resources in terms of nodes and how ranks are to be distributed are set on a per app - # basis via the resource_spec dictionary. - resource_spec = { - "num_nodes" = 2, - "ranks_per_node" = 2, - "num_ranks" = 4, - } - future = lammps_mpi_application(File('in.file'), resource_specification=resource_spec) - - -HTEX and MPI Tasks ------------------- - -The :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` (HTEX) is the -default executor available through Parsl. 
-Parsl Apps which invoke MPI code require MPI specific configuration such that:
+The :class:`~parsl.executors.MPIExecutor` supports running MPI applications or other computations which can
+run on multiple compute nodes.

-1. All workers are started on the lead-node (mom-node in case of Crays)
-2. Resource requirements of Apps are propagated to workers who provision the required number of nodes from within the batch job.
+Background
+----------

+MPI applications run multiple copies of a program that complete a single task by
+coordinating using messages passed within or across nodes.

-Configuring the Provider
-++++++++++++++++++++++++
-
-Parsl must be configured to deploy workers on exactly one node per block. This part is
-simple. Instead of defining a launcher which will place an executor on each node in the
-block, simply use the :class:`~parsl.launchers.SimpleLauncher`.
-The MPI Launcher that the application will use is to be specified via ``HighThroughputExecutor(mpi_launcher="LAUNCHER")``
+Starting an MPI application requires invoking a "launcher" program (e.g., ``mpiexec``)
+with options that define how the copies of a program should be distributed.

-It is also necessary to specify the desired number of blocks for the executor.
-Parsl cannot determine the number of blocks needed to run a set of MPI Tasks,
-so they must bet set explicitly (see `Issue #1647 `_).
-The easiest route is to set the ``max_blocks`` and ``min_blocks`` of the provider
-to the desired number of blocks.
+The launcher includes options that control how copies of the program are distributed
+across the nodes (e.g., how many copies per node) and
+how each copy is configured (e.g., which CPU cores it can use).

-Configuring the Executor
-++++++++++++++++++++++++
+The options for launchers vary between MPI implementations and compute clusters.

-Here are the steps for configuring the executor:
+Configuring ``MPIExecutor``
+---------------------------

-1. Set ``HighThroughputExecutor(enable_mpi_mode=True)``
-2. Set ``HighThroughputExecutor(mpi_launcher="LAUNCHER")`` to one from ("srun", "aprun", "mpiexec")
-3. Set the ``max_workers`` to the number of MPI Apps you expect to run per scheduler job (block).
-4. Set ``cores_per_worker=1e-6`` to prevent HTEx from reducing the number of workers if you request more workers than cores.
+The :class:`~parsl.executors.MPIExecutor` is a wrapper over
+:class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor`
+which eliminates options that are irrelevant for MPI applications.
+Define a configuration for :class:`~parsl.executors.MPIExecutor` by:

-Example Configuration
-~~~~~~~~~~~~~~~~~~~~~
+1. Setting ``max_workers_per_block`` to the maximum number of tasks to run per block of compute nodes.
+   This value is typically the number of nodes per block divided by the number of nodes per task.
+2. Setting ``mpi_launcher`` to the launcher used for your application.
+3. Specifying a provider that matches your cluster and using the :class:`~parsl.launchers.SimpleLauncher`,
+   which will ensure that no Parsl processes are placed on the compute nodes.

-Here's an example configuration which runs MPI tasks on ALCF's Polaris Supercomputer
+An example for ALCF's Polaris supercomputer that will run 3 MPI tasks of 2 nodes each at the same time:

 .. 
code-block:: python - import parsl - from typing import Dict - from parsl.config import Config - - # PBSPro is the right provider for Polaris: - from parsl.providers import PBSProProvider - # The high throughput executor is for scaling to HPC systems: - from parsl.executors import HighThroughputExecutor - # address_by_interface is needed for the HighThroughputExecutor: - from parsl.addresses import address_by_interface - # For checkpointing: - from parsl.utils import get_all_checkpoints - - # Adjust your user-specific options here: - # run_dir="/lus/grand/projects/yourproject/yourrundir/" - - user_opts = { - "worker_init": "module load conda; conda activate parsl_mpi_py310", - "scheduler_options":"#PBS -l filesystems=home:eagle:grand\n#PBS -l place=scatter" , - "account": SET_YOUR_ALCF_ALLOCATION_HERE, - "queue": "debug-scaling", - "walltime": "1:00:00", - "nodes_per_block": 8, - "available_accelerators": 4, # Each Polaris node has 4 GPUs, setting this ensures one worker per GPU - "cores_per_worker": 8, # this will set the number of cpu hardware threads per worker. - } - config = Config( - executors=[ - HighThroughputExecutor( - label="htex", - enable_mpi_mode=True, - mpi_launcher="mpiexec", - cores_per_worker=user_opts["cores_per_worker"], - address=address_by_interface("bond0"), - provider=PBSProProvider( - account=user_opts["account"], - queue=user_opts["queue"], - # PBS directives (header lines): for array jobs pass '-J' option - scheduler_options=user_opts["scheduler_options"], - # Command to be run before starting a worker, such as: - worker_init=user_opts["worker_init"], - # number of compute nodes allocated for each block - nodes_per_block=user_opts["nodes_per_block"], - init_blocks=1, - min_blocks=0, - max_blocks=1, # Can increase more to have more parallel jobs - walltime=user_opts["walltime"] - ), + executors=[ + MPIExecutor( + address=address_by_interface('bond0'), + max_workers_per_block=3, # Assuming 2 nodes per task + provider=PBSProProvider( + account="parsl", + worker_init=f"""module load miniconda; source activate /lus/eagle/projects/parsl/env""", + walltime="1:00:00", + queue="debug", + scheduler_options="#PBS -l filesystems=home:eagle:grand", + launcher=SimpleLauncher(), + select_options="ngpus=4", + nodes_per_block=6, + max_blocks=1, + cpus_per_node=64, ), - ], - - -Writing MPI-Compatible Apps -++++++++++++++++++++++++++++ - -In MPI mode, the :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` can execute both Python or Bash Apps which invokes the MPI application. -However, it is important to note that Python Apps that directly use ``mpi4py`` is not supported. - -For multi-node MPI applications, especially when running multiple applications within a single batch job, -it is important to specify the resource requirements for the app so that the Parsl worker can provision -the appropriate resources before the application starts. For eg, your Parsl script might contain a molecular -dynamics application that requires 8 ranks over 1 node for certain inputs and 32 ranks over 4 nodes for some -depending on the size of the molecules being simulated. By specifying resources via ``resource_specification``, -parsl workers will provision the requested resources and then compose MPI launch command prefixes -(Eg: ``mpiexec -n -ppn -hosts ``). These launch command prefixes are -shared with the app via environment variables. - -.. 
code-block:: python - - @bash_app - def echo_hello(n: int, stderr='std.err', stdout='std.out', parsl_resource_specification: Dict): - return f'$PARSL_MPI_PREFIX hostname' + ), + ] + ) - # The following app will echo the hostname from several MPI ranks - # Alternatively, you could also use the resource_specification to compose a launch - # command using env vars set by Parsl from the resource_specification like this: - @bash_app - def echo_hostname(n: int, stderr='std.err', stdout='std.out', parsl_resource_specification: Dict): - total_ranks = os.environ("") - return f'aprun -N $PARSL_RANKS_PER_NODE -n {total_ranks} /bin/hostname' +Writing an MPI App +------------------ -All valid key-value pairs set in the resource_specification are exported to the application via env vars, -for eg. ``parsl_resource_specification = {'RANKS_PER_NODE': 4} `` will set the env var ``PARSL_RANKS_PER_NODE`` +:class:`~parsl.executors.high_throughput.MPIExecutor` can execute both Python or Bash Apps which invoke an MPI application. -However, the following options are **required** for MPI applications : +Create the app by first defining a function which includes ``parsl_resource_specification`` keyword argument. +The resource specification is a dictionary which defines the number of nodes and ranks used by the application: .. code-block:: python @@ -177,159 +76,64 @@ However, the following options are **required** for MPI applications : 'num_ranks': , # Number of ranks in total } - # The above are made available in the worker env vars: - # echo $PARSL_NUM_NODES, $PARSL_RANKS_PER_NODE, $PARSL_NUM_RANKS - -When the above are supplied, the following launch command prefixes are set: - -.. code-block:: +Then, replace the call to the MPI launcher with ``$PARSL_MPI_PREFIX``. +``$PARSL_MPI_PREFIX`` references an environmental variable which will be replaced with +the correct MPI launcher configured for the resource list provided when calling the function +and with options that map the task to nodes which Parsl knows to be available. - PARSL_MPIEXEC_PREFIX: mpiexec launch command which works for a large number of batch systems especially PBS systems - PARSL_SRUN_PREFIX: srun launch command for Slurm based clusters - PARSL_APRUN_PREFIX: aprun launch command prefix for some Cray machines - PARSL_MPI_PREFIX: Parsl sets the MPI prefix to match the mpi_launcher specified to `HighThroughputExecutor` - PARSL_MPI_NODELIST: List of assigned nodes separated by commas (Eg, NODE1,NODE2) - PARSL_WORKER_POOL_ID: Alphanumeric string identifier for the worker pool - PARSL_WORKER_BLOCK_ID: Batch job ID that the worker belongs to - - -Example Application: CosmicTagger -+++++++++++++++++++++++++++++++++ - -TODO: Blurb about what CosmicTagger does -CosmicTagger implements models and training utilities to train convolutional networks to -separate cosmic pixels, background pixels, and neutrino pixels in a neutrinos dataset. -There are several variations. A detailed description of the code can be found in: - -`Cosmic Background Removal with Deep Neural Networks in SBND `_ - -Cosmic Background Removal with Deep Neural Networks in SBND -This network is implemented in both PyTorch and TensorFlow. To select between the networks, you can use the --framework parameter. It accepts either tensorflow or torch. The model is available in a development version with sparse convolutions in the torch framework. - -This example is broken down into three components. First, configure the Executor for Polaris at -ALCF. 
The configuration will use the :class:`~parsl.providers.PBSProProvider` to connect to the batch scheduler. -With the goal of running MPI applications, we set the +The function can be a Bash app .. code-block:: python - import parsl - from typing import Dict - from parsl.config import Config - - # PBSPro is the right provider for Polaris: - from parsl.providers import PBSProProvider - # The high throughput executor is for scaling to HPC systems: - from parsl.executors import HighThroughputExecutor - # address_by_interface is needed for the HighThroughputExecutor: - from parsl.addresses import address_by_interface - - user_opts = { - # Make sure to setup a conda environment before using this config - "worker_init": "module load conda; conda activate parsl_mpi_py310", - "scheduler_options":"#PBS -l filesystems=home:eagle:grand\n#PBS -l place=scatter" , - "account": , - "queue": "debug-scaling", - "walltime": "1:00:00", - "nodes_per_block": 8, - "available_accelerators": 4, # Each Polaris node has 4 GPUs, setting this ensures one worker per GPU - "cores_per_worker": 8, # this will set the number of cpu hardware threads per worker. - } - - config = Config( - executors=[ - HighThroughputExecutor( - label="htex", - enable_mpi_mode=True, - mpi_launcher="mpiexec", - cores_per_worker=user_opts["cores_per_worker"], - address=address_by_interface("bond0"), - provider=PBSProProvider( - account=user_opts["account"], - queue=user_opts["queue"], - # PBS directives (header lines): for array jobs pass '-J' option - scheduler_options=user_opts["scheduler_options"], - # Command to be run before starting a worker, such as: - worker_init=user_opts["worker_init"], - # number of compute nodes allocated for each block - nodes_per_block=user_opts["nodes_per_block"], - init_blocks=1, - min_blocks=0, - max_blocks=1, # Can increase more to have more parallel jobs - walltime=user_opts["walltime"] - ), - ), - ], - ) - + @bash_app + def lammps_mpi_application(infile: File, parsl_resource_specification: Dict): + # PARSL_MPI_PREFIX will resolve to `mpiexec -n 4 -ppn 2 -hosts NODE001,NODE002` + return f"$PARSL_MPI_PREFIX lmp_mpi -in {infile.filepath}" -Next we define the CosmicTagger MPI application. TODO: Ask Khalid for help. +or a Python app: .. code-block:: python - @parsl.bash_app - def cosmic_tagger(workdir: str, - datatype: str = "float32", - batchsize: int = 8, - framework: str = "torch", - iterations: int = 500, - trial: int = 2, - stdout=parsl.AUTO_LOGNAME, - stderr=parsl.AUTO_LOGNAME, - parsl_resource_specification:Dict={}): - NRANKS = parsl_resource_specification['num_ranks'] - - return f""" - module purge - module use /soft/modulefiles/ - module load conda/2023-10-04 - conda activate - - echo "PARSL_MPI_PREFIX : $PARSL_MPI_PREFIX" - - $PARSL_MPI_PREFIX --cpu-bind numa \ - python {workdir}/bin/exec.py --config-name a21 \ - run.id=run_plrs_ParslDemo_g${NRANKS}_{datatype}_b{batchsize}_{framework}_i{iterations}_T{trial} \ - run.compute_mode=GPU \ - run.distributed=True \ - framework={framework} \ - run.minibatch_size={batchsize} \ - run.precision={datatype} \ - mode.optimizer.loss_balance_scheme=light \ - run.iterations={iterations} - """ - -In this example, we run a simple test that does an exploration over the ``batchsize`` parameter -while launching the application over 2-4 nodes. - -.. 
code-block:: python

+    @python_app
+    def lammps_mpi_application(infile: File, parsl_resource_specification: Dict):
+        from subprocess import run
+        # Run the command through a shell so that $PARSL_MPI_PREFIX is expanded
+        # into the launcher prefix composed by Parsl
+        with open('stdout.lmp', 'w') as fp, open('stderr.lmp', 'w') as fe:
+            proc = run(f'$PARSL_MPI_PREFIX lmp_mpi -in {infile.filepath}',
+                       shell=True, stdout=fp, stderr=fe)
+        return proc.returncode

-    def run_cosmic_tagger():
-        futures = {}
-        for num_nodes in [2, 4]:
-            for batchsize in [2, 4, 8]:
-                parsl_res_spec = {"num_nodes": num_nodes,
-                                  "num_tasks": num_nodes * 4,
-                                  "ranks_per_node": 4}
-                future = cosmic_tagger(workdir="/home/yadunand/CosmicTagger",
-                                       datatype="float32",
-                                       batchsize=str(batchsize),
-                                       parsl_resource_specification=parsl_res_spec)

+Run either App by calling it with its arguments and a resource specification which defines how to execute it:

+.. code-block:: python

-                print(f"Stdout : {future.stdout}")
-                print(f"Stderr : {future.stderr}")
-                futures[(num_nodes, batchsize)] = future
+    # Resources in terms of nodes and how ranks are to be distributed are set on a per app
+    # basis via the resource_spec dictionary.
+    resource_spec = {
+        "num_nodes": 2,
+        "ranks_per_node": 2,
+        "num_ranks": 4,
+    }
+    future = lammps_mpi_application(File('in.file'), parsl_resource_specification=resource_spec)

+Advanced: More Environment Variables
+++++++++++++++++++++++++++++++++++++

-    for key in futures:
-        print(f"Got result for {key}: {futures[key].result()}")
+Parsl Apps which run using :class:`~parsl.executors.high_throughput.MPIExecutor`
+can make their own MPI invocation using other environment variables.
+These other variables include versions of the launch command for different launchers:

-    run_cosmic_tagger()
+- ``PARSL_MPIEXEC_PREFIX``: mpiexec launch command which works for a large number of batch systems, especially PBS systems
+- ``PARSL_SRUN_PREFIX``: srun launch command for Slurm based clusters
+- ``PARSL_APRUN_PREFIX``: aprun launch command prefix for some Cray machines

+And the information used by Parsl when assembling the launcher commands:

+- ``PARSL_NUM_RANKS``: Total number of ranks to use for the MPI application
+- ``PARSL_NUM_NODES``: Number of nodes to use for the calculation
+- ``PARSL_MPI_NODELIST``: List of assigned nodes separated by commas (e.g., NODE1,NODE2)
+- ``PARSL_RANKS_PER_NODE``: Number of ranks per node

 Limitations
 +++++++++++

diff --git a/parsl/executors/__init__.py b/parsl/executors/__init__.py
index 8661342ab8..707c336caf 100644
--- a/parsl/executors/__init__.py
+++ b/parsl/executors/__init__.py
@@ -1,9 +1,11 @@
 from parsl.executors.threads import ThreadPoolExecutor
 from parsl.executors.workqueue.executor import WorkQueueExecutor
 from parsl.executors.high_throughput.executor import HighThroughputExecutor
+from parsl.executors.high_throughput.mpi_executor import MPIExecutor
 from parsl.executors.flux.executor import FluxExecutor

 __all__ = ['ThreadPoolExecutor',
            'HighThroughputExecutor',
+           'MPIExecutor',
            'WorkQueueExecutor',
            'FluxExecutor']

diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index 284a974828..73c2f36c7c 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -62,47 +62,7 @@
                      "--mpi-launcher={mpi_launcher} "
                      "--available-accelerators {accelerators}")

-
-class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageInformation):
-    """Executor designed for cluster-scale
-
-    The HighThroughputExecutor system has the following components:
-      1. The HighThroughputExecutor instance which is run as part of the Parsl script.
-      2. 
The Interchange which acts as a load-balancing proxy between workers and Parsl - 3. The multiprocessing based worker pool which coordinates task execution over several - cores on a node. - 4. ZeroMQ pipes connect the HighThroughputExecutor, Interchange and the process_worker_pool - - Here is a diagram - - .. code:: python - - - | Data | Executor | Interchange | External Process(es) - | Flow | | | - Task | Kernel | | | - +----->|-------->|------------>|->outgoing_q---|-> process_worker_pool - | | | | batching | | | - Parsl<---Fut-| | | load-balancing| result exception - ^ | | | watchdogs | | | - | | | Q_mngmnt | | V V - | | | Thread<--|-incoming_q<---|--- +---------+ - | | | | | | - | | | | | | - +----update_fut-----+ - - - Each of the workers in each process_worker_pool has access to its local rank through - an environmental variable, ``PARSL_WORKER_RANK``. The local rank is unique for each process - and is an integer in the range from 0 to the number of workers per in the pool minus 1. - The workers also have access to the ID of the worker pool as ``PARSL_WORKER_POOL_ID`` - and the size of the worker pool as ``PARSL_WORKER_COUNT``. - - - Parameters - ---------- - - provider : :class:`~parsl.providers.base.ExecutionProvider` +GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider` Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`, :class:`~parsl.providers.cobalt.cobalt.Cobalt`, :class:`~parsl.providers.condor.condor.Condor`, @@ -148,39 +108,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn worker_debug : Bool Enables worker debug logging. - cores_per_worker : float - cores to be assigned to each worker. Oversubscription is possible - by setting cores_per_worker < 1.0. Default=1 - - mem_per_worker : float - GB of memory required per worker. If this option is specified, the node manager - will check the available memory at startup and limit the number of workers such that - the there's sufficient memory for each worker. Default: None - - max_workers : int - Deprecated. Please use max_workers_per_node instead. - - max_workers_per_node : int - Caps the number of workers launched per node. Default: None - - cpu_affinity: string - Whether or how each worker process sets thread affinity. Options include "none" to forgo - any CPU affinity configuration, "block" to assign adjacent cores to workers - (ex: assign 0-1 to worker 0, 2-3 to worker 1), and - "alternating" to assign cores to workers in round-robin - (ex: assign 0,2 to worker 0, 1,3 to worker 1). - The "block-reverse" option assigns adjacent cores to workers, but assigns - the CPUs with large indices to low index workers (ex: assign 2-3 to worker 1, 0,1 to worker 2) - - available_accelerators: int | list - Accelerators available for workers to use. Each worker will be pinned to exactly one of the provided - accelerators, and no more workers will be launched than the number of accelerators. - - Either provide the list of accelerator names or the number available. If a number is provided, - Parsl will create names as integers starting with 0. - - default: empty list - prefetch_capacity : int Number of tasks that could be prefetched over available worker capacity. 
When there are a few tasks (<100) or when tasks are long running, this option should @@ -214,6 +141,85 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn worker_logdir_root : string In case of a remote file system, specify the path to where logs will be kept. + encrypted : bool + Flag to enable/disable encryption (CurveZMQ). Default is False. +""" # Documentation for params used by both HTEx and MPIEx + + +class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageInformation): + __doc__ = f"""Executor designed for cluster-scale + + The HighThroughputExecutor system has the following components: + 1. The HighThroughputExecutor instance which is run as part of the Parsl script. + 2. The Interchange which acts as a load-balancing proxy between workers and Parsl + 3. The multiprocessing based worker pool which coordinates task execution over several + cores on a node. + 4. ZeroMQ pipes connect the HighThroughputExecutor, Interchange and the process_worker_pool + + Here is a diagram + + .. code:: python + + + | Data | Executor | Interchange | External Process(es) + | Flow | | | + Task | Kernel | | | + +----->|-------->|------------>|->outgoing_q---|-> process_worker_pool + | | | | batching | | | + Parsl<---Fut-| | | load-balancing| result exception + ^ | | | watchdogs | | | + | | | Q_mngmnt | | V V + | | | Thread<--|-incoming_q<---|--- +---------+ + | | | | | | + | | | | | | + +----update_fut-----+ + + + Each of the workers in each process_worker_pool has access to its local rank through + an environmental variable, ``PARSL_WORKER_RANK``. The local rank is unique for each process + and is an integer in the range from 0 to the number of workers per in the pool minus 1. + The workers also have access to the ID of the worker pool as ``PARSL_WORKER_POOL_ID`` + and the size of the worker pool as ``PARSL_WORKER_COUNT``. + + + Parameters + ---------- + + {GENERAL_HTEX_PARAM_DOCS} + + cores_per_worker : float + cores to be assigned to each worker. Oversubscription is possible + by setting cores_per_worker < 1.0. Default=1 + + mem_per_worker : float + GB of memory required per worker. If this option is specified, the node manager + will check the available memory at startup and limit the number of workers such that + the there's sufficient memory for each worker. Default: None + + max_workers : int + Deprecated. Please use max_workers_per_node instead. + + max_workers_per_node : int + Caps the number of workers launched per node. Default: None + + cpu_affinity: string + Whether or how each worker process sets thread affinity. Options include "none" to forgo + any CPU affinity configuration, "block" to assign adjacent cores to workers + (ex: assign 0-1 to worker 0, 2-3 to worker 1), and + "alternating" to assign cores to workers in round-robin + (ex: assign 0,2 to worker 0, 1,3 to worker 1). + The "block-reverse" option assigns adjacent cores to workers, but assigns + the CPUs with large indices to low index workers (ex: assign 2-3 to worker 1, 0,1 to worker 2) + + available_accelerators: int | list + Accelerators available for workers to use. Each worker will be pinned to exactly one of the provided + accelerators, and no more workers will be launched than the number of accelerators. + + Either provide the list of accelerator names or the number available. If a number is provided, + Parsl will create names as integers starting with 0. 
+ + default: empty list + enable_mpi_mode: bool If enabled, MPI launch prefixes will be composed for the batch scheduler based on the nodes available in each batch job and the resource_specification dict passed @@ -224,9 +230,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn This field is only used if enable_mpi_mode is set. Select one from the list of supported MPI launchers = ("srun", "aprun", "mpiexec"). default: "mpiexec" - - encrypted : bool - Flag to enable/disable encryption (CurveZMQ). Default is False. """ @typeguard.typechecked diff --git a/parsl/executors/high_throughput/mpi_executor.py b/parsl/executors/high_throughput/mpi_executor.py new file mode 100644 index 0000000000..4b2afee619 --- /dev/null +++ b/parsl/executors/high_throughput/mpi_executor.py @@ -0,0 +1,85 @@ +"""A simplified interface for HTEx when running in MPI mode""" +from typing import Optional, Tuple, List, Union, Callable, Dict + +import typeguard + +from parsl.data_provider.staging import Staging +from parsl.executors.high_throughput.executor import HighThroughputExecutor, GENERAL_HTEX_PARAM_DOCS +from parsl.executors.status_handling import BlockProviderExecutor +from parsl.jobs.states import JobStatus +from parsl.providers import LocalProvider +from parsl.providers.base import ExecutionProvider + + +class MPIExecutor(HighThroughputExecutor): + __doc__ = f"""A version of :class:`~parsl.HighThroughputExecutor` tuned for executing multi-node (e.g., MPI) tasks. + + The Provider _must_ use the :class:`~parsl.launchers.SimpleLauncher`, + which places a single pool of workers on the first node of a block. + Each worker can then make system calls which use an MPI launcher (e.g., ``mpirun``, ``srun``) + to spawn multi-node tasks. + + Specify the maximum number of multi-node tasks to run at once using ``max_workers_per_block``. + The maximum number should be smaller than the ``nodes_per_block`` in the Provider. 
+ + Parameters + ---------- + max_workers_per_block: int + Maximum number of MPI applications to run at once per block + + {GENERAL_HTEX_PARAM_DOCS} + """ + + @typeguard.typechecked + def __init__(self, + label: str = 'MPIExecutor', + provider: ExecutionProvider = LocalProvider(), + launch_cmd: Optional[str] = None, + address: Optional[str] = None, + worker_ports: Optional[Tuple[int, int]] = None, + worker_port_range: Optional[Tuple[int, int]] = (54000, 55000), + interchange_port_range: Optional[Tuple[int, int]] = (55000, 56000), + storage_access: Optional[List[Staging]] = None, + working_dir: Optional[str] = None, + worker_debug: bool = False, + max_workers_per_block: int = 1, + prefetch_capacity: int = 0, + heartbeat_threshold: int = 120, + heartbeat_period: int = 30, + drain_period: Optional[int] = None, + poll_period: int = 10, + address_probe_timeout: Optional[int] = None, + worker_logdir_root: Optional[str] = None, + mpi_launcher: str = "mpiexec", + block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, + encrypted: bool = False): + super().__init__( + # Hard-coded settings + cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers + enable_mpi_mode=True, + max_workers_per_node=max_workers_per_block, + + # Everything else + label=label, + provider=provider, + launch_cmd=launch_cmd, + address=address, + worker_ports=worker_ports, + worker_port_range=worker_port_range, + interchange_port_range=interchange_port_range, + storage_access=storage_access, + working_dir=working_dir, + worker_debug=worker_debug, + prefetch_capacity=prefetch_capacity, + heartbeat_threshold=heartbeat_threshold, + heartbeat_period=heartbeat_period, + drain_period=drain_period, + poll_period=poll_period, + address_probe_timeout=address_probe_timeout, + worker_logdir_root=worker_logdir_root, + mpi_launcher=mpi_launcher, + block_error_handler=block_error_handler, + encrypted=encrypted + ) + + self.max_workers_per_block = max_workers_per_block diff --git a/parsl/executors/high_throughput/mpi_resource_management.py b/parsl/executors/high_throughput/mpi_resource_management.py index 4434749827..adb5ee252e 100644 --- a/parsl/executors/high_throughput/mpi_resource_management.py +++ b/parsl/executors/high_throughput/mpi_resource_management.py @@ -208,8 +208,11 @@ def get_result(self, block: bool, timeout: float): """Return result and relinquish provisioned nodes""" result_pkl = self.pending_result_q.get(block, timeout=timeout) result_dict = pickle.loads(result_pkl) + # TODO (wardlt): If the task did not request nodes, it won't be in `self._map_tasks_to_nodes`. + # Causes Parsl to hang. 
See Issue #3427 if result_dict["type"] == "result": task_id = result_dict["task_id"] + assert task_id in self._map_tasks_to_nodes, "You are about to experience issue #3427" nodes_to_reallocate = self._map_tasks_to_nodes[task_id] self._return_nodes(nodes_to_reallocate) self._schedule_backlog_tasks() diff --git a/parsl/tests/test_mpi_apps/test_mpiex.py b/parsl/tests/test_mpi_apps/test_mpiex.py new file mode 100644 index 0000000000..b9f1e41072 --- /dev/null +++ b/parsl/tests/test_mpi_apps/test_mpiex.py @@ -0,0 +1,64 @@ +"""Tests for the wrapper class""" +from inspect import signature +from pathlib import Path + +import pytest + +import parsl +from .test_mpi_mode_enabled import get_env_vars +from parsl import HighThroughputExecutor, Config +from parsl.launchers import SimpleLauncher +from parsl.providers import LocalProvider +from parsl.executors.high_throughput.mpi_executor import MPIExecutor + +cwd = Path(__file__).parent.absolute() +pbs_nodefile = cwd.joinpath("mocks", "pbs_nodefile") + + +def local_config(): + return Config( + executors=[ + MPIExecutor( + max_workers_per_block=1, + provider=LocalProvider( + worker_init=f"export PBS_NODEFILE={pbs_nodefile}", + launcher=SimpleLauncher() + ) + ) + ] + ) + + +@pytest.mark.local +def test_docstring(): + """Ensure the old kwargs are copied over into the new class""" + assert 'label' in MPIExecutor.__doc__ + assert 'max_workers_per_block' in MPIExecutor.__doc__ + assert 'available_accelerators' not in MPIExecutor.__doc__ + + +@pytest.mark.local +def test_init(): + """Ensure all relevant kwargs are copied over from HTEx""" + + new_kwargs = {'max_workers_per_block'} + excluded_kwargs = {'available_accelerators', 'enable_mpi_mode', 'cores_per_worker', 'max_workers_per_node', + 'mem_per_worker', 'cpu_affinity', 'max_workers'} + + # Get the kwargs from both HTEx and MPIEx + htex_kwargs = set(signature(HighThroughputExecutor.__init__).parameters) + mpix_kwargs = set(signature(MPIExecutor.__init__).parameters) + + assert mpix_kwargs.difference(htex_kwargs) == new_kwargs + assert len(mpix_kwargs.intersection(excluded_kwargs)) == 0 + assert mpix_kwargs.union(excluded_kwargs).difference(new_kwargs) == htex_kwargs + + +@pytest.mark.local +def test_get_env(): + future = get_env_vars(parsl_resource_specification={ + "num_nodes": 3, + "ranks_per_node": 5, + }) + env_vars = future.result() + assert env_vars['PARSL_NUM_RANKS'] == '15'
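+    # A minimal sketch of further checks, assuming get_env_vars also captures the other
+    # variables documented in mpi_apps.rst (PARSL_RANKS_PER_NODE, PARSL_NUM_NODES), which
+    # should be consistent with the 3-node, 5-ranks-per-node specification above
+    assert env_vars['PARSL_RANKS_PER_NODE'] == '5'
+    assert env_vars['PARSL_NUM_NODES'] == '3'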