diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index a15a444d2c..d7a7bfe2ad 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -20,7 +20,7 @@ import uuid from concurrent.futures import Future from datetime import datetime -from typing import List, Literal, Optional, Union +from typing import Dict, List, Literal, Optional, Union # Import other libraries import typeguard @@ -81,8 +81,12 @@ class TaskVineExecutor(BlockProviderExecutor, putils.RepresentationMixin): pre-warmed forked python process. Default is 'regular'. + use_tmp_dir_for_staging: bool + Whether to use tmp dir for staging functions, arguments, and results. + Default is True. + manager_config: TaskVineManagerConfig - Configuration for the TaskVine manager. Default + Configuration for the TaskVine manager. factory_config: TaskVineFactoryConfig Configuration for the TaskVine factory. @@ -104,6 +108,7 @@ def __init__(self, label: str = "TaskVineExecutor", worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory', function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular', + use_tmp_dir_for_staging: bool = True, manager_config: TaskVineManagerConfig = TaskVineManagerConfig(), factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(), provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1), @@ -129,6 +134,7 @@ def __init__(self, self.label = label self.worker_launch_method = worker_launch_method self.function_exec_mode = function_exec_mode + self.use_tmp_dir_for_staging = use_tmp_dir_for_staging self.manager_config = manager_config self.factory_config = factory_config self.storage_access = storage_access @@ -172,6 +178,13 @@ def __init__(self, # Path to directory that holds all tasks' data and results. self._function_data_dir = "" + # Mapping of function names to function details. + # Currently the values include function objects, path to serialized functions, + # path to serialized function contexts, and whether functions are serialized. + # Helpful to detect inconsistencies in serverless functions. + # Helpful to deduplicate the same function. + self._map_func_names_to_func_details: Dict[str, Dict] = {} + # Helper scripts to prepare package tarballs for Parsl apps self._package_analyze_script = shutil.which("poncho_package_analyze") self._package_create_script = shutil.which("poncho_package_create") @@ -218,8 +231,12 @@ def __create_data_and_logging_dirs(self): # Create directories for data and results log_dir = os.path.join(run_dir, self.label) os.makedirs(log_dir) - tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-' - self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix) + + if self.use_tmp_dir_for_staging: + tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-' + self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix) + else: + self._function_data_dir = os.path.join(log_dir, 'function') # put TaskVine logs outside of a Parsl run as TaskVine caches between runs while # Parsl does not. @@ -318,11 +335,35 @@ def submit(self, func, resource_specification, *args, **kwargs): Keyword arguments to the Parsl app """ + # a Parsl function must have a name + if func.__name__ is None: + raise ValueError('A Parsl function must have a name') + logger.debug(f'Got resource specification: {resource_specification}') + # If `_parsl_monitoring_task_id` is in kwargs, Parsl monitoring code is enabled. + is_monitoring_enabled = '_parsl_monitoring_task_id' in kwargs + # Default execution mode of apps is regular exec_mode = resource_specification.get('exec_mode', self.function_exec_mode) + # Fall back to regular execution if a function is Parsl-monitored as a monitored function is invocation-specific. + # Note that it is possible to get the wrapped function by calling the `__wrapped__` attribute when monitoring is enabled. + # It will disable the monitoring wrapper code however. + if exec_mode == 'serverless' and is_monitoring_enabled: + logger.warning("A serverless task cannot run with Parsl monitoring enabled. Falling back to execute this task as a regular task.") + exec_mode = 'regular' + + if exec_mode == 'serverless': + if func.__name__ not in self._map_func_names_to_func_details: + self._map_func_names_to_func_details[func.__name__] = {'func_obj': func} + else: + if id(func) != id(self._map_func_names_to_func_details[func.__name__]['func_obj']): + logger.warning('Inconsistency in a serverless function call detected.\ + A function name cannot point to two different function objects.\ + Falling back to executing it as a regular task.') + exec_mode = 'regular' + # Detect resources and features of a submitted Parsl app cores = None memory = None @@ -386,19 +427,43 @@ def submit(self, func, resource_specification, *args, **kwargs): argument_file = None result_file = None map_file = None + function_context_file = None # Get path to files that will contain the pickled function, # arguments, result, and map of input and output files - function_file = self._path_in_task(executor_task_id, "function") + if exec_mode == 'serverless': + if 'function_file' not in self._map_func_names_to_func_details[func.__name__]: + function_file = os.path.join(self._function_data_dir.name, func.__name__, 'function') + os.makedirs(os.path.join(self._function_data_dir.name, func.__name__)) + self._map_func_names_to_func_details[func.__name__].update({'function_file': function_file, 'is_serialized': False}) + else: + function_file = self._map_func_names_to_func_details[func.__name__]['function_file'] + else: + function_file = self._path_in_task(executor_task_id, "function") argument_file = self._path_in_task(executor_task_id, "argument") result_file = self._path_in_task(executor_task_id, "result") map_file = self._path_in_task(executor_task_id, "map") + if exec_mode == 'serverless': + if 'function_context' in resource_specification: + if 'function_context_file' not in self._map_func_names_to_func_details[func.__name__]: + function_context = resource_specification.get('function_context') + function_context_args = resource_specification.get('function_context_args', []) + function_context_kwargs = resource_specification.get('function_context_kwargs', {}) + function_context_file = os.path.join(self._function_data_dir.name, func.__name__, 'function_context') + self._serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs]) + self._map_func_names_to_func_details[func.__name__].update({'function_context_file': function_context_file}) + else: + function_context_file = self._map_func_names_to_func_details[func.__name__]['function_context_file'] + logger.debug("Creating executor task {} with function at: {}, argument at: {}, \ and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file)) # Serialize function object and arguments, separately - self._serialize_object_to_file(function_file, func) + if exec_mode == 'regular' or not self._map_func_names_to_func_details[func.__name__]['is_serialized']: + self._serialize_object_to_file(function_file, func) + if exec_mode == 'serverless': + self._map_func_names_to_func_details[func.__name__]['is_serialized'] = True args_dict = {'args': args, 'kwargs': kwargs} self._serialize_object_to_file(argument_file, args_dict) @@ -419,6 +484,7 @@ def submit(self, func, resource_specification, *args, **kwargs): category = func.__name__ if self.manager_config.autocategory else 'parsl-default' task_info = ParslTaskToVine(executor_id=executor_task_id, + func_name=func.__name__, exec_mode=exec_mode, category=category, input_files=input_files, @@ -427,6 +493,7 @@ def submit(self, func, resource_specification, *args, **kwargs): function_file=function_file, argument_file=argument_file, result_file=result_file, + function_context_file=function_context_file, cores=cores, memory=memory, disk=disk, diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index 94cfb2e391..c4c7e64381 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -7,8 +7,9 @@ import uuid from parsl.executors.taskvine import exec_parsl_function -from parsl.executors.taskvine.utils import VineTaskToParsl, run_parsl_function +from parsl.executors.taskvine.utils import VineTaskToParsl from parsl.process_loggers import wrap_with_logs +from parsl.serialize import deserialize, serialize from parsl.utils import setproctitle try: @@ -134,6 +135,22 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file, t.add_environment(poncho_env_file) +def _deserialize_object_from_file(path): + """Deserialize an object from a given file path.""" + with open(path, 'rb') as f_in: + obj = deserialize(f_in.read()) + return obj + + +def _serialize_object_to_file(path, obj): + """Serialize an object to the given file path.""" + serialized_obj = serialize(obj) + with open(path, 'wb') as f_out: + written = 0 + while written < len(serialized_obj): + written += f_out.write(serialized_obj[written:]) + + @wrap_with_logs def _taskvine_submit_wait(ready_task_queue=None, finished_task_queue=None, @@ -194,8 +211,9 @@ def _taskvine_submit_wait(ready_task_queue=None, # Declare helper script as cache-able and peer-transferable exec_parsl_function_file = m.declare_file(exec_parsl_function.__file__, cache=True, peer_transfer=True) - # Flag to make sure library for serverless tasks is declared and installed only once. - lib_installed = False + # Mapping from function names to library names + # This assumes that there's 1 library per function and vice versa (1-to-1 correspondence). + libs_installed = {} # Create cache dir for environment files env_cache_dir = os.path.join(manager_config.vine_log_dir, 'vine-cache', 'vine-poncho-env-cache') @@ -219,6 +237,7 @@ def _taskvine_submit_wait(ready_task_queue=None, except queue.Empty: logger.debug("Queue is empty") continue + if task.exec_mode == 'regular': # Create command string launch_cmd = "python3 exec_parsl_function.py {mapping} {function} {argument} {result}" @@ -241,36 +260,49 @@ def _taskvine_submit_wait(ready_task_queue=None, status=-1)) continue elif task.exec_mode == 'serverless': - if not lib_installed: - # Declare and install common library for serverless tasks. + if task.func_name not in libs_installed: + # Declare and install one library for serverless tasks per category, and vice versa. # Library requires an environment setup properly, which is # different from setup of regular tasks. # If shared_fs is True, then no environment preparation is done. # Only the core serverless code is created. poncho_env_path = _prepare_environment_serverless(manager_config, env_cache_dir, poncho_create_script) + # Deserialize the function to add it to the library + # This cost is paid only once per function/app. + func = _deserialize_object_from_file(task.function_file) + + # Deserialize the function context to add it to the library if available + # This cost is paid only once per function/app. + function_context_list = None + if task.function_context_file: + function_context_list = _deserialize_object_from_file(task.function_context_file) + # Don't automatically add environment so manager can declare and cache the vine file associated with the environment file add_env = False - serverless_lib = m.create_library_from_functions('common-parsl-taskvine-lib', - run_parsl_function, + lib_name = f'{task.func_name}-lib' + serverless_lib = m.create_library_from_functions(lib_name, + func, poncho_env=poncho_env_path, init_command=manager_config.init_command, - add_env=add_env) + exec_mode='direct', + add_env=add_env, + library_context_info=function_context_list) # Configure the library if provided if manager_config.library_config: lib_cores = manager_config.library_config.get('cores', None) lib_memory = manager_config.library_config.get('memory', None) lib_disk = manager_config.library_config.get('disk', None) - lib_slots = manager_config.library_config.get('num_slots', None) + # 1 function call per library at any given moment + lib_slots = manager_config.library_config.get('num_slots', 1) if lib_cores: serverless_lib.set_cores(lib_cores) if lib_memory: serverless_lib.set_memory(lib_memory) if lib_disk: serverless_lib.set_disk(lib_disk) - if lib_slots: - serverless_lib.set_function_slots(lib_slots) + serverless_lib.set_function_slots(lib_slots) if poncho_env_path: serverless_lib_env_file = m.declare_poncho(poncho_env_path, cache=True, peer_transfer=True) @@ -281,12 +313,16 @@ def _taskvine_submit_wait(ready_task_queue=None, logger.debug('Created minimal library task with no environment.') m.install_library(serverless_lib) - lib_installed = True + libs_installed[task.func_name] = lib_name try: - # run_parsl_function only needs remote names of map_file, function_file, argument_file, - # and result_file, which are simply named map, function, argument, result. - # These names are given when these files are declared below. - t = FunctionCall('common-parsl-taskvine-lib', run_parsl_function.__name__, 'map', 'function', 'argument', 'result') + # deserialize the arguments to the function call so they can be serialized again in TaskVine way. + # this double serialization hurts the performance of the system, + # but there's no way around it at the moment. Mark as TODO for future work. + all_args = _deserialize_object_from_file(task.argument_file) + args = all_args['args'] + kwargs = all_args['kwargs'] + + t = FunctionCall(libs_installed[task.func_name], task.func_name, *args, **kwargs) except Exception as e: logger.error("Unable to create executor task (mode:serverless): {}".format(e)) finished_task_queue.put_nowait(VineTaskToParsl(executor_id=task.executor_id, @@ -342,18 +378,18 @@ def _taskvine_submit_wait(ready_task_queue=None, # only needed to add as file for tasks with 'regular' mode t.add_input(exec_parsl_function_file, "exec_parsl_function.py") - # Declare and add task-specific function, data, and result files to task - task_function_file = m.declare_file(task.function_file, cache=False, peer_transfer=False) - t.add_input(task_function_file, "function") + # Declare and add task-specific function, data, and result files to task + task_function_file = m.declare_file(task.function_file, cache=False, peer_transfer=False) + t.add_input(task_function_file, "function") - task_argument_file = m.declare_file(task.argument_file, cache=False, peer_transfer=False) - t.add_input(task_argument_file, "argument") + task_argument_file = m.declare_file(task.argument_file, cache=False, peer_transfer=False) + t.add_input(task_argument_file, "argument") - task_map_file = m.declare_file(task.map_file, cache=False, peer_transfer=False) - t.add_input(task_map_file, "map") + task_map_file = m.declare_file(task.map_file, cache=False, peer_transfer=False) + t.add_input(task_map_file, "map") - task_result_file = m.declare_file(task.result_file, cache=False, peer_transfer=False) - t.add_output(task_result_file, "result") + task_result_file = m.declare_file(task.result_file, cache=False, peer_transfer=False) + t.add_output(task_result_file, "result") result_file_of_task_id[str(task.executor_id)] = task.result_file @@ -403,13 +439,14 @@ def _taskvine_submit_wait(ready_task_queue=None, # If the queue is not empty wait on the TaskVine queue for a task task_found = True while not m.empty() and task_found and not should_stop.is_set(): - # Obtain the task from the queue + # Obtain a task from the queue t = m.wait(1) if t is None: task_found = False continue logger.debug('Found a task') executor_task_id = vine_id_to_executor_task_id[str(t.id)][0] + exec_mode = vine_id_to_executor_task_id[str(t.id)][1] vine_id_to_executor_task_id.pop(str(t.id)) # When a task is found @@ -417,12 +454,22 @@ def _taskvine_submit_wait(ready_task_queue=None, logger.debug(f"completed executor task info: {executor_task_id}, {t.category}, {t.command}, {t.std_output}") + # This serializes the result to a file again. + # Also a double serialization, mark as TODO. + is_serverless_output_ok = True + if exec_mode == "serverless": + try: + if t.successful(): + _serialize_object_to_file(result_file, t.output) + except Exception: + is_serverless_output_ok = False + # A tasks completes 'succesfully' if it has result file. # A check whether the Python object represented using this file can be # deserialized happens later in the collector thread of the executor # process. logger.debug("Looking for result in {}".format(result_file)) - if os.path.exists(result_file): + if os.path.exists(result_file) and is_serverless_output_ok: logger.debug("Found result in {}".format(result_file)) finished_task_queue.put_nowait(VineTaskToParsl(executor_id=executor_task_id, result_received=True, diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index 86cf446b1a..5621c98659 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -6,6 +6,7 @@ class ParslTaskToVine: def __init__(self, executor_id: int, # executor id of Parsl function + func_name: str, # name of the Parsl function exec_mode: str, # execution mode of function, out of {regular, python, serverless} category: str, # category of Parsl function input_files: list, # list of input files to this function @@ -14,6 +15,7 @@ def __init__(self, function_file: Optional[str], # pickled file containing the function information argument_file: Optional[str], # pickled file containing the arguments to the function call result_file: Optional[str], # path to the pickled result object of the function execution + function_context_file: Optional[list], # path to the pickled list of function context details for serverless functions cores: Optional[float], # number of cores to allocate memory: Optional[int], # amount of memory in MBs to allocate disk: Optional[int], # amount of disk in MBs to allocate @@ -23,6 +25,7 @@ def __init__(self, env_pkg: Optional[str], # path to a poncho environment tarball ): self.executor_id = executor_id + self.func_name = func_name self.exec_mode = exec_mode self.category = category self.map_file = map_file @@ -31,6 +34,7 @@ def __init__(self, self.result_file = result_file self.input_files = input_files self.output_files = output_files + self.function_context_file = function_context_file self.cores = cores self.memory = memory self.disk = disk @@ -83,3 +87,8 @@ def run_parsl_function(map_file, function_file, argument_file, result_file): """ from parsl.executors.taskvine.exec_parsl_function import run run(map_file, function_file, argument_file, result_file) + + +def load_variable_in_serverless(var_name): + from ndcctools.taskvine.utils import load_variable_from_library + return load_variable_from_library(var_name)