Skip to content

Commit

Permalink
Add request_id to ComputeFuture._metadata
Browse files Browse the repository at this point in the history
The main change is the `fut._metadata` dictionary update in `executor.py` in
the happy and non-happy paths.  This suggested a change in the fixtures of the
tests, which resulted in a larger than expected set of test-changes.  But it's
all in service to the 2 `_metadata` hunk changes.
  • Loading branch information
khk-globus committed Dec 16, 2024
1 parent 53b42d3 commit 379ee05
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 103 deletions.
34 changes: 27 additions & 7 deletions compute_sdk/globus_compute_sdk/sdk/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,14 @@ def __hash__(self):
api_burst_fill.append(fill_percent)

to_watch = [f for f in fut_list if f.task_id and not f.done()]
log.debug(
"%r (tid:%s): Submission complete (to %s);"
" count to watcher: %d",
self,
_tid,
ep_uuid,
len(to_watch),
)
if not to_watch:
continue

Expand Down Expand Up @@ -1043,6 +1051,8 @@ def _submit_tasks(
set when function completes successfully.
:param tasks: a list of tasks to submit upstream in a batch.
"""
assert len(futs) == len(tasks), "Developer reminder"

_tid = threading.get_ident()
if taskgroup_uuid is None and self.task_group_id:
taskgroup_uuid = self.task_group_id
Expand Down Expand Up @@ -1083,10 +1093,12 @@ def _submit_tasks(

try:
received_tasks_by_fn: dict[str, list[str]] = batch_response["tasks"]
new_tg_id: str = batch_response["task_group_id"]
new_tg_id = uuid.UUID(batch_response["task_group_id"])
_request_id = uuid.UUID(batch_response["request_id"])
_endpoint_id = uuid.UUID(batch_response["endpoint_id"])
except Exception as e:
log.debug(
f"Server response ({batch_response}) missing an expected field"
f"Invalid or unexpected server response ({batch_response})"
f" [({type(e).__name__}) {e}]"
)
for fut_list in submitted_futs_by_fn.values():
Expand All @@ -1095,7 +1107,7 @@ def _submit_tasks(
self._submitter_thread_exception_captured = True
raise

if str(self.task_group_id) != new_tg_id:
if self.task_group_id != new_tg_id:
log.info(f"Updating task_group_id from {self.task_group_id} to {new_tg_id}")
self.task_group_id = new_tg_id

Expand All @@ -1113,30 +1125,38 @@ def _submit_tasks(
for fn_id, fut_list in submitted_futs_by_fn.items():
task_uuids = received_tasks_by_fn.get(fn_id)

fut_exc = None
if task_uuids is None:
fut_exc = Exception(
f"The Globus Compute Service ignored tasks for function {fn_id}!"
" This 'should not happen,' so please reach out to the Globus"
" Compute team if you are able to recreate this behavior."
)
for fut in fut_list:
fut.set_exception(fut_exc)
continue

if len(fut_list) != len(task_uuids):
elif len(fut_list) != len(task_uuids):
fut_exc = Exception(
"The Globus Compute Service only partially initiated requested"
f" tasks for function {fn_id}! It is unclear which tasks it"
" honored, so marking all futures as failed. Please reach out"
" to the Globus Compute team if you are able to recreate this"
" behavior."
)

if fut_exc:
for fut in fut_list:
fut._metadata["request_uuid"] = _request_id
fut._metadata["endpoint_uuid"] = _endpoint_id
fut._metadata["task_group_uuid"] = new_tg_id
fut.set_exception(fut_exc)
continue

assert task_uuids is not None, "2nd order logic, mypy. :facepalm:"

# Happy -- expected -- path
for fut, task_id in zip(fut_list, task_uuids):
fut._metadata["request_uuid"] = _request_id
fut._metadata["endpoint_uuid"] = _endpoint_id
fut._metadata["task_group_uuid"] = new_tg_id
fut.task_id = task_id


Expand Down
Loading

0 comments on commit 379ee05

Please sign in to comment.