From 67bb4bf80e29fe99bc7e98b8d8b1aebe515398ff Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Thu, 22 Feb 2024 15:08:37 +0000
Subject: [PATCH] Race condition on graceful shutdown

---
 distributed/scheduler.py |  9 ++++-----
 distributed/worker.py    | 39 ++++++++++++++++++++++++++++++++-------
 2 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index bebfbc2109..8bf58a3a03 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -7091,12 +7091,11 @@ async def retire_workers(
             If neither ``workers`` nor ``names`` are provided, we call
             ``workers_to_close`` which finds a good set.
         close_workers: bool (defaults to False)
-            Whether or not to actually close the worker explicitly from here.
-            Otherwise we expect some external job scheduler to finish off the
-            worker.
+            Whether to actually close the worker explicitly from here.
+            Otherwise, we expect some external job scheduler to finish off the worker.
         remove: bool (defaults to True)
-            Whether or not to remove the worker metadata immediately or else
-            wait for the worker to contact us.
+            Whether to remove the worker metadata immediately or else wait for the
+            worker to contact us.
 
             If close_workers=False and remove=False, this method just flushes the tasks
             in memory out of the workers and then returns.
diff --git a/distributed/worker.py b/distributed/worker.py
index 55dd5a7724..f4c1226557 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1272,12 +1272,37 @@ async def heartbeat(self) -> None:
             self._update_latency(end - start)
 
             if response["status"] == "missing":
-                # Scheduler thought we left. Reconnection is not supported, so just shut down.
-                logger.error(
-                    f"Scheduler was unaware of this worker {self.address!r}. Shutting down."
-                )
-                # Something is out of sync; have the nanny restart us if possible.
-                await self.close(nanny=False)
+                # Scheduler thought we left.
+                # Reconnection is not supported, so just shut down.
+
+                if self.status == Status.closing_gracefully:
+                    # There is a race condition during graceful shutdown where the
+                    # Scheduler has already called remove_worker but the Nanny has not
+                    # yet received instruction to shut down the worker. This could lead
+                    # to a situation where the worker over-eagerly shuts itself down and
+                    # then the nanny restarts it.
+                    #
+                    # Below we're calling close(nanny=True), but that's just for the
+                    # sake of safety if anything happens to the Scheduler->Nanny comms
+                    # that doesn't also affect the Worker->Nanny comms and that for some
+                    # reason let the Worker->Scheduler->Worker round-trip of the
+                    # heartbeat go through.
+                    # If we don't account for this edge case, here we could just return.
+                    logger.info(
+                        f"Worker {self.address!r} is still retiring, but the scheduler "
+                        "has already forgotten it. Forcing shutdown."
+                    )
+                    await self.close(
+                        nanny=True,
+                        reason="worker-heartbeat-missing-while-closing-gracefully",
+                    )
+                else:
+                    logger.error(
+                        f"Scheduler was unaware of this worker {self.address!r}. "
+                        "Shutting down."
+                    )
+                    # Have the nanny restart us if possible
+                    await self.close(nanny=False, reason="worker-heartbeat-missing")
                 return
 
             self.scheduler_delay = response["time"] - middle
@@ -1290,7 +1315,7 @@ async def heartbeat(self) -> None:
             logger.exception("Failed to communicate with scheduler during heartbeat.")
         except Exception:
             logger.exception("Unexpected exception during heartbeat. Closing worker.")
-            await self.close()
+            await self.close(reason="worker-heartbeat-error")
             raise
 
     @fail_hard