diff --git a/distributed/worker.py b/distributed/worker.py index fdc43f8f626..8986baa2902 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2114,9 +2114,11 @@ def transition_released_waiting(self, ts, *, stimulus_id): def transition_fetch_flight(self, ts, worker, *, stimulus_id): if self.validate: assert ts.state == "fetch" - assert ts.who_has assert ts.key not in self.data_needed + if not ts.who_has: + return {ts: "missing"}, [] + ts.done = False ts.state = "flight" ts.coming_from = worker @@ -3237,6 +3239,11 @@ def release_key( ts.nbytes = None ts._previous = None ts._next = None + ts.exception = None + ts.exception_text = "" + ts.traceback = None + ts.traceback_text = "" + self._missing_dep_flight.discard(ts) ts.done = False self._executing.discard(ts) @@ -3883,7 +3890,7 @@ def validate_task_missing(self, ts): assert not ts.who_has assert not ts.done assert not any(ts.key in has_what for has_what in self.has_what.values()) - assert ts.key in self._missing_dep_flight + assert ts in self._missing_dep_flight def validate_task_cancelled(self, ts): assert ts.key not in self.data @@ -3973,7 +3980,7 @@ def validate_state(self): assert ( ts_wait.state in READY | {"executing", "flight", "fetch", "missing"} - or ts_wait.key in self._missing_dep_flight + or ts_wait in self._missing_dep_flight or ts_wait.who_has.issubset(self.in_flight_workers) ), (ts, ts_wait, self.story(ts), self.story(ts_wait)) if ts.state == "memory":