Abstract more block handling from HighThroughputExecutor and share with WorkQueue #2071

Merged: 28 commits, Aug 25, 2021
Commits (28), changes from all commits:
800b490
tasks_per_node case in general strategy is a mess:
benclifford Feb 18, 2021
d84591c
Merge branch 'master' into benc/strategy-tasks_per_node-general
benclifford May 1, 2021
7130e29
Existing strategies require an 'outstanding' property. This was not e…
benclifford Feb 18, 2021
7fccd2c
introduce an abstract class above htex, below status handling executor
benclifford Mar 11, 2021
40a8c75
remove unused old scaleout for wq
benclifford May 10, 2021
3011963
Merge branch 't/strategy-tasks_per_node-general' into benc-block-prov…
benclifford May 10, 2021
a0602d0
Move BlockProviderExecutor in status handling source, in preparation …
benclifford May 10, 2021
ce0f086
Merge StatusHandling and BlockProvider executor base classes
benclifford May 10, 2021
0bda48f
Merge remote-tracking branch 'origin/master' into benc-block-provider…
benclifford May 11, 2021
c32e3e6
Merge branch 'master' into benc/strategy-tasks_per_node-general
benclifford May 14, 2021
e8c835d
Merge branch 't/strategy-tasks_per_node-general' into benc-block-prov…
benclifford May 14, 2021
7c46c16
Merge remote-tracking branch 'origin/master' into t/strategy-tasks_pe…
benclifford May 17, 2021
ab1a8e0
Merge branch 't/strategy-tasks_per_node-general' into benc-block-prov…
benclifford May 17, 2021
99e163d
fix mis-resolved merge typo
benclifford May 17, 2021
876b862
Rename StatusHandlingExecutor in two human readable strings
benclifford May 17, 2021
64c9ef1
regenerate doc stubs
benclifford May 17, 2021
2a01ba7
Add a docstring for BlockProviderExecutor
benclifford May 19, 2021
3f3b0a5
More docs and type annotations
benclifford May 19, 2021
66e4b89
fix punctuation typos
benclifford May 19, 2021
de9cc56
Change a TODO into a requirement of the superclass
benclifford May 19, 2021
05d6db2
Tidy TODO in docstring
benclifford May 19, 2021
10a6817
remove stubs
benclifford Jun 30, 2021
eb00704
Merge remote-tracking branch 'origin/master' into benc-block-provider…
benclifford Jun 30, 2021
662fae3
Merge branch 'master' into benc-block-provider-executor-abstraction
benclifford Jul 6, 2021
98d3c74
Merge branch 'master' into benc-block-provider-executor-abstraction
benclifford Jul 19, 2021
dca99b9
Merge branch 'master' into benc-block-provider-executor-abstraction
benclifford Aug 10, 2021
0f07554
Merge remote-tracking branch 'origin/master' into benc-block-provider…
benclifford Aug 25, 2021
47d440b
Remove commented out dead code
benclifford Aug 25, 2021
1 change: 1 addition & 0 deletions docs/reference.rst
@@ -72,6 +72,7 @@ Executors
:nosignatures:

parsl.executors.base.ParslExecutor
parsl.executors.status_handling.BlockProviderExecutor
parsl.executors.ThreadPoolExecutor
parsl.executors.HighThroughputExecutor
parsl.executors.WorkQueueExecutor
2 changes: 1 addition & 1 deletion parsl/executors/base.py
@@ -160,7 +160,7 @@ def error_management_enabled(self) -> bool:

Some of the scaffolding needed for implementing error management inside executors,
including implementations for the status handling methods above, is available in
:class:parsl.executors.status_handling.StatusHandlingExecutor, which, interested executors,
:class:parsl.executors.status_handling.BlockProviderExecutor, which interested executors
should inherit from. Noop versions of methods that are related to status handling and
running parsl tasks through workers are implemented by
:class:parsl.executors.status_handling.NoStatusHandlingExecutor.
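As a quick illustration of the inheritance choice this docstring describes (a hypothetical sketch, not code from this PR): executors that provision blocks of workers through a provider inherit from BlockProviderExecutor, while executors that do not manage blocks can keep using NoStatusHandlingExecutor for its no-op status methods.

from parsl.executors.status_handling import BlockProviderExecutor, NoStatusHandlingExecutor

class MyBatchExecutor(BlockProviderExecutor):
    """Inherits the block-launching and error-management scaffolding."""

class MyLocalExecutor(NoStatusHandlingExecutor):
    """Inherits no-op status handling; no provider or blocks involved."""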
61 changes: 15 additions & 46 deletions parsl/executors/high_throughput/executor.py
@@ -7,7 +7,7 @@
import pickle
from multiprocessing import Queue
from typing import Dict # noqa F401 (used in type annotation)
from typing import List, Optional, Tuple, Union, Any
from typing import List, Optional, Tuple, Union
import math

from parsl.serialize import pack_apply_message, deserialize
@@ -20,7 +20,7 @@
UnsupportedFeatureError
)

from parsl.executors.status_handling import StatusHandlingExecutor
from parsl.executors.status_handling import BlockProviderExecutor
Review comment (Member): I wonder if BlockManagingExecutor might be clearer

from parsl.providers.provider_base import ExecutionProvider
from parsl.data_provider.staging import Staging
from parsl.addresses import get_all_addresses
@@ -33,7 +33,7 @@
logger = logging.getLogger(__name__)


class HighThroughputExecutor(StatusHandlingExecutor, RepresentationMixin):
class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin):
"""Executor designed for cluster-scale

The HighThroughputExecutor system has the following components:
@@ -191,15 +191,13 @@ def __init__(self,

logger.debug("Initializing HighThroughputExecutor")

StatusHandlingExecutor.__init__(self, provider)
BlockProviderExecutor.__init__(self, provider)
self.label = label
self.launch_cmd = launch_cmd
self.worker_debug = worker_debug
self.storage_access = storage_access
self.working_dir = working_dir
self.managed = managed
self.blocks = {} # type: Dict[str, str]
self.block_mapping = {} # type: Dict[str, str]
self.cores_per_worker = cores_per_worker
self.mem_per_worker = mem_per_worker
self.max_workers = max_workers
@@ -222,9 +220,9 @@ def __init__(self,
self.provider.cores_per_node is not None:
cpu_slots = math.floor(self.provider.cores_per_node / cores_per_worker)

self.workers_per_node = min(max_workers, mem_slots, cpu_slots)
if self.workers_per_node == float('inf'):
self.workers_per_node = 1 # our best guess-- we do not have any provider hints
self._workers_per_node = min(max_workers, mem_slots, cpu_slots)
if self._workers_per_node == float('inf'):
self._workers_per_node = 1 # our best guess-- we do not have any provider hints

self._task_counter = 0
self.run_id = None # set to the correct run_id in dfk
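For readers skimming the diff, the slot calculation feeding the renamed _workers_per_node attribute is unchanged; a standalone sketch of the same logic, with hypothetical provider hints rather than values from this PR, looks like this:

import math

def estimate_workers_per_node(max_workers=float('inf'),
                              mem_per_node=None, mem_per_worker=None,
                              cores_per_node=None, cores_per_worker=1.0):
    # Start with "no limit" and tighten using whatever hints the provider gives us
    mem_slots = float('inf')
    cpu_slots = float('inf')
    if mem_per_worker and mem_per_node is not None:
        mem_slots = math.floor(mem_per_node / mem_per_worker)
    if cores_per_node is not None:
        cpu_slots = math.floor(cores_per_node / cores_per_worker)

    workers = min(max_workers, mem_slots, cpu_slots)
    if workers == float('inf'):
        workers = 1  # no hints at all: best guess is a single worker per node
    return workers

# 64 GB / 32-core node, 4 GB and 2 cores per worker -> min(inf, 16, 16) == 16
print(estimate_workers_per_node(mem_per_node=64, mem_per_worker=4,
                                cores_per_node=32, cores_per_worker=2))
# No provider hints -> falls back to 1
print(estimate_workers_per_node())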
@@ -596,34 +594,9 @@ def create_monitoring_info(self, status):
msg.append(d)
return msg

def scale_out(self, blocks=1):
"""Scales out the number of blocks by "blocks"
"""
if not self.provider:
raise (ScalingFailed(None, "No execution provider available"))
block_ids = []
for i in range(blocks):
block_id = str(len(self.blocks))
try:
job_id = self._launch_block(block_id)
self.blocks[block_id] = job_id
self.block_mapping[job_id] = block_id
block_ids.append(block_id)
except Exception as ex:
self._fail_job_async(block_id,
"Failed to start block {}: {}".format(block_id, ex))
return block_ids

def _launch_block(self, block_id: str) -> Any:
if self.launch_cmd is None:
raise ScalingFailed(self.provider.label, "No launch command")
launch_cmd = self.launch_cmd.format(block_id=block_id)
job_id = self.provider.submit(launch_cmd, 1)
logger.debug("Launched block {}->{}".format(block_id, job_id))
if not job_id:
raise(ScalingFailed(self.provider.label,
"Attempts to provision nodes via provider has failed"))
return job_id
@property
def workers_per_node(self) -> Union[int, float]:
return self._workers_per_node

def scale_in(self, blocks=None, block_ids=[], force=True, max_idletime=None):
"""Scale in the number of active blocks by specified amount.
@@ -705,15 +678,11 @@ def scale_in(self, blocks=None, block_ids=[], force=True, max_idletime=None):

return block_ids_killed

def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]:
# Not using self.blocks.keys() and self.blocks.values() simultaneously
# The dictionary may be changed during invoking this function
# As scale_in and scale_out are invoked in multiple threads
block_ids = list(self.blocks.keys())
job_ids = [] # types: List[Any]
for bid in block_ids:
job_ids.append(self.blocks[bid])
return block_ids, job_ids
def _get_launch_command(self, block_id: str) -> str:
if self.launch_cmd is None:
raise ScalingFailed(self.provider.label, "No launch command")
launch_cmd = self.launch_cmd.format(block_id=block_id)
return launch_cmd

def shutdown(self, hub=True, targets='all', block=False):
"""Shutdown the executor, including all workers and controllers.
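The launch command that _get_launch_command now returns is just the configured launch_cmd template with the block id substituted in; a rough illustration using a made-up template (the real HTEX default launch_cmd is much longer):

# Hypothetical template, for illustration only
launch_cmd = "process_worker_pool.py --block_id={block_id} --cores-per-worker=1"

print(launch_cmd.format(block_id="0"))
# -> process_worker_pool.py --block_id=0 --cores-per-worker=1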
6 changes: 3 additions & 3 deletions parsl/executors/low_latency/executor.py
@@ -11,14 +11,14 @@
from parsl.executors.low_latency import zmq_pipes
from parsl.executors.low_latency import interchange
from parsl.executors.errors import ScalingFailed, DeserializationError, BadMessage, UnsupportedFeatureError
from parsl.executors.status_handling import StatusHandlingExecutor
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.utils import RepresentationMixin
from parsl.providers import LocalProvider

logger = logging.getLogger(__name__)


class LowLatencyExecutor(StatusHandlingExecutor, RepresentationMixin):
class LowLatencyExecutor(BlockProviderExecutor, RepresentationMixin):
"""
TODO: docstring for LowLatencyExecutor
"""
@@ -40,7 +40,7 @@ def __init__(self,
):
logger.debug("Initializing LowLatencyExecutor")

StatusHandlingExecutor.__init__(self, provider)
BlockProviderExecutor.__init__(self, provider)
self.label = label
self.launch_cmd = launch_cmd
self.provider = provider
99 changes: 87 additions & 12 deletions parsl/executors/status_handling.py
@@ -1,29 +1,57 @@
import logging
import threading
from itertools import compress
from abc import abstractmethod
from abc import abstractmethod, abstractproperty
from concurrent.futures import Future
from typing import List, Any, Dict, Tuple
from typing import List, Any, Dict, Optional, Tuple, Union

import parsl # noqa F401
from parsl.executors.base import ParslExecutor
from parsl.executors.errors import ScalingFailed
from parsl.providers.provider_base import JobStatus, ExecutionProvider, JobState


logger = logging.getLogger(__name__)


class StatusHandlingExecutor(ParslExecutor):
def __init__(self, provider):
class BlockProviderExecutor(ParslExecutor):
"""A base class for executors which scale using blocks.

This base class is intended to help with executors which:

- use blocks of workers to execute tasks
- blocks of workers are launched on a batch system through
an `ExecutionProvider`

An implementing class should implement the abstract methods required by
`ParslExecutor` to submit tasks, as well as BlockProviderExecutor
abstract methods to provide the executor-specific command to start a block
of workers (the ``_get_launch_command`` method), and some basic scaling
information (``outstanding`` and ``workers_per_node`` properties).

This base class provides a ``scale_out`` method which will launch new
blocks. It does not provide a ``scale_in`` method, because scale-in
behaviour is not well defined in the Parsl scaling model and so behaviour
is left to individual executors.

Parsl scaling will provide scaling between min_blocks and max_blocks by
invoking scale_out, but it will not initialize the blocks requested by
any init_blocks parameter. Subclasses must implement that behaviour
themselves.
"""
def __init__(self, provider: ExecutionProvider):
super().__init__()
self._provider = provider # type: ExecutionProvider
self._provider = provider
# errors can happen during the submit call to the provider; this is used
# to keep track of such errors so that they can be handled in one place
# together with errors reported by status()
self._simulated_status = {}
self._simulated_status: Dict[Any, JobStatus] = {}
self._executor_bad_state = threading.Event()
self._executor_exception = None
self._executor_exception: Optional[Exception] = None
self._generated_block_id_counter = 1
self._tasks = {} # type: Dict[object, Future]
self.blocks = {} # type: Dict[str, str]
self.block_mapping = {} # type: Dict[str, str]

def _make_status_dict(self, block_ids: List[str], status_list: List[JobStatus]) -> Dict[str, JobStatus]:
"""Given a list of block ids and a list of corresponding status strings,
@@ -51,11 +79,6 @@ def status_polling_interval(self):
else:
return self._provider.status_polling_interval

@abstractmethod
def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]:
raise NotImplementedError("Classes inheriting from StatusHandlingExecutor must implement "
"_get_block_and_job_ids()")

def _fail_job_async(self, block_id: Any, message: str):
"""Marks a job that has failed to start but would not otherwise be included in status()
as failed and report it in status()
@@ -65,6 +88,13 @@ def _fail_job_async(self, block_id: Any, message: str):
self._generated_block_id_counter += 1
self._simulated_status[block_id] = JobStatus(JobState.FAILED, message)

@abstractproperty
def outstanding(self) -> int:
"""This should return the number of tasks that the executor has been given to run (waiting to run, and running now)"""

raise NotImplementedError("Classes inheriting from BlockProviderExecutor must implement "
"outstanding()")

def status(self) -> Dict[str, JobStatus]:
"""Return status of all blocks."""

@@ -124,6 +154,51 @@ def _filter_scale_in_ids(self, to_kill, killed):
# Filters first iterable by bool values in second
return list(compress(to_kill, killed))

def scale_out(self, blocks: int = 1) -> List[str]:
Review comment (Member): Why don't we move scale_in here? I'm guessing it's because WQ doesn't have the hold block and then cancel mechanism? Maybe we could add the simpler method here, and have HTEX override it.

Reply (Collaborator, author):
Scaling out and scaling in, despite having similar names, work really differently.

We've had experience with block-style scaling out across multiple generations of Parsl executors - ipp, htex(+exex) and wq. We don't have much experience with managing scaling in, and the experience we've had so far has been pretty poor.

This PR attempts to factor out the stuff that we had long, positive experience with: scaling out.

I don't want it to try to have abstractions for things we do not understand well / cannot do well: scaling in.

"""Scales out the number of blocks by "blocks"
"""
if not self.provider:
raise (ScalingFailed(None, "No execution provider available"))
block_ids = []
for i in range(blocks):
block_id = str(len(self.blocks))
try:
job_id = self._launch_block(block_id)
self.blocks[block_id] = job_id
self.block_mapping[job_id] = block_id
block_ids.append(block_id)
except Exception as ex:
self._fail_job_async(block_id,
"Failed to start block {}: {}".format(block_id, ex))
return block_ids

def _launch_block(self, block_id: str) -> Any:
launch_cmd = self._get_launch_command(block_id)
job_id = self.provider.submit(launch_cmd, 1)
logger.debug("Launched block {}->{}".format(block_id, job_id))
if not job_id:
raise(ScalingFailed(self.provider.label,
"Attempts to provision nodes via provider has failed"))
return job_id

@abstractmethod
def _get_launch_command(self, block_id: str) -> str:
pass

def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]:
# Not using self.blocks.keys() and self.blocks.values() simultaneously
# The dictionary may be changed during invoking this function
# As scale_in and scale_out are invoked in multiple threads
block_ids = list(self.blocks.keys())
job_ids = [] # types: List[Any]
for bid in block_ids:
job_ids.append(self.blocks[bid])
return block_ids, job_ids

@abstractproperty
def workers_per_node(self) -> Union[int, float]:
pass


class NoStatusHandlingExecutor(ParslExecutor):
def __init__(self):
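To make the new contract concrete, here is a minimal, hypothetical subclass (not part of this PR) that supplies the three abstract pieces BlockProviderExecutor now requires - _get_launch_command, outstanding and workers_per_node - and relies on the inherited scale_out:

from concurrent.futures import Future
from typing import Any, Dict, Union

from parsl.executors.status_handling import BlockProviderExecutor
from parsl.providers.provider_base import ExecutionProvider


class ToyBlockExecutor(BlockProviderExecutor):
    """Illustrative executor; only the block-provider contract is shown."""

    def __init__(self, provider: ExecutionProvider, worker_command: str):
        super().__init__(provider)
        self.worker_command = worker_command
        # a real executor would populate this in submit()
        self._futures: Dict[Any, Future] = {}

    def _get_launch_command(self, block_id: str) -> str:
        # handed to provider.submit() by the inherited _launch_block()
        return f"{self.worker_command} --block-id={block_id}"

    @property
    def outstanding(self) -> int:
        # tasks given to this executor that have not finished yet
        return sum(1 for fut in self._futures.values() if not fut.done())

    @property
    def workers_per_node(self) -> Union[int, float]:
        return 1

    # start(), submit(), scale_in(), shutdown() and the other ParslExecutor
    # abstract methods are omitted here; a real executor must implement them.

With these in place, the scaling strategy can call scale_out(n) and the base class handles numbering blocks, calling the provider, and recording the block/job id mappings.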
46 changes: 23 additions & 23 deletions parsl/executors/workqueue/executor.py
@@ -25,14 +25,13 @@
from parsl.executors.errors import ExecutorError
from parsl.data_provider.files import File
from parsl.errors import OptionalModuleMissing
from parsl.executors.status_handling import NoStatusHandlingExecutor
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.providers.provider_base import ExecutionProvider
from parsl.providers import LocalProvider, CondorProvider
from parsl.executors.errors import ScalingFailed
from parsl.executors.workqueue import exec_parsl_function

import typeguard
from typing import Dict, List, Optional, Set
from typing import Dict, List, Optional, Set, Union
from parsl.data_provider.staging import Staging

from .errors import WorkQueueTaskFailure
@@ -74,7 +73,7 @@
ParslFileToWq = namedtuple('ParslFileToWq', 'parsl_name stage cache')


class WorkQueueExecutor(NoStatusHandlingExecutor, putils.RepresentationMixin):
class WorkQueueExecutor(BlockProviderExecutor, putils.RepresentationMixin):
"""Executor to use Work Queue batch system

The WorkQueueExecutor system utilizes the Work Queue framework to
@@ -221,8 +220,7 @@ def __init__(self,
worker_options: str = "",
full_debug: bool = True,
worker_executable: str = 'work_queue_worker'):
NoStatusHandlingExecutor.__init__(self)
self._provider = provider
BlockProviderExecutor.__init__(self, provider)
self._scaling_enabled = True

if not _work_queue_enabled:
@@ -274,6 +272,11 @@ def __init__(self,
if self.init_command != "":
self.launch_cmd = self.init_command + "; " + self.launch_cmd

def _get_launch_command(self, block_id):
# this executor uses different terminology for worker/launch
# commands than in htex
return self.worker_command

def start(self):
"""Create submit process and collector thread to create, send, and
retrieve Parsl tasks within the Work Queue system.
@@ -605,24 +608,21 @@ def initialize_scaling(self):
logger.debug("Scaling out failed: {}".format(e))
raise e

def scale_out(self, blocks=1):
"""Scale out method.

We should have the scale out method simply take resource object
which will have the scaling methods, scale_out itself should be a coroutine, since
scaling tasks can be slow.
@property
def outstanding(self) -> int:
"""Count the number of outstanding tasks. This is inefficiently
implemented and probably could be replaced with a counter.
"""
if self.provider:
for i in range(blocks):
external_block = str(len(self.blocks))
internal_block = self.provider.submit(self.worker_command, 1)
# Failed to create block with provider
if not internal_block:
raise(ScalingFailed(self.provider.label, "Attempts to create nodes using the provider has failed"))
else:
self.blocks[external_block] = internal_block
else:
logger.error("No execution provider available to scale")
outstanding = 0
for fut in self.tasks.values():
if not fut.done():
outstanding += 1
logger.debug(f"Counted {outstanding} outstanding tasks")
return outstanding

@property
def workers_per_node(self) -> Union[int, float]:
return 1

def scale_in(self, count):
"""Scale in method. Not implemented.
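As the docstring above notes, the linear scan in outstanding could be replaced with a counter; a rough, hypothetical sketch of that alternative (not part of this PR) might look like:

import threading
from concurrent.futures import Future


class OutstandingCounter:
    """Thread-safe count of submitted-but-not-finished tasks."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._count = 0

    def task_submitted(self, future: Future) -> None:
        with self._lock:
            self._count += 1
        # decrement when the future completes, from whichever thread resolves it
        future.add_done_callback(lambda _f: self._decrement())

    def _decrement(self) -> None:
        with self._lock:
            self._count -= 1

    @property
    def value(self) -> int:
        with self._lock:
            return self._count

An executor's outstanding property could then return the counter's value instead of iterating over all futures on every strategy poll.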