Skip to content

Commit

Permalink
Implement resource hints for the SlurmProvider (#1217)
Browse files Browse the repository at this point in the history
* Implement resource hints for the SlurmProvider

Partially addresses #942.

This commit adds the `cores_per_node` and `mem_per_node` keyword args to
the SlurmProvider. Both default to None, and behavior is unchanged in
the default case. Setting either has three effects. First, it
modifies the Slurm submit script to request the appropriate cores and/or
memory. Second, it sets the environment variables `PARSL_MEMORY_GB` and
`PARSL_CORES` on the node. Finally, a `workers_per_node` attribute is
added to the `HighThroughputExecutor`; it is calculated from the
resource hints when they are available. The scaling strategy reads this
attribute, which enables a more accurate calculation when scaling
resources up and down. An example configuration, tested on Midway, is
provided below. This configuration requests four 1-core workers, each
with 3 GB of memory.

```
from parsl.config import Config
from parsl.providers import SlurmProvider
from parsl.addresses import address_by_hostname
from parsl.executors import HighThroughputExecutor

config = Config(
    executors=[
        HighThroughputExecutor(
            cores_per_worker=1,
            mem_per_worker=3,
            address=address_by_hostname(),
            provider=SlurmProvider(
                'broadwl',
                nodes_per_block=1,
                init_blocks=1,
                min_blocks=1,
                max_blocks=1,
                mem_per_node=12,
                cores_per_node=4,
                exclusive=False
            ),
        )
    ],
)
```

* Add default mem_per_node and cores_per_node

* Switch to properties in base class

Also: clarify docstrings.

* Fix flake8

Also: fix incomplete conversion to property.

* Fix setter definition
  • Loading branch information
annawoodard authored Aug 27, 2019
1 parent 4b9afce commit 636ea59
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 11 deletions.
6 changes: 1 addition & 5 deletions parsl/dataflow/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,8 @@ def _strategy_simple(self, tasks, *args, kind=None, **kwargs):
# FIXME probably more of this logic should be moved to the provider
min_blocks = executor.provider.min_blocks
max_blocks = executor.provider.max_blocks
if isinstance(executor, IPyParallelExecutor):
if isinstance(executor, IPyParallelExecutor) or isinstance(executor, HighThroughputExecutor):
tasks_per_node = executor.workers_per_node
elif isinstance(executor, HighThroughputExecutor):
# This is probably wrong calculation, we need this to come from the executor
# since we can't know slots ahead of time.
tasks_per_node = 1
elif isinstance(executor, ExtremeScaleExecutor):
tasks_per_node = executor.ranks_per_node

Expand Down
16 changes: 16 additions & 0 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pickle
from multiprocessing import Process, Queue
from typing import Dict, List, Optional, Tuple, Union
import math

from ipyparallel.serialize import pack_apply_message # ,unpack_apply_message
from ipyparallel.serialize import deserialize_object # ,serialize_object
Expand Down Expand Up @@ -184,6 +185,21 @@ def __init__(self,
self.max_workers = max_workers
self.prefetch_capacity = prefetch_capacity

mem_slots = max_workers
cpu_slots = max_workers
if hasattr(self.provider, 'mem_per_node') and \
self.provider.mem_per_node is not None and \
mem_per_worker is not None and \
mem_per_worker > 0:
mem_slots = math.floor(self.provider.mem_per_node / mem_per_worker)
if hasattr(self.provider, 'cores_per_node') and \
self.provider.cores_per_node is not None:
cpu_slots = math.floor(self.provider.cores_per_node / cores_per_worker)

self.workers_per_node = min(max_workers, mem_slots, cpu_slots)
if self.workers_per_node == float('inf'):
self.workers_per_node = 1 # our best guess-- we do not have any provider hints

self._task_counter = 0
self.address = address
self.hub_address = None # set to the correct hub address in dfk
Expand Down
38 changes: 38 additions & 0 deletions parsl/providers/provider_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ExecutionProvider(metaclass=ABCMeta):
|
+-------------------
"""
_cores_per_node = None
_mem_per_node = None

@abstractmethod
def submit(self, command, tasks_per_node, job_name="parsl.auto"):
Expand Down Expand Up @@ -96,3 +98,39 @@ def scaling_enabled(self):
def label(self):
''' Provides the label for this provider '''
pass

@property
def mem_per_node(self):
"""Real memory to provision per node in GB.

Returns the stored ``_mem_per_node`` value, which is None (the class-level
default) until a value has been assigned.

Providers which set this property should ask for mem_per_node of memory
when provisioning resources, and set the corresponding environment
variable PARSL_MEMORY_GB before executing submitted commands.

If this property is set, executors may use it to calculate how many tasks can
run concurrently per node. This information is used by dataflow.Strategy to estimate
the resources required to run all outstanding tasks.
"""
return self._mem_per_node

@mem_per_node.setter
def mem_per_node(self, value):
# Store the hint on the instance as-is (no validation or unit conversion),
# shadowing the class-level default of None.
self._mem_per_node = value

@property
def cores_per_node(self):
"""Number of cores to provision per node.

Returns the stored ``_cores_per_node`` value, which is None (the class-level
default) until a value has been assigned.

Providers which set this property should ask for cores_per_node cores
when provisioning resources, and set the corresponding environment
variable PARSL_CORES before executing submitted commands.

If this property is set, executors may use it to calculate how many tasks can
run concurrently per node. This information is used by dataflow.Strategy to estimate
the resources required to run all outstanding tasks.
"""
return self._cores_per_node

@cores_per_node.setter
def cores_per_node(self, value):
# Store the hint on the instance as-is (no validation),
# shadowing the class-level default of None.
self._cores_per_node = value
32 changes: 26 additions & 6 deletions parsl/providers/slurm/slurm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import math
import time
import logging

Expand Down Expand Up @@ -43,6 +44,12 @@ class SlurmProvider(ClusterProvider, RepresentationMixin):
:class:`~parsl.channels.SSHInteractiveLoginChannel`.
nodes_per_block : int
Nodes to provision per block.
cores_per_node : int
Specify the number of cores to provision per node. If set to None, executors
will assume all cores on the node are available for computation. Default is None.
mem_per_node : float
Specify the real memory to provision per node in GB. If set to None, no
explicit request to the scheduler will be made. Default is None.
min_blocks : int
Minimum number of blocks to maintain.
max_blocks : int
Expand Down Expand Up @@ -71,6 +78,8 @@ def __init__(self,
partition,
channel=LocalChannel(),
nodes_per_block=1,
cores_per_node=None,
mem_per_node=None,
init_blocks=1,
min_blocks=0,
max_blocks=10,
Expand All @@ -95,13 +104,14 @@ def __init__(self,
launcher=launcher)

self.partition = partition
self.cores_per_node = cores_per_node
self.mem_per_node = mem_per_node
self.exclusive = exclusive
self.move_files = move_files
self.scheduler_options = scheduler_options + '\n'
if exclusive:
self.scheduler_options = "#SBATCH --exclusive\n" + scheduler_options
else:
self.scheduler_options = scheduler_options
self.worker_init = worker_init
self.scheduler_options += "#SBATCH --exclusive\n"
self.worker_init = worker_init + '\n'

def _status(self):
''' Internal: Do not call. Returns the status list for a list of job_ids
Expand Down Expand Up @@ -157,6 +167,16 @@ def submit(self, command, tasks_per_node, job_name="parsl.auto"):
logger.warn("Slurm provider '{}' is at capacity (no more blocks will be added)".format(self.label))
return None

scheduler_options = self.scheduler_options
worker_init = self.worker_init
if self.mem_per_node is not None:
scheduler_options += '#SBATCH --mem={}g\n'.format(self.mem_per_node)
worker_init += 'export PARSL_MEMORY_GB={}\n'.format(self.mem_per_node)
if self.cores_per_node is not None:
cpus_per_task = math.floor(self.cores_per_node / tasks_per_node)
scheduler_options += '#SBATCH --cpus-per-task={}'.format(cpus_per_task)
worker_init += 'export PARSL_CORES={}\n'.format(cpus_per_task)

job_name = "{0}.{1}".format(job_name, time.time())

script_path = "{0}/{1}.submit".format(self.script_dir, job_name)
Expand All @@ -169,8 +189,8 @@ def submit(self, command, tasks_per_node, job_name="parsl.auto"):
job_config["nodes"] = self.nodes_per_block
job_config["tasks_per_node"] = tasks_per_node
job_config["walltime"] = wtime_to_minutes(self.walltime)
job_config["scheduler_options"] = self.scheduler_options
job_config["worker_init"] = self.worker_init
job_config["scheduler_options"] = scheduler_options
job_config["worker_init"] = worker_init
job_config["partition"] = self.partition
job_config["user_script"] = command

Expand Down

0 comments on commit 636ea59

Please sign in to comment.