From b24415862c7dcab63b004daaf1e6bec715dcfc15 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Tue, 10 Dec 2024 10:20:47 -0500 Subject: [PATCH 01/15] first draft with double serialization --- parsl/dataflow/dflow.py | 7 +++ parsl/executors/taskvine/executor.py | 23 ++++++++ parsl/executors/taskvine/manager.py | 88 ++++++++++++++++++++-------- parsl/executors/taskvine/utils.py | 1 + 4 files changed, 94 insertions(+), 25 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 6cec168b5d..f288609fb7 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -752,6 +752,13 @@ def launch_task(self, task_record: TaskRecord) -> Future: monitor_resources=executor.monitor_resources(), run_dir=self.run_dir) + # hint executors that this function will be monitored + if task_record['resource_specification']: + task_record['resource_specification'].update({'is_monitoring_enabled': True}) + else: + if task_record['resource_specification']: + task_record['resource_specification'].update({'is_monitoring_enabled': False}) + with self.submitter_lock: exec_fu = executor.submit(function, task_record['resource_specification'], *args, **kwargs) self.update_task_state(task_record, States.launched) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index a15a444d2c..569c239df7 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -172,6 +172,10 @@ def __init__(self, # Path to directory that holds all tasks' data and results. self._function_data_dir = "" + # Mapping of function names to function objects + # Helpful to detect inconsistencies in serverless functions + self._map_func_names_to_func_objs = {} + # Helper scripts to prepare package tarballs for Parsl apps self._package_analyze_script = shutil.which("poncho_package_analyze") self._package_create_script = shutil.which("poncho_package_create") @@ -318,11 +322,29 @@ def submit(self, func, resource_specification, *args, **kwargs): Keyword arguments to the Parsl app """ + # a Parsl function must have a name + if func.__name__ is None: + raise ValueError('A Parsl function must have a name') + logger.debug(f'Got resource specification: {resource_specification}') + is_monitoring_enabled = resource_specification.get('is_monitoring_enabled', False) + # Default execution mode of apps is regular exec_mode = resource_specification.get('exec_mode', self.function_exec_mode) + if exec_mode == 'serverless': + if func.__name__ not in self._map_func_names_to_func_objs: + self._map_func_names_to_func_objs[func.__name__] = func + else: + if id(func) != id(self._map_func_names_to_func_objs[func.__name__]): + logger.warning('Inconsistency in a serverless function call detected. A function name cannot point to two different function objects. Falling back to executing it as a regular task.') + exec_mode = 'regular' + + if is_monitoring_enabled: + logger.warning("A serverless task cannot run with Parsl monitoring enabled. Falling back to execute this task as a regular task.") + exec_mode = 'regular' + # Detect resources and features of a submitted Parsl app cores = None memory = None @@ -419,6 +441,7 @@ def submit(self, func, resource_specification, *args, **kwargs): category = func.__name__ if self.manager_config.autocategory else 'parsl-default' task_info = ParslTaskToVine(executor_id=executor_task_id, + func_name=func.__name__, exec_mode=exec_mode, category=category, input_files=input_files, diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index 94cfb2e391..9a065a6768 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -10,6 +10,7 @@ from parsl.executors.taskvine.utils import VineTaskToParsl, run_parsl_function from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle +from parsl.serialize import deserialize, serialize try: from ndcctools.taskvine import FunctionCall, Manager, Task, cvine @@ -134,6 +135,20 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file, t.add_environment(poncho_env_file) +def _deserialize_object_from_file(path): + """Deserialize an object from a given file path.""" + with open(path, 'rb') as f_in: + obj = deserialize(f_in.read()) + return obj + +def _serialize_object_to_file(path, obj): + """Serialize an object to the given file path.""" + serialized_obj = serialize(obj) + with open(path, 'wb') as f_out: + written = 0 + while written < len(serialized_obj): + written += f_out.write(serialized_obj[written:]) + @wrap_with_logs def _taskvine_submit_wait(ready_task_queue=None, finished_task_queue=None, @@ -194,8 +209,9 @@ def _taskvine_submit_wait(ready_task_queue=None, # Declare helper script as cache-able and peer-transferable exec_parsl_function_file = m.declare_file(exec_parsl_function.__file__, cache=True, peer_transfer=True) - # Flag to make sure library for serverless tasks is declared and installed only once. - lib_installed = False + # Mapping from function names to library names + # This assumes that there's 1 library per function and vice versa (1-to-1 correspondence). + libs_installed = {} # Create cache dir for environment files env_cache_dir = os.path.join(manager_config.vine_log_dir, 'vine-cache', 'vine-poncho-env-cache') @@ -219,6 +235,7 @@ def _taskvine_submit_wait(ready_task_queue=None, except queue.Empty: logger.debug("Queue is empty") continue + if task.exec_mode == 'regular': # Create command string launch_cmd = "python3 exec_parsl_function.py {mapping} {function} {argument} {result}" @@ -241,20 +258,26 @@ def _taskvine_submit_wait(ready_task_queue=None, status=-1)) continue elif task.exec_mode == 'serverless': - if not lib_installed: - # Declare and install common library for serverless tasks. + if task.func_name not in libs_installed: + # Declare and install one library for serverless tasks per category, and vice versa. # Library requires an environment setup properly, which is # different from setup of regular tasks. # If shared_fs is True, then no environment preparation is done. # Only the core serverless code is created. poncho_env_path = _prepare_environment_serverless(manager_config, env_cache_dir, poncho_create_script) + # Deserialize the function to add it to the library + # This cost is paid only once per function/app. + func = _deserialize_object_from_file(task.function_file) + # Don't automatically add environment so manager can declare and cache the vine file associated with the environment file add_env = False - serverless_lib = m.create_library_from_functions('common-parsl-taskvine-lib', - run_parsl_function, + lib_name = f'{task.func_name}-lib' + serverless_lib = m.create_library_from_functions(lib_name, + func, poncho_env=poncho_env_path, init_command=manager_config.init_command, + exec_mode='direct', add_env=add_env) # Configure the library if provided @@ -262,15 +285,15 @@ def _taskvine_submit_wait(ready_task_queue=None, lib_cores = manager_config.library_config.get('cores', None) lib_memory = manager_config.library_config.get('memory', None) lib_disk = manager_config.library_config.get('disk', None) - lib_slots = manager_config.library_config.get('num_slots', None) + # 1 function call per library at any given moment + lib_slots = manager_config.library_config.get('num_slots', 1) if lib_cores: serverless_lib.set_cores(lib_cores) if lib_memory: serverless_lib.set_memory(lib_memory) if lib_disk: serverless_lib.set_disk(lib_disk) - if lib_slots: - serverless_lib.set_function_slots(lib_slots) + serverless_lib.set_function_slots(lib_slots) if poncho_env_path: serverless_lib_env_file = m.declare_poncho(poncho_env_path, cache=True, peer_transfer=True) @@ -281,12 +304,15 @@ def _taskvine_submit_wait(ready_task_queue=None, logger.debug('Created minimal library task with no environment.') m.install_library(serverless_lib) - lib_installed = True + libs_installed[task.func_name] = lib_name try: - # run_parsl_function only needs remote names of map_file, function_file, argument_file, - # and result_file, which are simply named map, function, argument, result. - # These names are given when these files are declared below. - t = FunctionCall('common-parsl-taskvine-lib', run_parsl_function.__name__, 'map', 'function', 'argument', 'result') + # deserialize the arguments to the function call so they can be serialized again in TaskVine way. + # this double serialization hurts the performance of the system, but there's no way around it at the moment. Mark as TODO for future work. + all_args = _deserialize_object_from_file(task.argument_file) + args = all_args['args'] + kwargs = all_args['kwargs'] + + t = FunctionCall(libs_installed[task.func_name], task.func_name, *args, **kwargs) except Exception as e: logger.error("Unable to create executor task (mode:serverless): {}".format(e)) finished_task_queue.put_nowait(VineTaskToParsl(executor_id=task.executor_id, @@ -342,18 +368,18 @@ def _taskvine_submit_wait(ready_task_queue=None, # only needed to add as file for tasks with 'regular' mode t.add_input(exec_parsl_function_file, "exec_parsl_function.py") - # Declare and add task-specific function, data, and result files to task - task_function_file = m.declare_file(task.function_file, cache=False, peer_transfer=False) - t.add_input(task_function_file, "function") + # Declare and add task-specific function, data, and result files to task + task_function_file = m.declare_file(task.function_file, cache=False, peer_transfer=False) + t.add_input(task_function_file, "function") - task_argument_file = m.declare_file(task.argument_file, cache=False, peer_transfer=False) - t.add_input(task_argument_file, "argument") + task_argument_file = m.declare_file(task.argument_file, cache=False, peer_transfer=False) + t.add_input(task_argument_file, "argument") - task_map_file = m.declare_file(task.map_file, cache=False, peer_transfer=False) - t.add_input(task_map_file, "map") + task_map_file = m.declare_file(task.map_file, cache=False, peer_transfer=False) + t.add_input(task_map_file, "map") - task_result_file = m.declare_file(task.result_file, cache=False, peer_transfer=False) - t.add_output(task_result_file, "result") + task_result_file = m.declare_file(task.result_file, cache=False, peer_transfer=False) + t.add_output(task_result_file, "result") result_file_of_task_id[str(task.executor_id)] = task.result_file @@ -403,13 +429,14 @@ def _taskvine_submit_wait(ready_task_queue=None, # If the queue is not empty wait on the TaskVine queue for a task task_found = True while not m.empty() and task_found and not should_stop.is_set(): - # Obtain the task from the queue + # Obtain a task from the queue t = m.wait(1) if t is None: task_found = False continue logger.debug('Found a task') executor_task_id = vine_id_to_executor_task_id[str(t.id)][0] + exec_mode = vine_id_to_executor_task_id[str(t.id)][1] vine_id_to_executor_task_id.pop(str(t.id)) # When a task is found @@ -417,12 +444,23 @@ def _taskvine_submit_wait(ready_task_queue=None, logger.debug(f"completed executor task info: {executor_task_id}, {t.category}, {t.command}, {t.std_output}") + # This serializes the result to a file again. + # Also a double serialization, mark as TODO. + is_serverless_output_ok = True + if exec_mode == "serverless": + try: + if t.successful(): + _serialize_object_to_file(result_file, t.output) + except: + is_serverless_output_ok = False + + # A tasks completes 'succesfully' if it has result file. # A check whether the Python object represented using this file can be # deserialized happens later in the collector thread of the executor # process. logger.debug("Looking for result in {}".format(result_file)) - if os.path.exists(result_file): + if os.path.exists(result_file) and is_serverless_output_ok: logger.debug("Found result in {}".format(result_file)) finished_task_queue.put_nowait(VineTaskToParsl(executor_id=executor_task_id, result_received=True, diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index 86cf446b1a..33feaf1317 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -6,6 +6,7 @@ class ParslTaskToVine: def __init__(self, executor_id: int, # executor id of Parsl function + func_name: str, # name of the Parsl function exec_mode: str, # execution mode of function, out of {regular, python, serverless} category: str, # category of Parsl function input_files: list, # list of input files to this function From 3a21102e268a8e0216efb61029dcf25f6cddb316 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Tue, 10 Dec 2024 10:52:42 -0500 Subject: [PATCH 02/15] fix bug --- parsl/executors/taskvine/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index 33feaf1317..b30ff1439c 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -24,6 +24,7 @@ def __init__(self, env_pkg: Optional[str], # path to a poncho environment tarball ): self.executor_id = executor_id + self.func_name = func_name self.exec_mode = exec_mode self.category = category self.map_file = map_file From 85f18e90af41c310be314ad11ddf8830a977fad3 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Fri, 13 Dec 2024 13:19:58 -0500 Subject: [PATCH 03/15] deduplicate fn serialization --- parsl/executors/taskvine/executor.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 569c239df7..657a185fa8 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -176,6 +176,10 @@ def __init__(self, # Helpful to detect inconsistencies in serverless functions self._map_func_names_to_func_objs = {} + # Mapping of function names to file containing functions' serialization + # Helpful to deduplicate the same function + self._map_func_names_to_serialized_func_file = {} + # Helper scripts to prepare package tarballs for Parsl apps self._package_analyze_script = shutil.which("poncho_package_analyze") self._package_create_script = shutil.which("poncho_package_create") @@ -411,7 +415,10 @@ def submit(self, func, resource_specification, *args, **kwargs): # Get path to files that will contain the pickled function, # arguments, result, and map of input and output files - function_file = self._path_in_task(executor_task_id, "function") + if exec_mode == 'serverless': + if func.__name__ not in self._map_func_names_to_serialized_func_file: + function_file = self._path_in_task(executor_task_id, "function") + self._map_func_names_to_serialized_func_file argument_file = self._path_in_task(executor_task_id, "argument") result_file = self._path_in_task(executor_task_id, "result") map_file = self._path_in_task(executor_task_id, "map") From f0ccae0d08df693cf0f63b99913b90dbb1827af6 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Fri, 13 Dec 2024 13:21:47 -0500 Subject: [PATCH 04/15] finish serialization deduplication --- parsl/executors/taskvine/executor.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 657a185fa8..0f528480cf 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -417,8 +417,10 @@ def submit(self, func, resource_specification, *args, **kwargs): # arguments, result, and map of input and output files if exec_mode == 'serverless': if func.__name__ not in self._map_func_names_to_serialized_func_file: - function_file = self._path_in_task(executor_task_id, "function") - self._map_func_names_to_serialized_func_file + function_file = self._path_in_task(func.__name__, "function") + self._map_func_names_to_serialized_func_file[func.__name__] = function_file + else: + function_file = self._map_func_names_to_serialized_func_file[func.__name__] argument_file = self._path_in_task(executor_task_id, "argument") result_file = self._path_in_task(executor_task_id, "result") map_file = self._path_in_task(executor_task_id, "map") From 1cde21efb9257efc048aa2628c49a30da5289571 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Fri, 13 Dec 2024 13:53:48 -0500 Subject: [PATCH 05/15] fix bug dedup double serial --- parsl/executors/taskvine/executor.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 0f528480cf..33a1ba1763 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -417,10 +417,13 @@ def submit(self, func, resource_specification, *args, **kwargs): # arguments, result, and map of input and output files if exec_mode == 'serverless': if func.__name__ not in self._map_func_names_to_serialized_func_file: - function_file = self._path_in_task(func.__name__, "function") - self._map_func_names_to_serialized_func_file[func.__name__] = function_file + function_file = os.path.join(self._function_data_dir.name, func.__name__, 'function') + self._map_func_names_to_serialized_func_file[func.__name__] = {'function_file': function_file, 'is_serialized': False} + os.makedirs(os.path.join(self._function_data_dir.name, func.__name__)) else: - function_file = self._map_func_names_to_serialized_func_file[func.__name__] + function_file = self._map_func_names_to_serialized_func_file[func.__name__]['function_file'] + else: + function_file = self._path_in_task(executor_task_id, "function") argument_file = self._path_in_task(executor_task_id, "argument") result_file = self._path_in_task(executor_task_id, "result") map_file = self._path_in_task(executor_task_id, "map") @@ -429,7 +432,10 @@ def submit(self, func, resource_specification, *args, **kwargs): and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file)) # Serialize function object and arguments, separately - self._serialize_object_to_file(function_file, func) + if exec_mode == 'regular' or not self._map_func_names_to_serialized_func_file[func.__name__]['is_serialized']: + self._serialize_object_to_file(function_file, func) + if exec_mode == 'serverless': + self._map_func_names_to_serialized_func_file[func.__name__]['is_serialized'] = True args_dict = {'args': args, 'kwargs': kwargs} self._serialize_object_to_file(argument_file, args_dict) From 1f7ca1627b232edd492e449495ccd2a00b8d2cea Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Fri, 13 Dec 2024 14:00:50 -0500 Subject: [PATCH 06/15] add option for non-tmp staging dir --- parsl/executors/taskvine/executor.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 33a1ba1763..aca78f8ec9 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -81,8 +81,12 @@ class TaskVineExecutor(BlockProviderExecutor, putils.RepresentationMixin): pre-warmed forked python process. Default is 'regular'. + use_tmp_dir_for_staging: bool + Whether to use tmp dir for staging functions, arguments, and results. + Default is True. + manager_config: TaskVineManagerConfig - Configuration for the TaskVine manager. Default + Configuration for the TaskVine manager. factory_config: TaskVineFactoryConfig Configuration for the TaskVine factory. @@ -104,6 +108,7 @@ def __init__(self, label: str = "TaskVineExecutor", worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory', function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular', + use_tmp_dir_for_staging: bool = True, manager_config: TaskVineManagerConfig = TaskVineManagerConfig(), factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(), provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1), @@ -129,6 +134,7 @@ def __init__(self, self.label = label self.worker_launch_method = worker_launch_method self.function_exec_mode = function_exec_mode + self.use_tmp_dir_for_staging = use_tmp_dir_for_staging self.manager_config = manager_config self.factory_config = factory_config self.storage_access = storage_access @@ -226,8 +232,12 @@ def __create_data_and_logging_dirs(self): # Create directories for data and results log_dir = os.path.join(run_dir, self.label) os.makedirs(log_dir) - tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-' - self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix) + + if self.use_tmp_dir_for_staging: + tmp_prefix = f'{self.label}-{getpass.getuser()}-{datetime.now().strftime("%Y%m%d%H%M%S%f")}-' + self._function_data_dir = tempfile.TemporaryDirectory(prefix=tmp_prefix) + else: + self._function_data_dir = os.path.join(log_dir, 'function') # put TaskVine logs outside of a Parsl run as TaskVine caches between runs while # Parsl does not. From 71600853dc4ff97afd1e7f1f6a527c9a5e4bca04 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Sat, 14 Dec 2024 15:58:46 -0500 Subject: [PATCH 07/15] context feature added --- parsl/executors/taskvine/executor.py | 15 +++++++++++++++ parsl/executors/taskvine/manager.py | 9 ++++++++- parsl/executors/taskvine/utils.py | 7 +++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index aca78f8ec9..9a996cc762 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -422,6 +422,7 @@ def submit(self, func, resource_specification, *args, **kwargs): argument_file = None result_file = None map_file = None + function_context_file = None # Get path to files that will contain the pickled function, # arguments, result, and map of input and output files @@ -438,6 +439,19 @@ def submit(self, func, resource_specification, *args, **kwargs): result_file = self._path_in_task(executor_task_id, "result") map_file = self._path_in_task(executor_task_id, "map") + if exec_mode == 'serverless': + if 'function_context' in resource_specification: + if 'function_context_file' not in self._map_func_names_to_serialized_func_file[func.__name__]: + function_context = resource_specification.get('function_context') + function_context_args = resource_specification.get('function_context_args', []) + function_context_kwargs = resource_specification.get('function_context_kwargs', {}) + function_context_file = os.path.join(self._function_data_dir.name, func.__name__, 'function_context') + self._serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs]) + self._map_func_names_to_serialized_func_file[func.__name__]['function_context_file'] = function_context_file + else: + function_context_file = self._map_func_names_to_serialized_func_file[func.__name__]['function_context_file'] + + logger.debug("Creating executor task {} with function at: {}, argument at: {}, \ and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file)) @@ -475,6 +489,7 @@ def submit(self, func, resource_specification, *args, **kwargs): function_file=function_file, argument_file=argument_file, result_file=result_file, + function_context_file=function_context_file, cores=cores, memory=memory, disk=disk, diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index 9a065a6768..63075a282f 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -270,6 +270,12 @@ def _taskvine_submit_wait(ready_task_queue=None, # This cost is paid only once per function/app. func = _deserialize_object_from_file(task.function_file) + # Deserialize the function context to add it to the library if available + # This cost is paid only once per function/app. + function_context_list = None + if task.function_context_file: + function_context_list = _deserialize_object_from_file(task.function_context_file) + # Don't automatically add environment so manager can declare and cache the vine file associated with the environment file add_env = False lib_name = f'{task.func_name}-lib' @@ -278,7 +284,8 @@ def _taskvine_submit_wait(ready_task_queue=None, poncho_env=poncho_env_path, init_command=manager_config.init_command, exec_mode='direct', - add_env=add_env) + add_env=add_env, + library_context_info=function_context_list) # Configure the library if provided if manager_config.library_config: diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index b30ff1439c..8ac61fe0f9 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -15,6 +15,7 @@ def __init__(self, function_file: Optional[str], # pickled file containing the function information argument_file: Optional[str], # pickled file containing the arguments to the function call result_file: Optional[str], # path to the pickled result object of the function execution + function_context_file: Optional[list], # path to the pickled list of function context details for serverless functions cores: Optional[float], # number of cores to allocate memory: Optional[int], # amount of memory in MBs to allocate disk: Optional[int], # amount of disk in MBs to allocate @@ -33,6 +34,7 @@ def __init__(self, self.result_file = result_file self.input_files = input_files self.output_files = output_files + self.function_context_file = function_context_file self.cores = cores self.memory = memory self.disk = disk @@ -85,3 +87,8 @@ def run_parsl_function(map_file, function_file, argument_file, result_file): """ from parsl.executors.taskvine.exec_parsl_function import run run(map_file, function_file, argument_file, result_file) + + +def load_variable_in_serverless(var_name): + from ndcctools.taskvine.utils import load_variable_from_library + return load_variable_from_library(var_name) From 816aca421d221930b736c6c2f211d17215292d0f Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Mon, 16 Dec 2024 14:36:32 -0500 Subject: [PATCH 08/15] add _ to hidden variable --- parsl/dataflow/dflow.py | 4 ++-- parsl/executors/taskvine/executor.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index f288609fb7..cc3731fae0 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -754,10 +754,10 @@ def launch_task(self, task_record: TaskRecord) -> Future: # hint executors that this function will be monitored if task_record['resource_specification']: - task_record['resource_specification'].update({'is_monitoring_enabled': True}) + task_record['resource_specification'].update({'_is_monitoring_enabled': True}) else: if task_record['resource_specification']: - task_record['resource_specification'].update({'is_monitoring_enabled': False}) + task_record['resource_specification'].update({'_is_monitoring_enabled': False}) with self.submitter_lock: exec_fu = executor.submit(function, task_record['resource_specification'], *args, **kwargs) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 9a996cc762..2bafa3c5a9 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -342,7 +342,7 @@ def submit(self, func, resource_specification, *args, **kwargs): logger.debug(f'Got resource specification: {resource_specification}') - is_monitoring_enabled = resource_specification.get('is_monitoring_enabled', False) + is_monitoring_enabled = resource_specification.get('_is_monitoring_enabled', False) # Default execution mode of apps is regular exec_mode = resource_specification.get('exec_mode', self.function_exec_mode) From aa58a71bb0bb8063f45f50a17c6b3791389c06aa Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Mon, 16 Dec 2024 14:56:00 -0500 Subject: [PATCH 09/15] use 1 mapping only --- parsl/executors/taskvine/executor.py | 32 +++++++++++++--------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 2bafa3c5a9..17d5086ee7 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -178,14 +178,12 @@ def __init__(self, # Path to directory that holds all tasks' data and results. self._function_data_dir = "" - # Mapping of function names to function objects + # Mapping of function names to function details + # Currently the values include function objects, path to serialized functions, path to serialized function contexts, and whether functions are serialized # Helpful to detect inconsistencies in serverless functions - self._map_func_names_to_func_objs = {} - - # Mapping of function names to file containing functions' serialization # Helpful to deduplicate the same function - self._map_func_names_to_serialized_func_file = {} - + self._map_func_names_to_func_details = {} + # Helper scripts to prepare package tarballs for Parsl apps self._package_analyze_script = shutil.which("poncho_package_analyze") self._package_create_script = shutil.which("poncho_package_create") @@ -348,10 +346,10 @@ def submit(self, func, resource_specification, *args, **kwargs): exec_mode = resource_specification.get('exec_mode', self.function_exec_mode) if exec_mode == 'serverless': - if func.__name__ not in self._map_func_names_to_func_objs: - self._map_func_names_to_func_objs[func.__name__] = func + if func.__name__ not in self._map_func_names_to_func_details or 'func_obj' not in self._map_func_names_to_func_details[func.__name__]: + self._map_func_names_to_func_details[func.__name__] = {'func_obj': func} else: - if id(func) != id(self._map_func_names_to_func_objs[func.__name__]): + if id(func) != id(self._map_func_names_to_func_details[func.__name__]['func_obj']): logger.warning('Inconsistency in a serverless function call detected. A function name cannot point to two different function objects. Falling back to executing it as a regular task.') exec_mode = 'regular' @@ -427,12 +425,12 @@ def submit(self, func, resource_specification, *args, **kwargs): # Get path to files that will contain the pickled function, # arguments, result, and map of input and output files if exec_mode == 'serverless': - if func.__name__ not in self._map_func_names_to_serialized_func_file: + if func.__name__ not in self._map_func_names_to_func_details or 'function_file' not in self._map_func_names_to_func_details[func.__name__]: function_file = os.path.join(self._function_data_dir.name, func.__name__, 'function') - self._map_func_names_to_serialized_func_file[func.__name__] = {'function_file': function_file, 'is_serialized': False} + self._map_func_names_to_func_details[func.__name__] = {'function_file': function_file, 'is_serialized': False} os.makedirs(os.path.join(self._function_data_dir.name, func.__name__)) else: - function_file = self._map_func_names_to_serialized_func_file[func.__name__]['function_file'] + function_file = self._map_func_names_to_func_details[func.__name__]['function_file'] else: function_file = self._path_in_task(executor_task_id, "function") argument_file = self._path_in_task(executor_task_id, "argument") @@ -441,25 +439,25 @@ def submit(self, func, resource_specification, *args, **kwargs): if exec_mode == 'serverless': if 'function_context' in resource_specification: - if 'function_context_file' not in self._map_func_names_to_serialized_func_file[func.__name__]: + if 'function_context_file' not in self._map_func_names_to_func_details[func.__name__]: function_context = resource_specification.get('function_context') function_context_args = resource_specification.get('function_context_args', []) function_context_kwargs = resource_specification.get('function_context_kwargs', {}) function_context_file = os.path.join(self._function_data_dir.name, func.__name__, 'function_context') self._serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs]) - self._map_func_names_to_serialized_func_file[func.__name__]['function_context_file'] = function_context_file + self._map_func_names_to_func_details[func.__name__]['function_context_file'] = function_context_file else: - function_context_file = self._map_func_names_to_serialized_func_file[func.__name__]['function_context_file'] + function_context_file = self._map_func_names_to_func_details[func.__name__]['function_context_file'] logger.debug("Creating executor task {} with function at: {}, argument at: {}, \ and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file)) # Serialize function object and arguments, separately - if exec_mode == 'regular' or not self._map_func_names_to_serialized_func_file[func.__name__]['is_serialized']: + if exec_mode == 'regular' or not self._map_func_names_to_func_details[func.__name__]['is_serialized']: self._serialize_object_to_file(function_file, func) if exec_mode == 'serverless': - self._map_func_names_to_serialized_func_file[func.__name__]['is_serialized'] = True + self._map_func_names_to_func_details[func.__name__]['is_serialized'] = True args_dict = {'args': args, 'kwargs': kwargs} self._serialize_object_to_file(argument_file, args_dict) From 6057360d3b3c85c1707bfaa5ce9c4c09ef26368c Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Mon, 16 Dec 2024 14:59:15 -0500 Subject: [PATCH 10/15] check monitoring first --- parsl/executors/taskvine/executor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 17d5086ee7..97fb095b39 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -345,6 +345,10 @@ def submit(self, func, resource_specification, *args, **kwargs): # Default execution mode of apps is regular exec_mode = resource_specification.get('exec_mode', self.function_exec_mode) + if exec_mode == 'serverless' and is_monitoring_enabled: + logger.warning("A serverless task cannot run with Parsl monitoring enabled. Falling back to execute this task as a regular task.") + exec_mode = 'regular' + if exec_mode == 'serverless': if func.__name__ not in self._map_func_names_to_func_details or 'func_obj' not in self._map_func_names_to_func_details[func.__name__]: self._map_func_names_to_func_details[func.__name__] = {'func_obj': func} @@ -353,10 +357,6 @@ def submit(self, func, resource_specification, *args, **kwargs): logger.warning('Inconsistency in a serverless function call detected. A function name cannot point to two different function objects. Falling back to executing it as a regular task.') exec_mode = 'regular' - if is_monitoring_enabled: - logger.warning("A serverless task cannot run with Parsl monitoring enabled. Falling back to execute this task as a regular task.") - exec_mode = 'regular' - # Detect resources and features of a submitted Parsl app cores = None memory = None From c9547d35bd5682c5407f4b3e0870eab32a42f379 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Mon, 16 Dec 2024 15:19:15 -0500 Subject: [PATCH 11/15] fix lint issues --- parsl/executors/taskvine/executor.py | 22 +++++++++++++--------- parsl/executors/taskvine/manager.py | 2 +- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 97fb095b39..7c4460b6ee 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -178,12 +178,13 @@ def __init__(self, # Path to directory that holds all tasks' data and results. self._function_data_dir = "" - # Mapping of function names to function details - # Currently the values include function objects, path to serialized functions, path to serialized function contexts, and whether functions are serialized - # Helpful to detect inconsistencies in serverless functions - # Helpful to deduplicate the same function + # Mapping of function names to function details. + # Currently the values include function objects, path to serialized functions, + # path to serialized function contexts, and whether functions are serialized. + # Helpful to detect inconsistencies in serverless functions. + # Helpful to deduplicate the same function. self._map_func_names_to_func_details = {} - + # Helper scripts to prepare package tarballs for Parsl apps self._package_analyze_script = shutil.which("poncho_package_analyze") self._package_create_script = shutil.which("poncho_package_create") @@ -350,11 +351,14 @@ def submit(self, func, resource_specification, *args, **kwargs): exec_mode = 'regular' if exec_mode == 'serverless': - if func.__name__ not in self._map_func_names_to_func_details or 'func_obj' not in self._map_func_names_to_func_details[func.__name__]: + if func.__name__ not in self._map_func_names_to_func_details or\ + 'func_obj' not in self._map_func_names_to_func_details[func.__name__]: self._map_func_names_to_func_details[func.__name__] = {'func_obj': func} else: if id(func) != id(self._map_func_names_to_func_details[func.__name__]['func_obj']): - logger.warning('Inconsistency in a serverless function call detected. A function name cannot point to two different function objects. Falling back to executing it as a regular task.') + logger.warning('Inconsistency in a serverless function call detected.\ + A function name cannot point to two different function objects.\ + Falling back to executing it as a regular task.') exec_mode = 'regular' # Detect resources and features of a submitted Parsl app @@ -425,7 +429,8 @@ def submit(self, func, resource_specification, *args, **kwargs): # Get path to files that will contain the pickled function, # arguments, result, and map of input and output files if exec_mode == 'serverless': - if func.__name__ not in self._map_func_names_to_func_details or 'function_file' not in self._map_func_names_to_func_details[func.__name__]: + if func.__name__ not in self._map_func_names_to_func_details or\ + 'function_file' not in self._map_func_names_to_func_details[func.__name__]: function_file = os.path.join(self._function_data_dir.name, func.__name__, 'function') self._map_func_names_to_func_details[func.__name__] = {'function_file': function_file, 'is_serialized': False} os.makedirs(os.path.join(self._function_data_dir.name, func.__name__)) @@ -449,7 +454,6 @@ def submit(self, func, resource_specification, *args, **kwargs): else: function_context_file = self._map_func_names_to_func_details[func.__name__]['function_context_file'] - logger.debug("Creating executor task {} with function at: {}, argument at: {}, \ and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file)) diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index 63075a282f..7280309313 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -9,8 +9,8 @@ from parsl.executors.taskvine import exec_parsl_function from parsl.executors.taskvine.utils import VineTaskToParsl, run_parsl_function from parsl.process_loggers import wrap_with_logs -from parsl.utils import setproctitle from parsl.serialize import deserialize, serialize +from parsl.utils import setproctitle try: from ndcctools.taskvine import FunctionCall, Manager, Task, cvine From 360ce50d5b611e50046d64714c6dd78be5e67cb6 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Mon, 16 Dec 2024 15:50:26 -0500 Subject: [PATCH 12/15] fix bug in mapping of function names in executor --- parsl/executors/taskvine/executor.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 7c4460b6ee..77559ed09c 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -351,8 +351,7 @@ def submit(self, func, resource_specification, *args, **kwargs): exec_mode = 'regular' if exec_mode == 'serverless': - if func.__name__ not in self._map_func_names_to_func_details or\ - 'func_obj' not in self._map_func_names_to_func_details[func.__name__]: + if func.__name__ not in self._map_func_names_to_func_details: self._map_func_names_to_func_details[func.__name__] = {'func_obj': func} else: if id(func) != id(self._map_func_names_to_func_details[func.__name__]['func_obj']): @@ -429,11 +428,10 @@ def submit(self, func, resource_specification, *args, **kwargs): # Get path to files that will contain the pickled function, # arguments, result, and map of input and output files if exec_mode == 'serverless': - if func.__name__ not in self._map_func_names_to_func_details or\ - 'function_file' not in self._map_func_names_to_func_details[func.__name__]: + if 'function_file' not in self._map_func_names_to_func_details[func.__name__]: function_file = os.path.join(self._function_data_dir.name, func.__name__, 'function') - self._map_func_names_to_func_details[func.__name__] = {'function_file': function_file, 'is_serialized': False} os.makedirs(os.path.join(self._function_data_dir.name, func.__name__)) + self._map_func_names_to_func_details[func.__name__].update({'function_file': function_file, 'is_serialized': False}) else: function_file = self._map_func_names_to_func_details[func.__name__]['function_file'] else: @@ -450,7 +448,7 @@ def submit(self, func, resource_specification, *args, **kwargs): function_context_kwargs = resource_specification.get('function_context_kwargs', {}) function_context_file = os.path.join(self._function_data_dir.name, func.__name__, 'function_context') self._serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs]) - self._map_func_names_to_func_details[func.__name__]['function_context_file'] = function_context_file + self._map_func_names_to_func_details[func.__name__].update({'function_context_file': function_context_file}) else: function_context_file = self._map_func_names_to_func_details[func.__name__]['function_context_file'] From eb0c670822c5705703d881fb6b6612c919008352 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Mon, 16 Dec 2024 15:52:29 -0500 Subject: [PATCH 13/15] fix flake8 --- parsl/executors/taskvine/manager.py | 10 ++++++---- parsl/executors/taskvine/utils.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index 7280309313..c4c7e64381 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -7,7 +7,7 @@ import uuid from parsl.executors.taskvine import exec_parsl_function -from parsl.executors.taskvine.utils import VineTaskToParsl, run_parsl_function +from parsl.executors.taskvine.utils import VineTaskToParsl from parsl.process_loggers import wrap_with_logs from parsl.serialize import deserialize, serialize from parsl.utils import setproctitle @@ -141,6 +141,7 @@ def _deserialize_object_from_file(path): obj = deserialize(f_in.read()) return obj + def _serialize_object_to_file(path, obj): """Serialize an object to the given file path.""" serialized_obj = serialize(obj) @@ -149,6 +150,7 @@ def _serialize_object_to_file(path, obj): while written < len(serialized_obj): written += f_out.write(serialized_obj[written:]) + @wrap_with_logs def _taskvine_submit_wait(ready_task_queue=None, finished_task_queue=None, @@ -314,7 +316,8 @@ def _taskvine_submit_wait(ready_task_queue=None, libs_installed[task.func_name] = lib_name try: # deserialize the arguments to the function call so they can be serialized again in TaskVine way. - # this double serialization hurts the performance of the system, but there's no way around it at the moment. Mark as TODO for future work. + # this double serialization hurts the performance of the system, + # but there's no way around it at the moment. Mark as TODO for future work. all_args = _deserialize_object_from_file(task.argument_file) args = all_args['args'] kwargs = all_args['kwargs'] @@ -458,10 +461,9 @@ def _taskvine_submit_wait(ready_task_queue=None, try: if t.successful(): _serialize_object_to_file(result_file, t.output) - except: + except Exception: is_serverless_output_ok = False - # A tasks completes 'succesfully' if it has result file. # A check whether the Python object represented using this file can be # deserialized happens later in the collector thread of the executor diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index 8ac61fe0f9..5621c98659 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -15,7 +15,7 @@ def __init__(self, function_file: Optional[str], # pickled file containing the function information argument_file: Optional[str], # pickled file containing the arguments to the function call result_file: Optional[str], # path to the pickled result object of the function execution - function_context_file: Optional[list], # path to the pickled list of function context details for serverless functions + function_context_file: Optional[list], # path to the pickled list of function context details for serverless functions cores: Optional[float], # number of cores to allocate memory: Optional[int], # amount of memory in MBs to allocate disk: Optional[int], # amount of disk in MBs to allocate From ce2eced543e7f16b359981e9af448e84ac3a0825 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Mon, 16 Dec 2024 15:59:11 -0500 Subject: [PATCH 14/15] add annotation --- parsl/executors/taskvine/executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 77559ed09c..572729a6c8 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -20,7 +20,7 @@ import uuid from concurrent.futures import Future from datetime import datetime -from typing import List, Literal, Optional, Union +from typing import Dict, List, Literal, Optional, Union # Import other libraries import typeguard @@ -183,7 +183,7 @@ def __init__(self, # path to serialized function contexts, and whether functions are serialized. # Helpful to detect inconsistencies in serverless functions. # Helpful to deduplicate the same function. - self._map_func_names_to_func_details = {} + self._map_func_names_to_func_details: Dict[str, Dict] = {} # Helper scripts to prepare package tarballs for Parsl apps self._package_analyze_script = shutil.which("poncho_package_analyze") From 37e8e18b5a6a02bf3c263dd2d3426bc66373206e Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Mon, 16 Dec 2024 16:36:35 -0500 Subject: [PATCH 15/15] new way to detect monitoring code --- parsl/dataflow/dflow.py | 7 ------- parsl/executors/taskvine/executor.py | 6 +++++- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index cc3731fae0..6cec168b5d 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -752,13 +752,6 @@ def launch_task(self, task_record: TaskRecord) -> Future: monitor_resources=executor.monitor_resources(), run_dir=self.run_dir) - # hint executors that this function will be monitored - if task_record['resource_specification']: - task_record['resource_specification'].update({'_is_monitoring_enabled': True}) - else: - if task_record['resource_specification']: - task_record['resource_specification'].update({'_is_monitoring_enabled': False}) - with self.submitter_lock: exec_fu = executor.submit(function, task_record['resource_specification'], *args, **kwargs) self.update_task_state(task_record, States.launched) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 572729a6c8..d7a7bfe2ad 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -341,11 +341,15 @@ def submit(self, func, resource_specification, *args, **kwargs): logger.debug(f'Got resource specification: {resource_specification}') - is_monitoring_enabled = resource_specification.get('_is_monitoring_enabled', False) + # If `_parsl_monitoring_task_id` is in kwargs, Parsl monitoring code is enabled. + is_monitoring_enabled = '_parsl_monitoring_task_id' in kwargs # Default execution mode of apps is regular exec_mode = resource_specification.get('exec_mode', self.function_exec_mode) + # Fall back to regular execution if a function is Parsl-monitored as a monitored function is invocation-specific. + # Note that it is possible to get the wrapped function by calling the `__wrapped__` attribute when monitoring is enabled. + # It will disable the monitoring wrapper code however. if exec_mode == 'serverless' and is_monitoring_enabled: logger.warning("A serverless task cannot run with Parsl monitoring enabled. Falling back to execute this task as a regular task.") exec_mode = 'regular'