diff --git a/docs/reference.rst b/docs/reference.rst
index 25ae72e386..076d890de9 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -72,6 +72,7 @@ Executors
     :nosignatures:
 
     parsl.executors.base.ParslExecutor
+    parsl.executors.status_handling.BlockProviderExecutor
     parsl.executors.ThreadPoolExecutor
     parsl.executors.HighThroughputExecutor
     parsl.executors.WorkQueueExecutor
diff --git a/parsl/executors/base.py b/parsl/executors/base.py
index e4927ce6bd..579ec1855e 100644
--- a/parsl/executors/base.py
+++ b/parsl/executors/base.py
@@ -160,7 +160,7 @@ def error_management_enabled(self) -> bool:
         Some of the scaffolding needed for implementing error management inside
         executors, including implementations for the status handling methods above,
         is available in
-        :class:parsl.executors.status_handling.StatusHandlingExecutor, which, interested executors,
+        :class:`parsl.executors.status_handling.BlockProviderExecutor`, which interested executors
         should inherit from. Noop versions of methods that are related to status
         handling and running parsl tasks through workers are implemented by
         :class:parsl.executors.status_handling.NoStatusHandlingExecutor.
diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index f38acffa9c..e4d0331daf 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -7,7 +7,7 @@
 import pickle
 from multiprocessing import Queue
 from typing import Dict  # noqa F401 (used in type annotation)
-from typing import List, Optional, Tuple, Union, Any
+from typing import List, Optional, Tuple, Union
 import math
 
 from parsl.serialize import pack_apply_message, deserialize
@@ -20,7 +20,7 @@
     UnsupportedFeatureError
 )
 
-from parsl.executors.status_handling import StatusHandlingExecutor
+from parsl.executors.status_handling import BlockProviderExecutor
 from parsl.providers.provider_base import ExecutionProvider
 from parsl.data_provider.staging import Staging
 from parsl.addresses import get_all_addresses
@@ -33,7 +33,7 @@
 logger = logging.getLogger(__name__)
 
 
-class HighThroughputExecutor(StatusHandlingExecutor, RepresentationMixin):
+class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin):
     """Executor designed for cluster-scale
 
     The HighThroughputExecutor system has the following components:
@@ -191,15 +191,13 @@ def __init__(self,
 
         logger.debug("Initializing HighThroughputExecutor")
 
-        StatusHandlingExecutor.__init__(self, provider)
+        BlockProviderExecutor.__init__(self, provider)
         self.label = label
         self.launch_cmd = launch_cmd
         self.worker_debug = worker_debug
         self.storage_access = storage_access
         self.working_dir = working_dir
         self.managed = managed
-        self.blocks = {}  # type: Dict[str, str]
-        self.block_mapping = {}  # type: Dict[str, str]
         self.cores_per_worker = cores_per_worker
         self.mem_per_worker = mem_per_worker
         self.max_workers = max_workers
@@ -222,9 +220,9 @@ def __init__(self,
                 self.provider.cores_per_node is not None:
             cpu_slots = math.floor(self.provider.cores_per_node / cores_per_worker)
 
-        self.workers_per_node = min(max_workers, mem_slots, cpu_slots)
-        if self.workers_per_node == float('inf'):
-            self.workers_per_node = 1  # our best guess-- we do not have any provider hints
+        self._workers_per_node = min(max_workers, mem_slots, cpu_slots)
+        if self._workers_per_node == float('inf'):
+            self._workers_per_node = 1  # our best guess -- we do not have any provider hints
 
         self._task_counter = 0
         self.run_id = None  # set to the correct run_id in dfk
@@ -596,34 +594,9 @@ def create_monitoring_info(self, status):
             msg.append(d)
         return msg
 
-    def scale_out(self, blocks=1):
-        """Scales out the number of blocks by "blocks"
-        """
-        if not self.provider:
-            raise (ScalingFailed(None, "No execution provider available"))
-        block_ids = []
-        for i in range(blocks):
-            block_id = str(len(self.blocks))
-            try:
-                job_id = self._launch_block(block_id)
-                self.blocks[block_id] = job_id
-                self.block_mapping[job_id] = block_id
-                block_ids.append(block_id)
-            except Exception as ex:
-                self._fail_job_async(block_id,
-                                     "Failed to start block {}: {}".format(block_id, ex))
-        return block_ids
-
-    def _launch_block(self, block_id: str) -> Any:
-        if self.launch_cmd is None:
-            raise ScalingFailed(self.provider.label, "No launch command")
-        launch_cmd = self.launch_cmd.format(block_id=block_id)
-        job_id = self.provider.submit(launch_cmd, 1)
-        logger.debug("Launched block {}->{}".format(block_id, job_id))
-        if not job_id:
-            raise(ScalingFailed(self.provider.label,
-                                "Attempts to provision nodes via provider has failed"))
-        return job_id
+    @property
+    def workers_per_node(self) -> Union[int, float]:
+        return self._workers_per_node
 
     def scale_in(self, blocks=None, block_ids=[], force=True, max_idletime=None):
         """Scale in the number of active blocks by specified amount.
@@ -705,15 +678,11 @@ def scale_in(self, blocks=None, block_ids=[], force=True, max_idletime=None):
 
         return block_ids_killed
 
-    def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]:
-        # Not using self.blocks.keys() and self.blocks.values() simultaneously
-        # The dictionary may be changed during invoking this function
-        # As scale_in and scale_out are invoked in multiple threads
-        block_ids = list(self.blocks.keys())
-        job_ids = []  # types: List[Any]
-        for bid in block_ids:
-            job_ids.append(self.blocks[bid])
-        return block_ids, job_ids
+    def _get_launch_command(self, block_id: str) -> str:
+        if self.launch_cmd is None:
+            raise ScalingFailed(self.provider.label, "No launch command")
+        launch_cmd = self.launch_cmd.format(block_id=block_id)
+        return launch_cmd
 
     def shutdown(self, hub=True, targets='all', block=False):
         """Shutdown the executor, including all workers and controllers.
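Note: with the hunks above, HighThroughputExecutor no longer owns the scale-out loop; it only formats the launch command, while block bookkeeping moves into BlockProviderExecutor (see parsl/executors/status_handling.py below). As a reading aid, here is a minimal sketch of the surface a subclass must now provide. ExampleExecutor and its worker command are hypothetical, and the ParslExecutor task-submission methods (start, submit, shutdown, ...) are omitted, so the class is not instantiable; it only illustrates the block-scaling contract introduced by this patch.

    from typing import Union

    from parsl.executors.status_handling import BlockProviderExecutor


    class ExampleExecutor(BlockProviderExecutor):
        """Hypothetical subclass showing the new abstract surface."""

        def _get_launch_command(self, block_id: str) -> str:
            # Executor-specific worker command; the {block_id} template
            # mirrors what HighThroughputExecutor does with launch_cmd.
            return "example_worker --block-id={block_id}".format(block_id=block_id)

        @property
        def outstanding(self) -> int:
            # Tasks handed to this executor that have not yet completed.
            # self._tasks is maintained by the base class.
            return sum(1 for fut in self._tasks.values() if not fut.done())

        @property
        def workers_per_node(self) -> Union[int, float]:
            # No provider hints in this sketch: assume one worker per node.
            return 1

The base class's scale_out then calls _get_launch_command once per new block and submits the result to the provider.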
diff --git a/parsl/executors/low_latency/executor.py b/parsl/executors/low_latency/executor.py
index 87ce2d88c1..94deca584f 100644
--- a/parsl/executors/low_latency/executor.py
+++ b/parsl/executors/low_latency/executor.py
@@ -11,14 +11,14 @@
 from parsl.executors.low_latency import zmq_pipes
 from parsl.executors.low_latency import interchange
 from parsl.executors.errors import ScalingFailed, DeserializationError, BadMessage, UnsupportedFeatureError
-from parsl.executors.status_handling import StatusHandlingExecutor
+from parsl.executors.status_handling import BlockProviderExecutor
 from parsl.utils import RepresentationMixin
 from parsl.providers import LocalProvider
 
 logger = logging.getLogger(__name__)
 
 
-class LowLatencyExecutor(StatusHandlingExecutor, RepresentationMixin):
+class LowLatencyExecutor(BlockProviderExecutor, RepresentationMixin):
     """
     TODO: docstring for LowLatencyExecutor
     """
@@ -40,7 +40,7 @@ def __init__(self,
                  ):
         logger.debug("Initializing LowLatencyExecutor")
 
-        StatusHandlingExecutor.__init__(self, provider)
+        BlockProviderExecutor.__init__(self, provider)
         self.label = label
         self.launch_cmd = launch_cmd
         self.provider = provider
diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index e60f39c18b..1a1d4f56ce 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -1,29 +1,57 @@
 import logging
 import threading
 from itertools import compress
-from abc import abstractmethod
+from abc import abstractmethod, abstractproperty
 from concurrent.futures import Future
-from typing import List, Any, Dict, Tuple
+from typing import List, Any, Dict, Optional, Tuple, Union
 
 import parsl  # noqa F401
 from parsl.executors.base import ParslExecutor
+from parsl.executors.errors import ScalingFailed
 from parsl.providers.provider_base import JobStatus, ExecutionProvider, JobState
 
+
 logger = logging.getLogger(__name__)
 
 
-class StatusHandlingExecutor(ParslExecutor):
-    def __init__(self, provider):
+class BlockProviderExecutor(ParslExecutor):
+    """A base class for executors which scale using blocks.
+
+    This base class is intended to help with executors which:
+
+    - use blocks of workers to execute tasks
+    - launch those blocks of workers on a batch system through
+      an `ExecutionProvider`
+
+    An implementing class should implement the abstract methods required by
+    `ParslExecutor` to submit tasks, as well as the BlockProviderExecutor
+    abstract methods that provide the executor-specific command to start a
+    block of workers (the ``_get_launch_command`` method) and some basic
+    scaling information (the ``outstanding`` and ``workers_per_node`` properties).
+
+    This base class provides a ``scale_out`` method which will launch new
+    blocks. It does not provide a ``scale_in`` method, because scale-in
+    behaviour is not well defined in the Parsl scaling model, so that
+    behaviour is left to individual executors.
+
+    Parsl scaling will provide scaling between ``min_blocks`` and ``max_blocks``
+    by invoking ``scale_out``, but it will not initialize the blocks requested
+    by any ``init_blocks`` parameter. Subclasses must implement that behaviour
+    themselves.
+ """ + def __init__(self, provider: ExecutionProvider): super().__init__() - self._provider = provider # type: ExecutionProvider + self._provider = provider # errors can happen during the submit call to the provider; this is used # to keep track of such errors so that they can be handled in one place # together with errors reported by status() - self._simulated_status = {} + self._simulated_status: Dict[Any, JobStatus] = {} self._executor_bad_state = threading.Event() - self._executor_exception = None + self._executor_exception: Optional[Exception] = None self._generated_block_id_counter = 1 self._tasks = {} # type: Dict[object, Future] + self.blocks = {} # type: Dict[str, str] + self.block_mapping = {} # type: Dict[str, str] def _make_status_dict(self, block_ids: List[str], status_list: List[JobStatus]) -> Dict[str, JobStatus]: """Given a list of block ids and a list of corresponding status strings, @@ -51,11 +79,6 @@ def status_polling_interval(self): else: return self._provider.status_polling_interval - @abstractmethod - def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]: - raise NotImplementedError("Classes inheriting from StatusHandlingExecutor must implement " - "_get_block_and_job_ids()") - def _fail_job_async(self, block_id: Any, message: str): """Marks a job that has failed to start but would not otherwise be included in status() as failed and report it in status() @@ -65,6 +88,13 @@ def _fail_job_async(self, block_id: Any, message: str): self._generated_block_id_counter += 1 self._simulated_status[block_id] = JobStatus(JobState.FAILED, message) + @abstractproperty + def outstanding(self) -> int: + """This should return the number of tasks that the executor has been given to run (waiting to run, and running now)""" + + raise NotImplementedError("Classes inheriting from BlockProviderExecutor must implement " + "outstanding()") + def status(self) -> Dict[str, JobStatus]: """Return status of all blocks.""" @@ -124,6 +154,51 @@ def _filter_scale_in_ids(self, to_kill, killed): # Filters first iterable by bool values in second return list(compress(to_kill, killed)) + def scale_out(self, blocks: int = 1) -> List[str]: + """Scales out the number of blocks by "blocks" + """ + if not self.provider: + raise (ScalingFailed(None, "No execution provider available")) + block_ids = [] + for i in range(blocks): + block_id = str(len(self.blocks)) + try: + job_id = self._launch_block(block_id) + self.blocks[block_id] = job_id + self.block_mapping[job_id] = block_id + block_ids.append(block_id) + except Exception as ex: + self._fail_job_async(block_id, + "Failed to start block {}: {}".format(block_id, ex)) + return block_ids + + def _launch_block(self, block_id: str) -> Any: + launch_cmd = self._get_launch_command(block_id) + job_id = self.provider.submit(launch_cmd, 1) + logger.debug("Launched block {}->{}".format(block_id, job_id)) + if not job_id: + raise(ScalingFailed(self.provider.label, + "Attempts to provision nodes via provider has failed")) + return job_id + + @abstractmethod + def _get_launch_command(self, block_id: str) -> str: + pass + + def _get_block_and_job_ids(self) -> Tuple[List[str], List[Any]]: + # Not using self.blocks.keys() and self.blocks.values() simultaneously + # The dictionary may be changed during invoking this function + # As scale_in and scale_out are invoked in multiple threads + block_ids = list(self.blocks.keys()) + job_ids = [] # types: List[Any] + for bid in block_ids: + job_ids.append(self.blocks[bid]) + return block_ids, job_ids + + @abstractproperty 
+    def workers_per_node(self) -> Union[int, float]:
+        pass
+
 
 class NoStatusHandlingExecutor(ParslExecutor):
     def __init__(self):
diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py
index 24ae164542..65bb8b2a83 100644
--- a/parsl/executors/workqueue/executor.py
+++ b/parsl/executors/workqueue/executor.py
@@ -25,14 +25,13 @@
 from parsl.executors.errors import ExecutorError
 from parsl.data_provider.files import File
 from parsl.errors import OptionalModuleMissing
-from parsl.executors.status_handling import NoStatusHandlingExecutor
+from parsl.executors.status_handling import BlockProviderExecutor
 from parsl.providers.provider_base import ExecutionProvider
 from parsl.providers import LocalProvider, CondorProvider
-from parsl.executors.errors import ScalingFailed
 from parsl.executors.workqueue import exec_parsl_function
 
 import typeguard
-from typing import Dict, List, Optional, Set
+from typing import Dict, List, Optional, Set, Union
 
 from parsl.data_provider.staging import Staging
 from .errors import WorkQueueTaskFailure
@@ -74,7 +73,7 @@
 ParslFileToWq = namedtuple('ParslFileToWq', 'parsl_name stage cache')
 
 
-class WorkQueueExecutor(NoStatusHandlingExecutor, putils.RepresentationMixin):
+class WorkQueueExecutor(BlockProviderExecutor, putils.RepresentationMixin):
     """Executor to use Work Queue batch system
 
     The WorkQueueExecutor system utilizes the Work Queue framework to
@@ -221,8 +220,7 @@ def __init__(self,
                  worker_options: str = "",
                  full_debug: bool = True,
                  worker_executable: str = 'work_queue_worker'):
-        NoStatusHandlingExecutor.__init__(self)
-        self._provider = provider
+        BlockProviderExecutor.__init__(self, provider)
         self._scaling_enabled = True
 
         if not _work_queue_enabled:
@@ -274,6 +272,11 @@ def __init__(self,
         if self.init_command != "":
             self.launch_cmd = self.init_command + "; " + self.launch_cmd
 
+    def _get_launch_command(self, block_id):
+        # This executor uses different terminology for worker/launch
+        # commands than htex does.
+        return self.worker_command
+
     def start(self):
         """Create submit process and collector thread to create, send, and retrieve Parsl tasks
         within the Work Queue system.
@@ -605,24 +608,21 @@ def initialize_scaling(self):
             logger.debug("Scaling out failed: {}".format(e))
             raise e
 
-    def scale_out(self, blocks=1):
-        """Scale out method.
-
-        We should have the scale out method simply take resource object
-        which will have the scaling methods, scale_out itself should be a coroutine, since
-        scaling tasks can be slow.
+    @property
+    def outstanding(self) -> int:
+        """Count the number of outstanding tasks. This is inefficiently
+        implemented and probably could be replaced with a counter.
         """
-        if self.provider:
-            for i in range(blocks):
-                external_block = str(len(self.blocks))
-                internal_block = self.provider.submit(self.worker_command, 1)
-                # Failed to create block with provider
-                if not internal_block:
-                    raise(ScalingFailed(self.provider.label, "Attempts to create nodes using the provider has failed"))
-                else:
-                    self.blocks[external_block] = internal_block
-        else:
-            logger.error("No execution provider available to scale")
+        outstanding = 0
+        for fut in self.tasks.values():
+            if not fut.done():
+                outstanding += 1
+        logger.debug(f"Counted {outstanding} outstanding tasks")
+        return outstanding
+
+    @property
+    def workers_per_node(self) -> Union[int, float]:
+        return 1
 
     def scale_in(self, count):
         """Scale in method. Not implemented.
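Aside: the two properties added above exist so that scaling-strategy code can size scale-out requests without reaching into executor internals. A simplified sketch of the arithmetic they enable follows; blocks_needed is a hypothetical helper, not parsl's actual strategy, which additionally applies the parallelism setting and min/max block limits.

    import math


    def blocks_needed(outstanding: int, workers_per_node: float,
                      nodes_per_block: int) -> int:
        # Enough blocks that every outstanding task could get a worker slot.
        slots_per_block = workers_per_node * nodes_per_block
        return math.ceil(outstanding / slots_per_block)


    # e.g. 10 outstanding tasks, 4 workers per node, 1 node per block -> 3 blocks
    assert blocks_needed(10, 4, 1) == 3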
diff --git a/parsl/tests/configs/workqueue_blocks.py b/parsl/tests/configs/workqueue_blocks.py
new file mode 100644
index 0000000000..f7631cd70d
--- /dev/null
+++ b/parsl/tests/configs/workqueue_blocks.py
@@ -0,0 +1,12 @@
+from parsl.config import Config
+from parsl.executors import WorkQueueExecutor
+
+from parsl.data_provider.http import HTTPInTaskStaging
+from parsl.data_provider.ftp import FTPInTaskStaging
+from parsl.data_provider.file_noop import NoOpFileStaging
+
+from parsl.providers import LocalProvider
+
+config = Config(executors=[WorkQueueExecutor(port=9000,
+                                             storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()],
+                                             provider=LocalProvider(init_blocks=0, min_blocks=0, max_blocks=1))])
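The new config can be exercised like any other parsl test config. A minimal usage sketch, assuming the work_queue Python module and work_queue_worker are installed and port 9000 is free; with init_blocks=0, the first block is provisioned through the base class's scale_out only once a task is outstanding:

    import parsl
    from parsl.tests.configs.workqueue_blocks import config


    @parsl.python_app
    def double(x):
        return 2 * x


    parsl.load(config)
    try:
        # Submitting a task makes outstanding > 0, which drives scale-out
        # of the single allowed block (max_blocks=1).
        print(double(21).result())  # prints 42
    finally:
        parsl.dfk().cleanup()
        parsl.clear()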