From 69b74f9f0aa46dac7facb960f1b0aaac69121e5e Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 16 Nov 2022 17:44:12 +0000 Subject: [PATCH] Remove exception handling from transitions (#7316) --- distributed/scheduler.py | 1094 +++++++++++++++++--------------------- 1 file changed, 478 insertions(+), 616 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 2dd1f851f4..b8150b190e 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1983,81 +1983,66 @@ def _transitions( scheduler.validate_key(key) def transition_released_waiting(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - dts: TaskState - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} - - if self.validate: - assert ts.run_spec - assert not ts.waiting_on - assert not ts.who_has - assert not ts.processing_on - assert not any([dts.state == "forgotten" for dts in ts.dependencies]) + ts: TaskState = self.tasks[key] + dts: TaskState + recommendations: dict = {} + client_msgs: dict = {} + worker_msgs: dict = {} - if ts.has_lost_dependencies: - recommendations[key] = "forgotten" - return recommendations, client_msgs, worker_msgs + if self.validate: + assert ts.run_spec + assert not ts.waiting_on + assert not ts.who_has + assert not ts.processing_on + assert not any([dts.state == "forgotten" for dts in ts.dependencies]) - ts.state = "waiting" + if ts.has_lost_dependencies: + recommendations[key] = "forgotten" + return recommendations, client_msgs, worker_msgs - dts: TaskState - for dts in ts.dependencies: - if dts.exception_blame: - ts.exception_blame = dts.exception_blame - recommendations[key] = "erred" - return recommendations, client_msgs, worker_msgs + ts.state = "waiting" - for dts in ts.dependencies: - dep = dts.key - if not dts.who_has: - ts.waiting_on.add(dts) - if dts.state == "released": - recommendations[dep] = "waiting" - else: - dts.waiters.add(ts) + dts: TaskState + for dts in ts.dependencies: + if dts.exception_blame: + ts.exception_blame = dts.exception_blame + recommendations[key] = "erred" + return recommendations, client_msgs, worker_msgs - ts.waiters = {dts for dts in ts.dependents if dts.state == "waiting"} + for dts in ts.dependencies: + dep = dts.key + if not dts.who_has: + ts.waiting_on.add(dts) + if dts.state == "released": + recommendations[dep] = "waiting" + else: + dts.waiters.add(ts) - if not ts.waiting_on: - # NOTE: waiting->processing will send tasks to queued or no-worker as necessary - recommendations[key] = "processing" + ts.waiters = {dts for dts in ts.dependents if dts.state == "waiting"} - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + if not ts.waiting_on: + # NOTE: waiting->processing will send tasks to queued or no-worker as + # necessary + recommendations[key] = "processing" - pdb.set_trace() - raise + return recommendations, client_msgs, worker_msgs def transition_no_worker_processing(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - recommendations: Recs = {} - client_msgs: dict = {} - worker_msgs: dict = {} - - if self.validate: - assert not ts.actor, f"Actors can't be in `no-worker`: {ts}" - assert ts in self.unrunnable + ts: TaskState = self.tasks[key] + recommendations: Recs = {} + client_msgs: dict = {} + worker_msgs: dict = {} - if ws := self.decide_worker_non_rootish(ts): - self.unrunnable.discard(ts) - worker_msgs = self._add_to_processing(ts, ws) - # If no worker, task just stays in `no-worker` + if 
self.validate:
+            assert not ts.actor, f"Actors can't be in `no-worker`: {ts}"
+            assert ts in self.unrunnable
 
-            return recommendations, client_msgs, worker_msgs
-        except Exception as e:
-            logger.exception(e)
-            if LOG_PDB:
-                import pdb
+        if ws := self.decide_worker_non_rootish(ts):
+            self.unrunnable.discard(ts)
+            worker_msgs = self._add_to_processing(ts, ws)
+        # If no worker, task just stays in `no-worker`
 
-                pdb.set_trace()
-                raise
+        return recommendations, client_msgs, worker_msgs
 
     def decide_worker_rootish_queuing_disabled(
         self, ts: TaskState
@@ -2144,11 +2129,11 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
         """
 
         if self.validate:
-            # We don't `assert self.is_rootish(ts)` here, because that check is dependent on
-            # cluster size. It's possible a task looked root-ish when it was queued, but the
-            # cluster has since scaled up and it no longer does when coming out of the queue.
-            # If `is_rootish` changes to a static definition, then add that assertion here
-            # (and actually pass in the task).
+            # We don't `assert self.is_rootish(ts)` here, because that check is
+            # dependent on cluster size. It's possible a task looked root-ish when it
+            # was queued, but the cluster has since scaled up and it no longer does when
+            # coming out of the queue. If `is_rootish` changes to a static definition,
+            # then add that assertion here (and actually pass in the task).
             assert not math.isinf(self.WORKER_SATURATION)
 
         if not self.idle_task_count:
@@ -2175,7 +2160,8 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
         return ws
 
     def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
-        """Pick a worker for a runnable non-root task, considering dependencies and restrictions.
+        """Pick a worker for a runnable non-root task, considering dependencies and
+        restrictions.
 
         Out of eligible workers holding dependencies of ``ts``, selects the worker
         where, considering worker backlog and data-transfer costs, the task is
         estimated to start running the soonest.
 
@@ -2196,7 +2182,8 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
             if not self.running:
                 return None
 
-            # If there were no restrictions, `valid_workers()` didn't subset by `running`.
+            # If there were no restrictions, `valid_workers()` didn't subset by
+            # `running`.
             valid_workers = self.running
 
         if ts.dependencies or valid_workers is not None:
             ws = decide_worker(
                 ts,
                 self.running,
                 valid_workers,
                 partial(self.worker_objective, ts),
             )
         else:
-            # TODO if `is_rootish` would always return True for tasks without dependencies,
-            # we could remove all this logic. The rootish assignment logic would behave
-            # more or less the same as this, maybe without guaranteed round-robin though?
-            # This path is only reachable when `ts` doesn't have dependencies, but its
-            # group is also smaller than the cluster.
+            # TODO if `is_rootish` would always return True for tasks without
+            # dependencies, we could remove all this logic. The rootish assignment logic
+            # would behave more or less the same as this, maybe without guaranteed
+            # round-robin though? This path is only reachable when `ts` doesn't have
+            # dependencies, but its group is also smaller than the cluster.
 
             # Fastpath when there are no related tasks or restrictions
             worker_pool = self.idle or self.workers
 
@@ -2246,35 +2233,28 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
     def transition_waiting_processing(self, key, stimulus_id):
        """Possibly schedule a ready task.

This is the primary dispatch for ready tasks. - If there's no appropriate worker for the task (but the task is otherwise runnable), - it will be recommended to ``no-worker`` or ``queued``. + If there's no appropriate worker for the task (but the task is otherwise + runnable), it will be recommended to ``no-worker`` or ``queued``. """ - try: - ts: TaskState = self.tasks[key] - - if self.is_rootish(ts): - # NOTE: having two root-ish methods is temporary. When the feature flag is removed, - # there should only be one, which combines co-assignment and queuing. - # Eventually, special-casing root tasks might be removed entirely, with better heuristics. - if math.isinf(self.WORKER_SATURATION): - if not (ws := self.decide_worker_rootish_queuing_disabled(ts)): - return {ts.key: "no-worker"}, {}, {} - else: - if not (ws := self.decide_worker_rootish_queuing_enabled()): - return {ts.key: "queued"}, {}, {} - else: - if not (ws := self.decide_worker_non_rootish(ts)): - return {ts.key: "no-worker"}, {}, {} + ts: TaskState = self.tasks[key] - worker_msgs = self._add_to_processing(ts, ws) - return {}, {}, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + if self.is_rootish(ts): + # NOTE: having two root-ish methods is temporary. When the feature flag is + # removed, there should only be one, which combines co-assignment and + # queuing. Eventually, special-casing root tasks might be removed entirely, + # with better heuristics. + if math.isinf(self.WORKER_SATURATION): + if not (ws := self.decide_worker_rootish_queuing_disabled(ts)): + return {ts.key: "no-worker"}, {}, {} + else: + if not (ws := self.decide_worker_rootish_queuing_enabled()): + return {ts.key: "queued"}, {}, {} + else: + if not (ws := self.decide_worker_non_rootish(ts)): + return {ts.key: "no-worker"}, {}, {} - pdb.set_trace() - raise + worker_msgs = self._add_to_processing(ts, ws) + return {}, {}, worker_msgs def transition_waiting_memory( self, @@ -2295,22 +2275,14 @@ def transition_waiting_memory( scheduler has a chance to reach the worker. Shortly, the cancellation request will reach the worker, thus deleting the data from memory. """ - try: - ts = self.tasks[key] - - if self.validate: - assert not ts.processing_on - assert ts.waiting_on - assert ts.state == "waiting" + ts = self.tasks[key] - return {}, {}, {} - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + if self.validate: + assert not ts.processing_on + assert ts.waiting_on + assert ts.state == "waiting" - pdb.set_trace() - raise + return {}, {}, {} def transition_processing_memory( self, @@ -2327,313 +2299,266 @@ def transition_processing_memory( recommendations: dict = {} client_msgs: dict = {} worker_msgs: dict = {} - try: - ts = self.tasks[key] - - assert worker - assert isinstance(worker, str) - - if self.validate: - assert ts.processing_on - wss = ts.processing_on - assert wss - assert ts in wss.processing - del wss - assert not ts.waiting_on - assert not ts.who_has, (ts, ts.who_has) - assert not ts.exception_blame - assert ts.state == "processing" + ts = self.tasks[key] - ws = self.workers.get(worker) - if ws is None: - return {key: "released"}, {}, {} + assert worker + assert isinstance(worker, str) - if ws != ts.processing_on: # pragma: nocover - assert ts.processing_on - raise RuntimeError( - f"Task {ts.key!r} transitioned from processing to memory on worker " - f"{ws}, while it was expected from {ts.processing_on}. This should " - f"be impossible. 
{stimulus_id=}, story={self.story(ts)}" - ) + if self.validate: + assert ts.processing_on + wss = ts.processing_on + assert wss + assert ts in wss.processing + del wss + assert not ts.waiting_on + assert not ts.who_has, (ts, ts.who_has) + assert not ts.exception_blame + assert ts.state == "processing" - ############################# - # Update Timing Information # - ############################# - if startstops: - for startstop in startstops: - ts.group.add_duration( - stop=startstop["stop"], - start=startstop["start"], - action=startstop["action"], - ) + ws = self.workers.get(worker) + if ws is None: + return {key: "released"}, {}, {} - s: set = self.unknown_durations.pop(ts.prefix.name, set()) - tts: TaskState - steal = self.extensions.get("stealing") - if steal: - for tts in s: - if tts.processing_on: - steal.recalculate_cost(tts) - - ############################ - # Update State Information # - ############################ - if nbytes is not None: - ts.set_nbytes(nbytes) - - # NOTE: recommendations for queued tasks are added first, so they'll be popped last, - # allowing higher-priority downstream tasks to be transitioned first. - # FIXME: this would be incorrect if queued tasks are user-annotated as higher priority. - self._exit_processing_common(ts, recommendations) - - self._add_to_memory( - ts, ws, recommendations, client_msgs, type=type, typename=typename + if ws != ts.processing_on: # pragma: nocover + assert ts.processing_on + raise RuntimeError( + f"Task {ts.key!r} transitioned from processing to memory on worker " + f"{ws}, while it was expected from {ts.processing_on}. This should " + f"be impossible. {stimulus_id=}, story={self.story(ts)}" ) - if self.validate: - assert not ts.processing_on - assert not ts.waiting_on + ############################# + # Update Timing Information # + ############################# + if startstops: + for startstop in startstops: + ts.group.add_duration( + stop=startstop["stop"], + start=startstop["start"], + action=startstop["action"], + ) - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + s: set = self.unknown_durations.pop(ts.prefix.name, set()) + tts: TaskState + steal = self.extensions.get("stealing") + if steal: + for tts in s: + if tts.processing_on: + steal.recalculate_cost(tts) + + ############################ + # Update State Information # + ############################ + if nbytes is not None: + ts.set_nbytes(nbytes) + + # NOTE: recommendations for queued tasks are added first, so they'll be popped + # last, allowing higher-priority downstream tasks to be transitioned first. + # FIXME: this would be incorrect if queued tasks are user-annotated as higher + # priority. 
+ self._exit_processing_common(ts, recommendations) + + self._add_to_memory( + ts, ws, recommendations, client_msgs, type=type, typename=typename + ) - pdb.set_trace() - raise + if self.validate: + assert not ts.processing_on + assert not ts.waiting_on + + return recommendations, client_msgs, worker_msgs def transition_memory_released(self, key, stimulus_id, safe: bool = False): ws: WorkerState - try: - ts: TaskState = self.tasks[key] - dts: TaskState - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} + ts: TaskState = self.tasks[key] + dts: TaskState + recommendations: dict = {} + client_msgs: dict = {} + worker_msgs: dict = {} - if self.validate: - assert not ts.waiting_on - assert not ts.processing_on - if safe: - assert not ts.waiters - - if ts.actor: - for ws in ts.who_has: - ws.actors.discard(ts) - if ts.who_wants: - ts.exception_blame = ts - ts.exception = "Worker holding Actor was lost" - recommendations[ts.key] = "erred" - return ( - recommendations, - client_msgs, - worker_msgs, - ) # don't try to recreate - - # XXX factor this out? - worker_msg = { - "op": "free-keys", - "keys": [key], - "stimulus_id": stimulus_id, - } - for ws in ts.who_has: - worker_msgs[ws.address] = [worker_msg] - self.remove_all_replicas(ts) + if self.validate: + assert not ts.waiting_on + assert not ts.processing_on + if safe: + assert not ts.waiters - ts.state = "released" + if ts.actor: + for ws in ts.who_has: + ws.actors.discard(ts) + if ts.who_wants: + ts.exception_blame = ts + ts.exception = "Worker holding Actor was lost" + recommendations[ts.key] = "erred" + return ( + recommendations, + client_msgs, + worker_msgs, + ) # don't try to recreate + + # XXX factor this out? + worker_msg = { + "op": "free-keys", + "keys": [key], + "stimulus_id": stimulus_id, + } + for ws in ts.who_has: + worker_msgs[ws.address] = [worker_msg] + self.remove_all_replicas(ts) - report_msg = {"op": "lost-data", "key": key} - cs: ClientState - for cs in ts.who_wants: - client_msgs[cs.client_key] = [report_msg] + ts.state = "released" - if not ts.run_spec: # pure data - recommendations[key] = "forgotten" - elif ts.has_lost_dependencies: - recommendations[key] = "forgotten" - elif ts.who_wants or ts.waiters: - recommendations[key] = "waiting" + report_msg = {"op": "lost-data", "key": key} + cs: ClientState + for cs in ts.who_wants: + client_msgs[cs.client_key] = [report_msg] - for dts in ts.waiters: - if dts.state in ("no-worker", "processing"): - recommendations[dts.key] = "waiting" - elif dts.state == "waiting": - dts.waiting_on.add(ts) + if not ts.run_spec: # pure data + recommendations[key] = "forgotten" + elif ts.has_lost_dependencies: + recommendations[key] = "forgotten" + elif ts.who_wants or ts.waiters: + recommendations[key] = "waiting" - if self.validate: - assert not ts.waiting_on + for dts in ts.waiters: + if dts.state in ("no-worker", "processing"): + recommendations[dts.key] = "waiting" + elif dts.state == "waiting": + dts.waiting_on.add(ts) - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + if self.validate: + assert not ts.waiting_on - pdb.set_trace() - raise + return recommendations, client_msgs, worker_msgs def transition_released_erred(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - dts: TaskState - failing_ts: TaskState - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} + ts: TaskState = self.tasks[key] + dts: TaskState + failing_ts: TaskState + 
recommendations: dict = {} + client_msgs: dict = {} + worker_msgs: dict = {} - if self.validate: - with log_errors(pdb=LOG_PDB): - assert ts.exception_blame - assert not ts.who_has - assert not ts.waiting_on - assert not ts.waiters - - failing_ts = ts.exception_blame - - for dts in ts.dependents: - dts.exception_blame = failing_ts - if not dts.who_has: - recommendations[dts.key] = "erred" - - report_msg = { - "op": "task-erred", - "key": key, - "exception": failing_ts.exception, - "traceback": failing_ts.traceback, - } - cs: ClientState - for cs in ts.who_wants: - client_msgs[cs.client_key] = [report_msg] + if self.validate: + with log_errors(pdb=LOG_PDB): + assert ts.exception_blame + assert not ts.who_has + assert not ts.waiting_on + assert not ts.waiters - ts.state = "erred" + failing_ts = ts.exception_blame - # TODO: waiting data? - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + for dts in ts.dependents: + dts.exception_blame = failing_ts + if not dts.who_has: + recommendations[dts.key] = "erred" - pdb.set_trace() - raise + report_msg = { + "op": "task-erred", + "key": key, + "exception": failing_ts.exception, + "traceback": failing_ts.traceback, + } + cs: ClientState + for cs in ts.who_wants: + client_msgs[cs.client_key] = [report_msg] + + ts.state = "erred" + + # TODO: waiting data? + return recommendations, client_msgs, worker_msgs def transition_erred_released(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - dts: TaskState - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} + ts: TaskState = self.tasks[key] + dts: TaskState + recommendations: dict = {} + client_msgs: dict = {} + worker_msgs: dict = {} - if self.validate: - with log_errors(pdb=LOG_PDB): - assert ts.exception_blame - assert not ts.who_has - assert not ts.waiting_on - assert not ts.waiters - - ts.exception = None - ts.exception_blame = None - ts.traceback = None - - for dts in ts.dependents: - if dts.state == "erred": - recommendations[dts.key] = "waiting" - - w_msg = { - "op": "free-keys", - "keys": [key], - "stimulus_id": stimulus_id, - } - for ws_addr in ts.erred_on: - worker_msgs[ws_addr] = [w_msg] - ts.erred_on.clear() + if self.validate: + with log_errors(pdb=LOG_PDB): + assert ts.exception_blame + assert not ts.who_has + assert not ts.waiting_on + assert not ts.waiters - report_msg = {"op": "task-retried", "key": key} - cs: ClientState - for cs in ts.who_wants: - client_msgs[cs.client_key] = [report_msg] + ts.exception = None + ts.exception_blame = None + ts.traceback = None - ts.state = "released" + for dts in ts.dependents: + if dts.state == "erred": + recommendations[dts.key] = "waiting" - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + w_msg = { + "op": "free-keys", + "keys": [key], + "stimulus_id": stimulus_id, + } + for ws_addr in ts.erred_on: + worker_msgs[ws_addr] = [w_msg] + ts.erred_on.clear() - pdb.set_trace() - raise + report_msg = {"op": "task-retried", "key": key} + cs: ClientState + for cs in ts.who_wants: + client_msgs[cs.client_key] = [report_msg] - def transition_waiting_released(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} + ts.state = "released" - if self.validate: - assert not ts.who_has - assert not ts.processing_on + return recommendations, client_msgs, worker_msgs - dts: TaskState - for dts 
in ts.dependencies: - if ts in dts.waiters: - dts.waiters.discard(ts) - if not dts.waiters and not dts.who_wants: - recommendations[dts.key] = "released" - ts.waiting_on.clear() + def transition_waiting_released(self, key, stimulus_id): + ts: TaskState = self.tasks[key] + recommendations: dict = {} + client_msgs: dict = {} + worker_msgs: dict = {} - ts.state = "released" + if self.validate: + assert not ts.who_has + assert not ts.processing_on - if ts.has_lost_dependencies: - recommendations[key] = "forgotten" - elif not ts.exception_blame and (ts.who_wants or ts.waiters): - recommendations[key] = "waiting" - else: - ts.waiters.clear() + dts: TaskState + for dts in ts.dependencies: + if ts in dts.waiters: + dts.waiters.discard(ts) + if not dts.waiters and not dts.who_wants: + recommendations[dts.key] = "released" + ts.waiting_on.clear() - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + ts.state = "released" - pdb.set_trace() - raise + if ts.has_lost_dependencies: + recommendations[key] = "forgotten" + elif not ts.exception_blame and (ts.who_wants or ts.waiters): + recommendations[key] = "waiting" + else: + ts.waiters.clear() - def transition_processing_released(self, key: str, stimulus_id: str): - try: - ts = self.tasks[key] - recommendations: Recs = {} - worker_msgs = {} + return recommendations, client_msgs, worker_msgs - if self.validate: - assert ts.processing_on - assert not ts.who_has - assert not ts.waiting_on - assert ts.state == "processing" + def transition_processing_released(self, key: str, stimulus_id: str): + ts = self.tasks[key] + recommendations: Recs = {} + worker_msgs = {} - ws = self._exit_processing_common(ts, recommendations) - if ws: - worker_msgs[ws.address] = [ - { - "op": "free-keys", - "keys": [key], - "stimulus_id": stimulus_id, - } - ] + if self.validate: + assert ts.processing_on + assert not ts.who_has + assert not ts.waiting_on + assert ts.state == "processing" - self._propagate_released(ts, recommendations) - return recommendations, {}, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + ws = self._exit_processing_common(ts, recommendations) + if ws: + worker_msgs[ws.address] = [ + { + "op": "free-keys", + "keys": [key], + "stimulus_id": stimulus_id, + } + ] - pdb.set_trace() - raise + self._propagate_released(ts, recommendations) + return recommendations, {}, worker_msgs def transition_processing_erred( self, @@ -2656,7 +2581,8 @@ def transition_processing_erred( stimulus_id ID of the stimulus causing the transition worker - Address of the worker where the task erred. Not necessarily ``ts.processing_on``. + Address of the worker where the task erred. + Not necessarily ``ts.processing_on``. 
cause Address of the task that caused this task to be transitioned to erred exception @@ -2673,215 +2599,167 @@ def transition_processing_erred( ------- Recommendations, client messages and worker messages to process """ - try: - ts: TaskState = self.tasks[key] - dts: TaskState - failing_ts: TaskState - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} + ts: TaskState = self.tasks[key] + dts: TaskState + failing_ts: TaskState + recommendations: dict = {} + client_msgs: dict = {} + worker_msgs: dict = {} - if self.validate: - assert cause or ts.exception_blame - assert ts.processing_on - assert not ts.who_has - assert not ts.waiting_on + if self.validate: + assert cause or ts.exception_blame + assert ts.processing_on + assert not ts.who_has + assert not ts.waiting_on - if ts.actor: - assert ts.processing_on - ws = ts.processing_on - ws.actors.remove(ts) - - self._exit_processing_common(ts, recommendations) - - ts.erred_on.add(worker) - if exception is not None: - ts.exception = exception - ts.exception_text = exception_text # type: ignore - if traceback is not None: - ts.traceback = traceback - ts.traceback_text = traceback_text # type: ignore - if cause is not None: - failing_ts = self.tasks[cause] - ts.exception_blame = failing_ts - else: - failing_ts = ts.exception_blame # type: ignore - - self.erred_tasks.appendleft( - ErredTask( - ts.key, - time(), - ts.erred_on.copy(), - exception_text or "", - traceback_text or "", - ) + if ts.actor: + assert ts.processing_on + ws = ts.processing_on + ws.actors.remove(ts) + + self._exit_processing_common(ts, recommendations) + + ts.erred_on.add(worker) + if exception is not None: + ts.exception = exception + ts.exception_text = exception_text # type: ignore + if traceback is not None: + ts.traceback = traceback + ts.traceback_text = traceback_text # type: ignore + if cause is not None: + failing_ts = self.tasks[cause] + ts.exception_blame = failing_ts + else: + failing_ts = ts.exception_blame # type: ignore + + self.erred_tasks.appendleft( + ErredTask( + ts.key, + time(), + ts.erred_on.copy(), + exception_text or "", + traceback_text or "", ) + ) - for dts in ts.dependents: - dts.exception_blame = failing_ts - recommendations[dts.key] = "erred" - - for dts in ts.dependencies: - dts.waiters.discard(ts) - if not dts.waiters and not dts.who_wants: - recommendations[dts.key] = "released" + for dts in ts.dependents: + dts.exception_blame = failing_ts + recommendations[dts.key] = "erred" - ts.waiters.clear() # do anything with this? + for dts in ts.dependencies: + dts.waiters.discard(ts) + if not dts.waiters and not dts.who_wants: + recommendations[dts.key] = "released" - ts.state = "erred" + ts.waiters.clear() # do anything with this? 
- report_msg = { - "op": "task-erred", - "key": key, - "exception": failing_ts.exception, - "traceback": failing_ts.traceback, - } - cs: ClientState - for cs in ts.who_wants: - client_msgs[cs.client_key] = [report_msg] + ts.state = "erred" - cs = self.clients["fire-and-forget"] - if ts in cs.wants_what: - self._client_releases_keys( - cs=cs, - keys=[key], - recommendations=recommendations, - ) + report_msg = { + "op": "task-erred", + "key": key, + "exception": failing_ts.exception, + "traceback": failing_ts.traceback, + } + cs: ClientState + for cs in ts.who_wants: + client_msgs[cs.client_key] = [report_msg] - if self.validate: - assert not ts.processing_on + cs = self.clients["fire-and-forget"] + if ts in cs.wants_what: + self._client_releases_keys( + cs=cs, + keys=[key], + recommendations=recommendations, + ) - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + if self.validate: + assert not ts.processing_on - pdb.set_trace() - raise + return recommendations, client_msgs, worker_msgs def transition_no_worker_released(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - dts: TaskState - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} - - if self.validate: - assert self.tasks[key].state == "no-worker" - assert not ts.who_has - assert not ts.waiting_on + ts: TaskState = self.tasks[key] + dts: TaskState + recommendations: dict = {} + client_msgs: dict = {} + worker_msgs: dict = {} - self.unrunnable.remove(ts) - ts.state = "released" + if self.validate: + assert self.tasks[key].state == "no-worker" + assert not ts.who_has + assert not ts.waiting_on - for dts in ts.dependencies: - dts.waiters.discard(ts) + self.unrunnable.remove(ts) + ts.state = "released" - ts.waiters.clear() + for dts in ts.dependencies: + dts.waiters.discard(ts) - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + ts.waiters.clear() - pdb.set_trace() - raise + return recommendations, client_msgs, worker_msgs def transition_waiting_queued(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - recommendations: Recs = {} - client_msgs: dict = {} - worker_msgs: dict = {} - - if self.validate: - assert not self.idle_task_count, (ts, self.idle_task_count) - self._validate_ready(ts) + ts: TaskState = self.tasks[key] + recommendations: Recs = {} + client_msgs: dict = {} + worker_msgs: dict = {} - ts.state = "queued" - self.queued.add(ts) + if self.validate: + assert not self.idle_task_count, (ts, self.idle_task_count) + self._validate_ready(ts) - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + ts.state = "queued" + self.queued.add(ts) - pdb.set_trace() - raise + return recommendations, client_msgs, worker_msgs def transition_waiting_no_worker(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - recommendations: Recs = {} - client_msgs: dict = {} - worker_msgs: dict = {} - - if self.validate: - self._validate_ready(ts) + ts: TaskState = self.tasks[key] + recommendations: Recs = {} + client_msgs: dict = {} + worker_msgs: dict = {} - ts.state = "no-worker" - self.unrunnable.add(ts) + if self.validate: + self._validate_ready(ts) - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + ts.state = "no-worker" + self.unrunnable.add(ts) - pdb.set_trace() - raise + return 
recommendations, client_msgs, worker_msgs def transition_queued_released(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - recommendations: Recs = {} - client_msgs: dict = {} - worker_msgs: dict = {} - - if self.validate: - assert ts in self.queued - assert not ts.processing_on + ts: TaskState = self.tasks[key] + recommendations: Recs = {} + client_msgs: dict = {} + worker_msgs: dict = {} - self.queued.remove(ts) + if self.validate: + assert ts in self.queued + assert not ts.processing_on - self._propagate_released(ts, recommendations) - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + self.queued.remove(ts) - pdb.set_trace() - raise + self._propagate_released(ts, recommendations) + return recommendations, client_msgs, worker_msgs def transition_queued_processing(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - recommendations: Recs = {} - client_msgs: dict = {} - worker_msgs: dict = {} - - if self.validate: - assert not ts.actor, f"Actors can't be queued: {ts}" - assert ts in self.queued + ts: TaskState = self.tasks[key] + recommendations: Recs = {} + client_msgs: dict = {} + worker_msgs: dict = {} - if ws := self.decide_worker_rootish_queuing_enabled(): - self.queued.discard(ts) - worker_msgs = self._add_to_processing(ts, ws) - # If no worker, task just stays `queued` + if self.validate: + assert not ts.actor, f"Actors can't be queued: {ts}" + assert ts in self.queued - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + if ws := self.decide_worker_rootish_queuing_enabled(): + self.queued.discard(ts) + worker_msgs = self._add_to_processing(ts, ws) + # If no worker, task just stays `queued` - pdb.set_trace() - raise + return recommendations, client_msgs, worker_msgs def _remove_key(self, key): ts: TaskState = self.tasks.pop(key) @@ -2897,84 +2775,68 @@ def _remove_key(self, key): def transition_memory_forgotten(self, key, stimulus_id): ws: WorkerState - try: - ts: TaskState = self.tasks[key] - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} - - if self.validate: - assert ts.state == "memory" - assert not ts.processing_on - assert not ts.waiting_on - if not ts.run_spec: - # It's ok to forget a pure data task - pass - elif ts.has_lost_dependencies: - # It's ok to forget a task with forgotten dependencies - pass - elif not ts.who_wants and not ts.waiters and not ts.dependents: - # It's ok to forget a task that nobody needs - pass - else: - assert 0, (ts,) + ts: TaskState = self.tasks[key] + recommendations: dict = {} + client_msgs: dict = {} + worker_msgs: dict = {} - if ts.actor: - for ws in ts.who_has: - ws.actors.discard(ts) + if self.validate: + assert ts.state == "memory" + assert not ts.processing_on + assert not ts.waiting_on + if not ts.run_spec: + # It's ok to forget a pure data task + pass + elif ts.has_lost_dependencies: + # It's ok to forget a task with forgotten dependencies + pass + elif not ts.who_wants and not ts.waiters and not ts.dependents: + # It's ok to forget a task that nobody needs + pass + else: + assert 0, (ts,) - self._propagate_forgotten(ts, recommendations, worker_msgs, stimulus_id) + if ts.actor: + for ws in ts.who_has: + ws.actors.discard(ts) - client_msgs = _task_to_client_msgs(ts) - self._remove_key(key) + self._propagate_forgotten(ts, recommendations, worker_msgs, stimulus_id) - return recommendations, client_msgs, worker_msgs - except Exception 
as e: - logger.exception(e) - if LOG_PDB: - import pdb + client_msgs = _task_to_client_msgs(ts) + self._remove_key(key) - pdb.set_trace() - raise + return recommendations, client_msgs, worker_msgs def transition_released_forgotten(self, key, stimulus_id): - try: - ts: TaskState = self.tasks[key] - recommendations: dict = {} - client_msgs: dict = {} - worker_msgs: dict = {} - - if self.validate: - assert ts.state in ("released", "erred") - assert not ts.who_has - assert not ts.processing_on - assert ts not in self.queued - assert not ts.waiting_on, (ts, ts.waiting_on) - if not ts.run_spec: - # It's ok to forget a pure data task - pass - elif ts.has_lost_dependencies: - # It's ok to forget a task with forgotten dependencies - pass - elif not ts.who_wants and not ts.waiters and not ts.dependents: - # It's ok to forget a task that nobody needs - pass - else: - assert 0, (ts,) + ts: TaskState = self.tasks[key] + recommendations: dict = {} + client_msgs: dict = {} + worker_msgs: dict = {} - self._propagate_forgotten(ts, recommendations, worker_msgs, stimulus_id) + if self.validate: + assert ts.state in ("released", "erred") + assert not ts.who_has + assert not ts.processing_on + assert ts not in self.queued + assert not ts.waiting_on, (ts, ts.waiting_on) + if not ts.run_spec: + # It's ok to forget a pure data task + pass + elif ts.has_lost_dependencies: + # It's ok to forget a task with forgotten dependencies + pass + elif not ts.who_wants and not ts.waiters and not ts.dependents: + # It's ok to forget a task that nobody needs + pass + else: + raise AssertionError("Unreachable", str(ts)) # pragma: nocover - client_msgs = _task_to_client_msgs(ts) - self._remove_key(key) + self._propagate_forgotten(ts, recommendations, worker_msgs, stimulus_id) - return recommendations, client_msgs, worker_msgs - except Exception as e: - logger.exception(e) - if LOG_PDB: - import pdb + client_msgs = _task_to_client_msgs(ts) + self._remove_key(key) - pdb.set_trace() - raise + return recommendations, client_msgs, worker_msgs # { # (start, finish):
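
With the per-method try/except blocks gone, the logging and pdb boilerplate that
every transition method used to repeat can live in a single place: the dispatcher
that invokes the transition. A minimal sketch of that pattern follows. It is an
illustration, not code from this patch: `run_transition` and its `handler`
argument are hypothetical stand-ins for the scheduler's dispatch machinery, and
the module-level `LOG_PDB` flag here imitates the one distributed reads from the
`distributed.admin.pdb-on-err` config option.

import logging

logger = logging.getLogger("distributed.scheduler")

# Assumption: stands in for the real module-level flag, which distributed reads
# from the `distributed.admin.pdb-on-err` dask config option.
LOG_PDB = False


def run_transition(handler, key, stimulus_id, **kwargs):
    """Run one transition handler, applying the removed boilerplate exactly once.

    `handler` is any of the transition_*_* methods above: they now raise freely,
    so the error log and the optional pdb hook happen here instead of being
    duplicated in every method.
    """
    try:
        return handler(key, stimulus_id, **kwargs)
    except Exception:
        logger.exception("Error transitioning %r (stimulus_id=%r)", key, stimulus_id)
        if LOG_PDB:
            import pdb

            pdb.set_trace()
        raise

Called as, e.g., run_transition(self.transition_released_waiting, key, stimulus_id),
this preserves the old debugging behaviour while keeping each transition method
free of error-handling noise.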