Skip to content

Commit

Permalink
Avoid reserializing results into multiprocessing in Work Queue and Ta…
Browse files Browse the repository at this point in the history
…sk Vine executors (#2916)

This PR fixes issue #2908
  • Loading branch information
tphung3 authored Oct 25, 2023
1 parent a5aede7 commit cc52e97
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 51 deletions.
7 changes: 3 additions & 4 deletions parsl/executors/taskvine/exec_parsl_function.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import traceback
import sys

import pickle
from parsl.app.errors import RemoteExceptionWrapper
from parsl.data_provider.files import File
from parsl.utils import get_std_fname_mode
from parsl.serialize import deserialize
from parsl.serialize import deserialize, serialize

# This scripts executes a parsl function which is pickled in 4 files:
#
Expand All @@ -30,10 +29,10 @@
#


def dump_result_to_file(result_file: str, result_package):
def dump_result_to_file(result_file: str, result):
""" Dump a result to the given result file."""
with open(result_file, "wb") as f_out:
pickle.dump(result_package, f_out)
f_out.write(serialize(result))


def remap_location(mapping, parsl_file):
Expand Down
13 changes: 10 additions & 3 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# Import Parsl constructs
import parsl.utils as putils
from parsl.data_provider.staging import Staging
from parsl.serialize import serialize
from parsl.serialize import serialize, deserialize
from parsl.data_provider.files import File
from parsl.errors import OptionalModuleMissing
from parsl.providers.base import ExecutionProvider
Expand Down Expand Up @@ -639,11 +639,18 @@ def _collect_taskvine_results(self):
logger.debug(f'Updating Future for Parsl Task: {task_report.executor_id}. \
Task {task_report.executor_id} has result_received set to {task_report.result_received}')
if task_report.result_received:
future.set_result(task_report.result)
try:
with open(task_report.result_file, 'rb') as f_in:
result = deserialize(f_in.read())
except Exception as e:
logger.debug(f'Cannot load result from result file {task_report.result_file}. Exception: {e}')
future.set_exception(TaskVineTaskFailure('Cannot load result from result file', None))
else:
future.set_result(result)
else:
# If there are no results, then the task failed according to one of
# taskvine modes, such as resource exhaustion.
future.set_exception(TaskVineTaskFailure(task_report.reason, task_report.result))
future.set_exception(TaskVineTaskFailure(task_report.reason, None))

# decrement outstanding task counter
with self._outstanding_tasks_lock:
Expand Down
28 changes: 12 additions & 16 deletions parsl/executors/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import hashlib
import subprocess
import os
import pickle
import queue
import shutil
import uuid
Expand Down Expand Up @@ -229,7 +228,7 @@ def _taskvine_submit_wait(ready_task_queue=None,
logger.error("Unable to create executor task (mode:regular): {}".format(e))
finished_task_queue.put_nowait(VineTaskToParsl(executor_id=task.executor_id,
result_received=False,
result=None,
result_file=None,
reason="task could not be created by taskvine",
status=-1))
continue
Expand Down Expand Up @@ -268,7 +267,7 @@ def _taskvine_submit_wait(ready_task_queue=None,
logger.error("Unable to create executor task (mode:serverless): {}".format(e))
finished_task_queue.put_nowait(VineTaskToParsl(executor_id=task.executor_id,
result_received=False,
result=None,
result_file=None,
reason="task could not be created by taskvine",
status=-1))
else:
Expand Down Expand Up @@ -369,7 +368,7 @@ def _taskvine_submit_wait(ready_task_queue=None,
logger.error("Unable to submit task to taskvine: {}".format(e))
finished_task_queue.put_nowait(VineTaskToParsl(executor_id=task.executor_id,
result_received=False,
result=None,
result_file=None,
reason="task could not be submited to taskvine",
status=-1))
continue
Expand All @@ -394,24 +393,21 @@ def _taskvine_submit_wait(ready_task_queue=None,

logger.debug(f"completed executor task info: {executor_task_id}, {t.category}, {t.command}, {t.std_output}")

# A tasks completes 'succesfully' if it has result file,
# and it can be loaded. This may mean that the 'success' is
# an exception.
# A tasks completes 'succesfully' if it has result file.
# A check whether the Python object represented using this file can be
# deserialized happens later in the collector thread of the executor
# process.
logger.debug("Looking for result in {}".format(result_file))
try:
with open(result_file, "rb") as f_in:
result = pickle.load(f_in)
if os.path.exists(result_file):
logger.debug("Found result in {}".format(result_file))
finished_task_queue.put_nowait(VineTaskToParsl(executor_id=executor_task_id,
result_received=True,
result=result,
result_file=result_file,
reason=None,
status=t.exit_code))
# If a result file could not be generated, explain the
# failure according to taskvine error codes. We generate
# an exception and wrap it with RemoteExceptionWrapper, to
# match the positive case.
except Exception as e:
# failure according to taskvine error codes.
else:
reason = _explain_taskvine_result(t)
logger.debug("Did not find result in {}".format(result_file))
logger.debug("Wrapper Script status: {}\nTaskVine Status: {}"
Expand All @@ -420,7 +416,7 @@ def _taskvine_submit_wait(ready_task_queue=None,
.format(executor_task_id, t.id, reason))
finished_task_queue.put_nowait(VineTaskToParsl(executor_id=executor_task_id,
result_received=False,
result=e,
result_file=None,
reason=reason,
status=t.exit_code))

Expand Down
10 changes: 5 additions & 5 deletions parsl/executors/taskvine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ def __init__(self,

class VineTaskToParsl:
"""
Support structure to communicate final status of TaskVine tasks to Parsl
result is only valid if result_received is True
reason and status are only valid if result_received is False
Support structure to communicate final status of TaskVine tasks to Parsl.
result_file is only valid if result_received is True.
Reason and status are only valid if result_received is False.
"""
def __init__(self,
executor_id: int, # executor id of task
result_received: bool, # whether result is received or not
result, # result object if available
result_file: Optional[str], # path to file that contains the serialized result object
reason: Optional[str], # string describing why execution fails
status: Optional[int] # exit code of execution of task
):
self.executor_id = executor_id
self.result_received = result_received
self.result = result
self.result_file = result_file
self.reason = reason
self.status = status

Expand Down
3 changes: 2 additions & 1 deletion parsl/executors/workqueue/exec_parsl_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import traceback
import sys
import pickle
from parsl.serialize import serialize

# This scripts executes a parsl function which is pickled in a file:
#
Expand Down Expand Up @@ -32,7 +33,7 @@ def load_pickled_file(filename):

def dump_result_to_file(result_file, result_package):
with open(result_file, "wb") as f_out:
pickle.dump(result_package, f_out)
f_out.write(serialize(result_package))


def remap_location(mapping, parsl_file):
Expand Down
55 changes: 33 additions & 22 deletions parsl/executors/workqueue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import shutil
import itertools

from parsl.serialize import pack_apply_message
from parsl.serialize import pack_apply_message, deserialize
import parsl.utils as putils
from parsl.executors.errors import ExecutorError
from parsl.data_provider.files import File
Expand Down Expand Up @@ -66,11 +66,11 @@

# Support structure to communicate final status of work queue tasks to parsl
# if result_received is True:
# result is the result
# result_file is the path to the file containing the result.
# if result_received is False:
# reason and status are only valid if result_received is False
# result is either None or an exception raised while looking for a result
WqTaskToParsl = namedtuple('WqTaskToParsl', 'id result_received result reason status')
# result_file is None
WqTaskToParsl = namedtuple('WqTaskToParsl', 'id result_received result_file reason status')

# Support structure to report parsl filenames to work queue.
# parsl_name is the local_name or filepath attribute of a parsl file object.
Expand Down Expand Up @@ -729,14 +729,29 @@ def _collect_work_queue_results(self):
with self.tasks_lock:
future = self.tasks.pop(task_report.id)
logger.debug("Updating Future for executor task {}".format(task_report.id))
# If result_received, then there's a result file. The object inside the file
# may be a valid result or an exception caused within the function invocation.
# Otherwise there's no result file, implying errors from WorkQueue.
if task_report.result_received:
future.set_result(task_report.result)
try:
with open(task_report.result_file, 'rb') as f_in:
result = deserialize(f_in.read())
except Exception as e:
logger.error(f'Cannot load result from result file {task_report.result_file}. Exception: {e}')
ex = WorkQueueTaskFailure('Cannot load result from result file', None)
ex.__cause__ = e
future.set_exception(ex)
else:
if isinstance(result, Exception):
ex = WorkQueueTaskFailure('Task execution raises an exception', result)
ex.__cause__ = result
future.set_exception(ex)
else:
future.set_result(result)
else:
# If there are no results, then the task failed according to one of
# work queue modes, such as resource exhaustion.
ex = WorkQueueTaskFailure(task_report.reason, task_report.result)
if task_report.result is not None:
ex.__cause__ = task_report.result
ex = WorkQueueTaskFailure(task_report.reason, None)
future.set_exception(ex)
finally:
logger.debug("Marking all outstanding tasks as failed")
Expand Down Expand Up @@ -876,7 +891,7 @@ def _work_queue_submit_wait(*,
logger.error("Unable to create task: {}".format(e))
collector_queue.put_nowait(WqTaskToParsl(id=task.id,
result_received=False,
result=None,
result_file=None,
reason="task could not be created by work queue",
status=-1))
continue
Expand Down Expand Up @@ -937,7 +952,7 @@ def _work_queue_submit_wait(*,
logger.error("Unable to submit task to work queue: {}".format(e))
collector_queue.put_nowait(WqTaskToParsl(id=task.id,
result_received=False,
result=None,
result_file=None,
reason="task could not be submited to work queue",
status=-1))
continue
Expand All @@ -957,24 +972,20 @@ def _work_queue_submit_wait(*,
logger.debug("Completed Work Queue task {}, executor task {}".format(t.id, t.tag))
result_file = result_file_of_task_id.pop(t.tag)

# A tasks completes 'succesfully' if it has result file,
# and it can be loaded. This may mean that the 'success' is
# an exception.
# A tasks completes 'succesfully' if it has result file.
# The check whether this file can load a serialized Python object
# happens later in the collector thread of the executor process.
logger.debug("Looking for result in {}".format(result_file))
try:
with open(result_file, "rb") as f_in:
result = pickle.load(f_in)
if os.path.exists(result_file):
logger.debug("Found result in {}".format(result_file))
collector_queue.put_nowait(WqTaskToParsl(id=executor_task_id,
result_received=True,
result=result,
result_file=result_file,
reason=None,
status=t.return_status))
# If a result file could not be generated, explain the
# failure according to work queue error codes. We generate
# an exception and wrap it with RemoteExceptionWrapper, to
# match the positive case.
except Exception as e:
# failure according to work queue error codes.
else:
reason = _explain_work_queue_result(t)
logger.debug("Did not find result in {}".format(result_file))
logger.debug("Wrapper Script status: {}\nWorkQueue Status: {}"
Expand All @@ -983,7 +994,7 @@ def _work_queue_submit_wait(*,
.format(executor_task_id, t.id, reason))
collector_queue.put_nowait(WqTaskToParsl(id=executor_task_id,
result_received=False,
result=e,
result_file=None,
reason=reason,
status=t.return_status))
logger.debug("Exiting WorkQueue Monitoring Process")
Expand Down

0 comments on commit cc52e97

Please sign in to comment.