From b31ad118c96b1f534b5161d17478b03f39f1c6d6 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 12 Aug 2022 15:49:36 +0200 Subject: [PATCH 1/2] Always return worker address from _remove_from_processing for paper trail --- distributed/scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e38447c70b..ad50a42bc8 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2259,7 +2259,7 @@ def transition_processing_released(self, key: str, stimulus_id: str): assert ts.state == "processing" w = _remove_from_processing(self, ts) - if w: + if w in self.workers: worker_msgs[w] = [ { "op": "free-keys", @@ -2329,7 +2329,7 @@ def transition_processing_erred( w = _remove_from_processing(self, ts) - ts.erred_on.add(w or worker) # type: ignore + ts.erred_on.add(w) if exception is not None: ts.exception = exception ts.exception_text = exception_text # type: ignore @@ -7314,7 +7314,7 @@ def request_remove_replicas( ) -def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str | None: +def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str: """Remove *ts* from the set of processing tasks. See also @@ -7326,7 +7326,7 @@ def _remove_from_processing(state: SchedulerState, ts: TaskState) -> str | None: ts.processing_on = None if ws.address not in state.workers: # may have been removed - return None + return ws.address duration = ws.processing.pop(ts) ws.long_running.discard(ts) From 3ffcf711432495057fc31c86a2ab99200d0c5e24 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 15 Aug 2022 17:11:17 +0200 Subject: [PATCH 2/2] Drop worker keyword arg --- distributed/scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ad50a42bc8..a8b5f5b3f7 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -2304,7 +2304,6 @@ def transition_processing_erred( traceback=None, exception_text: str | None = None, traceback_text: str | None = None, - worker: str | None = None, **kwargs, ): ws: WorkerState