Skip to content

Commit

Permalink
TaskVineExecutor: add new features (#2809)
Browse files Browse the repository at this point in the history
tphung3
Contributor
tphung3 commented 2 weeks ago
Description
This PR brings new updates to the TaskVineExecutor:

Refactor the constructor of TaskVineExecutor. This helps de-clutter the current executor constructor and separates (almost all) configurations of different components (Executor interface that talks with DataFlowKernel, Manager process that manages tasks, and Factory process that manages workers)
Add a new option to use TaskVine factory as an alternative to Parsl provider. The benefits of the TaskVine factory are carried over to Parsl, with abilities to handle variable number of workers, dynamically size the number of workers according to the number of outstanding/pending tasks, ask for workers with an exact amount of resources (e.g., 8 cores 16 GBs memory worker), etc. Other features can be ported/added to upon request, but the features in this PR should serve well as the core/main use. The default factory will spawn a local process like the local provider, but much faster (reason: factory is run as a different process, while the provider shares CPU with the executor upon startup). The default worker provider is still Parsl provider for now.
Add a new keyword exec_mode to apps and introduce a new execution mode to apps. Now the executor supports regular execution mode, which uses the executor's serialization methods (i.e., exec_parsl_function.py and its friends), and python execution mode, which uses TaskVine manager's serialization methods (in short, works in the same principle as exec_parsl_function.py but uses cloudpickle and PythonTask, which are more native to TaskVine). The serverless feature (function service) was intended to be included in here but there seems to be a bug within TaskVine serverless tasks so it's delayed for now. Apps that use TaskVineExecutor will have a signature of something a long this line:
@python_app
def f(x, call_specs={'cores':1,'memory':4000,'gpus':4,'exec_mode': 'serverless'}):
    ...

@python_app
def g(x, call_specs={'cores':4, 'exec_mode': 'python'}):
    ...

@bash_app
def h(x, call_specs={'exec_mode': 'regular'}):
    ...
Add support for automatic conda environment packaging. This helps users with the ease of transition from local application execution to remote execution given that they use conda package manager to manage software dependencies. This works by packaging a local conda environment (the one that users are using), replicating this environment to workers, and running apps within this environment. From the users' perspective, one line of configuration supplying the conda environment name or path is enough for everything to work out.
A big chunk of changes to many little issues that arise when developing this PR. If needed I'll list them here, but there's really a lot and I want to publish these changes first for discussions.
  • Loading branch information
tphung3 authored Jul 21, 2023
1 parent 6bb3373 commit 41357c6
Show file tree
Hide file tree
Showing 22 changed files with 965 additions and 576 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
test ! -e stubs
# check we can build the docs without warnings
make SPHINXOPTS=-W html
PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages make SPHINXOPTS=-W html
cd ..
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ $(CCTOOLS_INSTALL): #CCtools contains both taskvine and workqueue so install onl
parsl/executors/taskvine/install-taskvine.sh

.PHONY: vineex_local_test
vineex_local_test: $(CCTOOLS_INSTALL) ## run all tests with vineex_local config
vineex_local_test: $(CCTOOLS_INSTALL) ## run all tests with taskvine_ex config
PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet and not issue363" --config parsl/tests/configs/taskvine_ex.py --random-order --durations 10

.PHONY: wqex_local_test
wqex_local_test: $(CCTOOLS_INSTALL) ## run all tests with wqex_local config
wqex_local_test: $(CCTOOLS_INSTALL) ## run all tests with workqueue_ex config
PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet and not issue363" --config parsl/tests/configs/workqueue_ex.py --random-order --durations 10

.PHONY: config_local_test
Expand Down
1 change: 1 addition & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Executors
parsl.executors.ThreadPoolExecutor
parsl.executors.HighThroughputExecutor
parsl.executors.WorkQueueExecutor
parsl.executors.taskvine.TaskVineExecutor
parsl.executors.FluxExecutor

Launchers
Expand Down
27 changes: 17 additions & 10 deletions docs/userguide/configuring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ Stepping through the following question should help formulate a suitable configu
| Laptop/Workstation | * `parsl.executors.HighThroughputExecutor` | `parsl.providers.LocalProvider` |
| | * `parsl.executors.ThreadPoolExecutor` | |
| | * `parsl.executors.WorkQueueExecutor` | |
| | * `parsl.executors.taskvine.TaskVineExecutor` | |
+---------------------+-----------------------------------------------+----------------------------------------+
| Amazon Web Services | * `parsl.executors.HighThroughputExecutor` | `parsl.providers.AWSProvider` |
+---------------------+-----------------------------------------------+----------------------------------------+
| Google Cloud | * `parsl.executors.HighThroughputExecutor` | `parsl.providers.GoogleCloudProvider` |
+---------------------+-----------------------------------------------+----------------------------------------+
| Slurm based system | * `parsl.executors.HighThroughputExecutor` | `parsl.providers.SlurmProvider` |
| | * `parsl.executors.WorkQueueExecutor` | |
| | * `parsl.executors.taskvine.TaskVineExecutor` | |
+---------------------+-----------------------------------------------+----------------------------------------+
| Torque/PBS based | * `parsl.executors.HighThroughputExecutor` | `parsl.providers.TorqueProvider` |
| system | * `parsl.executors.WorkQueueExecutor` | |
Expand All @@ -94,6 +96,7 @@ Stepping through the following question should help formulate a suitable configu
+---------------------+-----------------------------------------------+----------------------------------------+
| Condor based | * `parsl.executors.HighThroughputExecutor` | `parsl.providers.CondorProvider` |
| cluster or grid | * `parsl.executors.WorkQueueExecutor` | |
| | * `parsl.executors.taskvine.TaskVineExecutor` | |
+---------------------+-----------------------------------------------+----------------------------------------+
| Kubernetes cluster | * `parsl.executors.HighThroughputExecutor` | `parsl.providers.KubernetesProvider` |
+---------------------+-----------------------------------------------+----------------------------------------+
Expand All @@ -102,16 +105,18 @@ Stepping through the following question should help formulate a suitable configu
2. How many nodes will be used to execute the apps? What task durations are necessary to achieve good performance?


+------------------------------------------+----------------------+-------------------------------------+
| Executor | Number of Nodes [*]_ | Task duration for good performance |
+==========================================+======================+=====================================+
| `parsl.executors.ThreadPoolExecutor` | 1 (Only local) | Any |
+------------------------------------------+----------------------+-------------------------------------+
| `parsl.executors.HighThroughputExecutor` | <=2000 | Task duration(s)/#nodes >= 0.01 |
| | | longer tasks needed at higher scale |
+------------------------------------------+----------------------+-------------------------------------+
| `parsl.executors.WorkQueueExecutor` | <=1000 [*]_ | 10s+ |
+------------------------------------------+----------------------+-------------------------------------+
+--------------------------------------------+----------------------+-------------------------------------+
| Executor | Number of Nodes [*]_ | Task duration for good performance |
+============================================+======================+=====================================+
| `parsl.executors.ThreadPoolExecutor` | 1 (Only local) | Any |
+--------------------------------------------+----------------------+-------------------------------------+
| `parsl.executors.HighThroughputExecutor` | <=2000 | Task duration(s)/#nodes >= 0.01 |
| | | longer tasks needed at higher scale |
+--------------------------------------------+----------------------+-------------------------------------+
| `parsl.executors.WorkQueueExecutor` | <=1000 [*]_ | 10s+ |
+--------------------------------------------+----------------------+-------------------------------------+
| `parsl.executors.taskvine.TaskVineExecutor`| <=1000 [*]_ | 10s+ |
+--------------------------------------------+----------------------+-------------------------------------+


.. [*] Assuming 32 workers per node. If there are fewer workers launched
Expand All @@ -120,6 +125,8 @@ Stepping through the following question should help formulate a suitable configu
.. [*] The maximum number of nodes tested for the `parsl.executors.WorkQueueExecutor` is 10,000 GPU cores and
20,000 CPU cores.
.. [*] The maximum number of nodes tested for the `parsl.executors.taskvine.TaskVineExecutor` is
10,000 GPU cores and 20,000 CPU cores.
3. Should Parsl request multiple nodes in an individual scheduler job?
(Here the term block is equivalent to a single scheduler job.)
Expand Down
1 change: 1 addition & 0 deletions docs/userguide/execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Parsl currently supports the following executors:

3. `parsl.executors.WorkQueueExecutor`: This executor integrates `Work Queue <http://ccl.cse.nd.edu/software/workqueue/>`_ as an execution backend. Work Queue scales to tens of thousands of cores and implements reliable execution of tasks with dynamic resource sizing.

4. `parsl.executors.taskvine.TaskVineExecutor`: This executor uses `TaskVine <https://ccl.cse.nd.edu/software/taskvine/>`_ as the execution backend. TaskVine scales up to tens of thousands of cores and actively uses local storage on compute nodes to offer a diverse array of performance-oriented features, including: smart caching and sharing common large files between tasks and compute nodes, reliable execution of tasks, dynamic resource sizing, automatic Python environment detection and sharing.
These executors cover a broad range of execution requirements. As with other Parsl components, there is a standard interface (ParslExecutor) that can be implemented to add support for other executors.

.. note::
Expand Down
31 changes: 30 additions & 1 deletion parsl/addresses.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ def get_all_addresses() -> Set[str]:
s_addresses.add(address_by_interface(interface))
except Exception:
logger.exception("Ignoring failure to fetch address from interface {}".format(interface))
pass

resolution_functions: List[Callable[[], str]]
resolution_functions = [address_by_hostname, address_by_route, address_by_query]
Expand All @@ -122,3 +121,33 @@ def get_all_addresses() -> Set[str]:
logger.exception("Ignoring an address finder exception")

return s_addresses


def get_any_address() -> str:
""" Uses a combination of methods to find any address of the local machine.
Returns:
one address in string
"""
net_interfaces = psutil.net_if_addrs()

addr = ''
for interface in net_interfaces:
try:
addr = address_by_interface(interface)
return addr
except Exception:
logger.exception("Ignoring failure to fetch address from interface {}".format(interface))

resolution_functions: List[Callable[[], str]]
resolution_functions = [address_by_hostname, address_by_route, address_by_query]
for f in resolution_functions:
try:
addr = f()
return addr
except Exception:
logger.exception("Ignoring an address finder exception")

if addr == '':
raise Exception('Cannot find address of the local machine.')
return addr
11 changes: 2 additions & 9 deletions parsl/configs/vineex_local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from parsl.config import Config
from parsl.executors.taskvine import TaskVineExecutor

from parsl.executors.taskvine import TaskVineManagerConfig
import uuid

config = Config(
Expand All @@ -13,14 +13,7 @@
# which can be viewed here: http://ccl.cse.nd.edu/software/taskvine/status

# To disable status reporting, comment out the project_name.
project_name="parsl-vine-" + str(uuid.uuid4()),

# The port number that TaskVine will listen on for connecting workers
# 0 means a random port.
port=0,

# A shared filesystem is not needed when using TaskVine.
shared_fs=False
manager_config=TaskVineManagerConfig(project_name="parsl-vine-" + str(uuid.uuid4())),
)
]
)
3 changes: 3 additions & 0 deletions parsl/dataflow/taskrecord.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class TaskRecord(TypedDict, total=False):
try_id: int

resource_specification: Dict[str, Any]
"""Dictionary containing relevant info for a task execution.
Includes resources to allocate and execution mode as a given
executor permits."""

join: bool
"""Is this a join_app?"""
Expand Down
12 changes: 8 additions & 4 deletions parsl/executors/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ def __init__(self, feature, current_executor, target_executor):
self.target_executor = target_executor

def __str__(self):
return "The {} feature is unsupported in {}. \
Please checkout {} for this feature".format(self.feature,
self.current_executor,
self.target_executor)
if self.target_executor:
return "The {} feature is unsupported in {}. \
Please checkout {} for this feature".format(self.feature,
self.current_executor,
self.target_executor)
else:
return "The {} feature is unsupported in {}.".format(self.feature,
self.current_executor)


class ScalingFailed(ExecutorError):
Expand Down
8 changes: 4 additions & 4 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
Args:
- func (callable) : Callable function
- resource_specification (dict): Dictionary containing relevant info about task that is needed by underlying executors.
- args (list) : List of arbitrary positional arguments.
Kwargs:
Expand All @@ -570,10 +571,9 @@ def submit(self, func, resource_specification, *args, **kwargs):
Future
"""
if resource_specification:
logger.error("Ignoring the resource specification. "
"Parsl resource specification is not supported in HighThroughput Executor. "
"Please check WorkQueueExecutor if resource specification is needed.")
raise UnsupportedFeatureError('resource specification', 'HighThroughput Executor', 'WorkQueue Executor')
logger.error("Ignoring the call specification. "
"Parsl call specification is not supported in HighThroughput Executor.")
raise UnsupportedFeatureError('resource specification', 'HighThroughput Executor', None)

if self.bad_state_is_set:
raise self.executor_exception
Expand Down
4 changes: 3 additions & 1 deletion parsl/executors/taskvine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from parsl.executors.taskvine.executor import TaskVineExecutor
from parsl.executors.taskvine.manager_config import TaskVineManagerConfig
from parsl.executors.taskvine.factory_config import TaskVineFactoryConfig

__all__ = ['TaskVineExecutor']
__all__ = ['TaskVineExecutor', 'TaskVineManagerConfig', 'TaskVineFactoryConfig']
15 changes: 11 additions & 4 deletions parsl/executors/taskvine/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,27 @@


class TaskVineTaskFailure(AppException):
"""A failure executing a task in taskvine
"""A failure executing a task in TaskVine
Contains:
reason(string)
status(int)
"""

def __init__(self, reason, status):
def __init__(self, reason: str, status: int):
self.reason = reason
self.status = status


class TaskVineFailure(ParslError):
class TaskVineManagerFailure(ParslError):
"""A failure in the taskvine executor that prevented the task to be
executed.""
executed.
"""
pass


class TaskVineFactoryFailure(ParslError):
"""A failure in the TaskVine factory that prevents the factory from
supplying workers to the manager.
"""
pass
49 changes: 16 additions & 33 deletions parsl/executors/taskvine/exec_parsl_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
import sys
import pickle

# This scripts executes a parsl function which is pickled in a file:
# This scripts executes a parsl function which is pickled in 3 files:
#
# exec_parsl_function.py map_file function_file result_file
#
# map_file: Contains a pickled dictionary that indicates which local_paths the
# parsl Files should take.
#
# function_file: Contains a pickle parsl function.
# function_file: Contains a pickle parsl function. Function might be serialized in advance.
# See @parsl.serialize.concretes.py
#
# result_file: It will contain the result of the function, including any
# result_file: A file path, whose content will contain the result of the function, including any
# exception generated. Exceptions will be wrapped with RemoteExceptionWrapper.
#
# Exit codes:
Expand All @@ -25,17 +26,20 @@
#


def load_pickled_file(filename):
def load_pickled_file(filename: str):
""" Load a pickled file and return its pickled object."""
with open(filename, "rb") as f_in:
return pickle.load(f_in)


def dump_result_to_file(result_file, result_package):
def dump_result_to_file(result_file: str, result_package):
""" Dump a result to the given result file."""
with open(result_file, "wb") as f_out:
pickle.dump(result_package, f_out)


def remap_location(mapping, parsl_file):
""" Remap files from local name (on manager) to remote name (on worker)."""
if not isinstance(parsl_file, File):
return
# Below we rewrite .local_path when scheme != file only when the local_name
Expand All @@ -48,6 +52,7 @@ def remap_location(mapping, parsl_file):


def remap_list_of_files(mapping, maybe_files):
""" Remap a list of potential files."""
for maybe_file in maybe_files:
remap_location(mapping, maybe_file)

Expand All @@ -74,29 +79,20 @@ def remap_all_files(mapping, fn_args, fn_kwargs):


def unpack_function(function_info, user_namespace):
if "source code" in function_info:
return unpack_source_code_function(function_info, user_namespace)
elif "byte code" in function_info:
return unpack_byte_code_function(function_info, user_namespace)
else:
raise ValueError("Function file does not have a valid function representation.")


def unpack_source_code_function(function_info, user_namespace):
source_code = function_info["source code"]
name = function_info["name"]
args = function_info["args"]
kwargs = function_info["kwargs"]
return (source_code, name, args, kwargs)
""" Unpack a function according to its encoding scheme."""
return unpack_byte_code_function(function_info, user_namespace)


def unpack_byte_code_function(function_info, user_namespace):
""" Returns a function object, a default name, positional arguments, and keyword arguments
for a function."""
from parsl.serialize import unpack_apply_message
func, args, kwargs = unpack_apply_message(function_info["byte code"], user_namespace, copy=False)
return (func, 'parsl_function_name', args, kwargs)


def encode_function(user_namespace, fn, fn_name, fn_args, fn_kwargs):
""" Register the given function to the given namespace."""
# Returns a tuple (code, result_name)
# code can be exec in the user_namespace to produce result_name.
prefix = "parsl_"
Expand All @@ -109,27 +105,14 @@ def encode_function(user_namespace, fn, fn_name, fn_args, fn_kwargs):
kwargs_name: fn_kwargs,
result_name: result_name})

if isinstance(fn, str):
code = encode_source_code_function(user_namespace, fn, fn_name, args_name, kwargs_name, result_name)
elif callable(fn):
if callable(fn):
code = encode_byte_code_function(user_namespace, fn, fn_name, args_name, kwargs_name, result_name)
else:
raise ValueError("Function object does not look like a function.")

return (code, result_name)


def encode_source_code_function(user_namespace, fn, fn_name, args_name, kwargs_name, result_name):
# We drop the first line as it names the parsl decorator used (i.e., @python_app)
source = fn.split('\n')[1:]
fn_app = "{0} = {1}(*{2}, **{3})".format(result_name, fn_name, args_name, kwargs_name)

source.append(fn_app)

code = "\n".join(source)
return code


def encode_byte_code_function(user_namespace, fn, fn_name, args_name, kwargs_name, result_name):
user_namespace.update({fn_name: fn})
code = "{0} = {1}(*{2}, **{3})".format(result_name, fn_name, args_name, kwargs_name)
Expand Down
Loading

0 comments on commit 41357c6

Please sign in to comment.