Implement resource hints for the SlurmProvider #1217

Merged: 6 commits, Aug 27, 2019
6 changes: 1 addition & 5 deletions parsl/dataflow/strategy.py
@@ -181,12 +181,8 @@ def _strategy_simple(self, tasks, *args, kind=None, **kwargs):
             # FIXME probably more of this logic should be moved to the provider
             min_blocks = executor.provider.min_blocks
             max_blocks = executor.provider.max_blocks
-            if isinstance(executor, IPyParallelExecutor):
+            if isinstance(executor, IPyParallelExecutor) or isinstance(executor, HighThroughputExecutor):
                 tasks_per_node = executor.workers_per_node
-            elif isinstance(executor, HighThroughputExecutor):
-                # This is probably wrong calculation, we need this to come from the executor
-                # since we can't know slots ahead of time.
-                tasks_per_node = 1
             elif isinstance(executor, ExtremeScaleExecutor):
                 tasks_per_node = executor.ranks_per_node

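For context: the strategy uses tasks_per_node to estimate how many blocks to request for the outstanding work. A minimal sketch of that arithmetic, with illustrative names and numbers rather than the actual Strategy code:

    import math

    def blocks_needed(outstanding_tasks, tasks_per_node, nodes_per_block):
        # Each block supplies nodes_per_block * tasks_per_node concurrent task slots,
        # so the number of blocks required is the ceiling of the ratio.
        slots_per_block = tasks_per_node * nodes_per_block
        return math.ceil(outstanding_tasks / slots_per_block)

    # 100 outstanding tasks, 16 workers per node, 2 nodes per block -> 4 blocks
    print(blocks_needed(100, 16, 2))

With the change above, the HighThroughputExecutor reports its computed workers_per_node instead of the old hard-coded guess of 1 task per node, so the estimate reflects actual per-node concurrency.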
16 changes: 16 additions & 0 deletions parsl/executors/high_throughput/executor.py
@@ -9,6 +9,7 @@
 import pickle
 from multiprocessing import Process, Queue
 from typing import Dict, List, Optional, Tuple, Union
+import math

 from ipyparallel.serialize import pack_apply_message # ,unpack_apply_message
 from ipyparallel.serialize import deserialize_object # ,serialize_object
@@ -184,6 +185,21 @@ def __init__(self,
         self.max_workers = max_workers
         self.prefetch_capacity = prefetch_capacity

+        mem_slots = max_workers
+        cpu_slots = max_workers
+        if hasattr(self.provider, 'mem_per_node') and \
+                self.provider.mem_per_node is not None and \
+                mem_per_worker is not None and \
+                mem_per_worker > 0:
+            mem_slots = math.floor(self.provider.mem_per_node / mem_per_worker)
+        if hasattr(self.provider, 'cores_per_node') and \
+                self.provider.cores_per_node is not None:
+            cpu_slots = math.floor(self.provider.cores_per_node / cores_per_worker)
+
+        self.workers_per_node = min(max_workers, mem_slots, cpu_slots)
+        if self.workers_per_node == float('inf'):
+            self.workers_per_node = 1  # our best guess-- we do not have any provider hints
+
         self._task_counter = 0
         self.address = address
         self.hub_address = None  # set to the correct hub address in dfk
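The block added above caps workers_per_node by whichever resource hint is most restrictive. A standalone sketch of the same arithmetic, assuming made-up node sizes (64 GB of memory and 32 cores per node):

    import math

    def workers_per_node(max_workers, mem_per_node, mem_per_worker,
                         cores_per_node, cores_per_worker):
        # Mirrors the executor logic: each available hint caps the worker count.
        mem_slots = max_workers
        cpu_slots = max_workers
        if mem_per_node is not None and mem_per_worker:
            mem_slots = math.floor(mem_per_node / mem_per_worker)
        if cores_per_node is not None:
            cpu_slots = math.floor(cores_per_node / cores_per_worker)
        slots = min(max_workers, mem_slots, cpu_slots)
        return 1 if slots == float('inf') else slots

    # 64 GB / 4 GB per worker allows 16; 32 cores / 2 cores per worker allows 16
    print(workers_per_node(float('inf'), 64, 4, 32, 2))  # -> 16

If neither hint is available and max_workers is left at its unbounded default, the result falls back to a best guess of 1 worker per node, matching the strategy's previous assumption.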
38 changes: 38 additions & 0 deletions parsl/providers/provider_base.py
@@ -21,6 +21,8 @@ class ExecutionProvider(metaclass=ABCMeta):
                           |
                           +-------------------
     """
+    _cores_per_node = None
+    _mem_per_node = None

     @abstractmethod
     def submit(self, command, tasks_per_node, job_name="parsl.auto"):
@@ -96,3 +98,39 @@ def scaling_enabled(self):
     def label(self):
         ''' Provides the label for this provider '''
         pass
+
+    @property
+    def mem_per_node(self):
+        """Real memory to provision per node in GB.
+
+        Providers which set this property should ask for mem_per_node of memory
+        when provisioning resources, and set the corresponding environment
+        variable PARSL_MEMORY_GB before executing submitted commands.
+
+        If this property is set, executors may use it to calculate how many tasks can
+        run concurrently per node. This information is used by dataflow.Strategy to estimate
+        the resources required to run all outstanding tasks.
+        """
+        return self._mem_per_node
+
+    @mem_per_node.setter
+    def mem_per_node(self, value):
+        self._mem_per_node = value
+
+    @property
+    def cores_per_node(self):
+        """Number of cores to provision per node.
+
+        Providers which set this property should ask for cores_per_node cores
+        when provisioning resources, and set the corresponding environment
+        variable PARSL_CORES before executing submitted commands.
+
+        If this property is set, executors may use it to calculate how many tasks can
+        run concurrently per node. This information is used by dataflow.Strategy to estimate
+        the resources required to run all outstanding tasks.
+        """
+        return self._cores_per_node
+
+    @cores_per_node.setter
+    def cores_per_node(self, value):
+        self._cores_per_node = value
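Because both hints default to None on the base class, consumers can read them defensively regardless of which provider is configured. A short sketch (not code from this PR; the helper name is illustrative):

    def read_resource_hints(provider):
        # Older or third-party providers may predate these properties entirely,
        # so fall back to None via getattr rather than assuming they exist.
        cores = getattr(provider, 'cores_per_node', None)
        mem = getattr(provider, 'mem_per_node', None)
        return cores, mem

This is the same hasattr/None guard pattern the HighThroughputExecutor change applies before dividing by the per-worker requirements.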
32 changes: 26 additions & 6 deletions parsl/providers/slurm/slurm.py
@@ -1,4 +1,5 @@
 import os
+import math
 import time
 import logging

@@ -43,6 +44,12 @@ class SlurmProvider(ClusterProvider, RepresentationMixin):
         :class:`~parsl.channels.SSHInteractiveLoginChannel`.
     nodes_per_block : int
         Nodes to provision per block.
+    cores_per_node : int
+        Specify the number of cores to provision per node. If set to None, executors
+        will assume all cores on the node are available for computation. Default is None.
+    mem_per_node : float
+        Specify the real memory to provision per node in GB. If set to None, no
+        explicit request to the scheduler will be made. Default is None.
     min_blocks : int
         Minimum number of blocks to maintain.
     max_blocks : int
@@ -71,6 +78,8 @@ def __init__(self,
                  partition,
                  channel=LocalChannel(),
                  nodes_per_block=1,
+                 cores_per_node=None,
+                 mem_per_node=None,
                  init_blocks=1,
                  min_blocks=0,
                  max_blocks=10,
@@ -95,13 +104,14 @@
                          launcher=launcher)

         self.partition = partition
+        self.cores_per_node = cores_per_node
+        self.mem_per_node = mem_per_node
         self.exclusive = exclusive
         self.move_files = move_files
+        self.scheduler_options = scheduler_options + '\n'
         if exclusive:
-            self.scheduler_options = "#SBATCH --exclusive\n" + scheduler_options
-        else:
-            self.scheduler_options = scheduler_options
-        self.worker_init = worker_init
+            self.scheduler_options += "#SBATCH --exclusive\n"
+        self.worker_init = worker_init + '\n'

     def _status(self):
         ''' Internal: Do not call. Returns the status list for a list of job_ids
@@ -157,6 +167,16 @@ def submit(self, command, tasks_per_node, job_name="parsl.auto"):
             logger.warn("Slurm provider '{}' is at capacity (no more blocks will be added)".format(self.label))
             return None

+        scheduler_options = self.scheduler_options
+        worker_init = self.worker_init
+        if self.mem_per_node is not None:
+            scheduler_options += '#SBATCH --mem={}g\n'.format(self.mem_per_node)
+            worker_init += 'export PARSL_MEMORY_GB={}\n'.format(self.mem_per_node)
+        if self.cores_per_node is not None:
+            cpus_per_task = math.floor(self.cores_per_node / tasks_per_node)
+            scheduler_options += '#SBATCH --cpus-per-task={}'.format(cpus_per_task)
+            worker_init += 'export PARSL_CORES={}\n'.format(cpus_per_task)
+
         job_name = "{0}.{1}".format(job_name, time.time())

         script_path = "{0}/{1}.submit".format(self.script_dir, job_name)
@@ -169,8 +189,8 @@ def submit(self, command, tasks_per_node, job_name="parsl.auto"):
job_config["nodes"] = self.nodes_per_block
job_config["tasks_per_node"] = tasks_per_node
job_config["walltime"] = wtime_to_minutes(self.walltime)
job_config["scheduler_options"] = self.scheduler_options
job_config["worker_init"] = self.worker_init
job_config["scheduler_options"] = scheduler_options
job_config["worker_init"] = worker_init
job_config["partition"] = self.partition
job_config["user_script"] = command

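An end-to-end usage sketch of the new hints. The Config, HighThroughputExecutor, and SlurmProvider parameters shown do exist in Parsl, but the partition name, sizes, and walltime are illustrative values rather than a recommended configuration:

    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.providers import SlurmProvider

    config = Config(
        executors=[
            HighThroughputExecutor(
                label='htex_slurm',
                cores_per_worker=2,
                mem_per_worker=4,            # GB per worker
                provider=SlurmProvider(
                    'debug',                 # partition name (illustrative)
                    nodes_per_block=2,
                    cores_per_node=32,       # new resource hint
                    mem_per_node=64,         # new resource hint, in GB
                    walltime='00:30:00',
                ),
            )
        ]
    )

With these values the executor derives workers_per_node = min(64 / 4, 32 / 2) = 16, and each generated batch script gains '#SBATCH --mem=64g', a '--cpus-per-task' line derived from cores_per_node and tasks_per_node, and matching 'export PARSL_MEMORY_GB' / 'export PARSL_CORES' lines in worker_init.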