Skip to content

Commit

Permalink
Always return worker address from _remove_from_processing for paper t…
Browse files Browse the repository at this point in the history
…rail
  • Loading branch information
hendrikmakait committed Aug 17, 2022
1 parent 1d0701b commit b31ad11
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2259,7 +2259,7 @@ def transition_processing_released(self, key: str, stimulus_id: str):
assert ts.state == "processing"

w = _remove_from_processing(self, ts)
if w:
if w in self.workers:
worker_msgs[w] = [
{
"op": "free-keys",
Expand Down Expand Up @@ -2329,7 +2329,7 @@ def transition_processing_erred(

w = _remove_from_processing(self, ts)

ts.erred_on.add(w or worker) # type: ignore
ts.erred_on.add(w)
if exception is not None:
ts.exception = exception
ts.exception_text = exception_text # type: ignore
Expand Down Expand Up @@ -7314,7 +7314,7 @@ def request_remove_replicas(
)


def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str | None:
def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str:
"""Remove *ts* from the set of processing tasks.
See also
Expand All @@ -7326,7 +7326,7 @@ def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str | None:
ts.processing_on = None

if ws.address not in state.workers: # may have been removed
return None
return ws.address

duration = ws.processing.pop(ts)
ws.long_running.discard(ts)
Expand Down

0 comments on commit b31ad11

Please sign in to comment.