Race condition on graceful shutdown
crusaderky committed Feb 22, 2024
1 parent e6fe6f2 commit 67bb4bf
Showing 2 changed files with 36 additions and 12 deletions.
distributed/scheduler.py (9 changes: 4 additions & 5 deletions)

@@ -7091,12 +7091,11 @@ async def retire_workers(
             If neither ``workers`` nor ``names`` are provided, we call
             ``workers_to_close`` which finds a good set.
         close_workers: bool (defaults to False)
-            Whether or not to actually close the worker explicitly from here.
-            Otherwise we expect some external job scheduler to finish off the
-            worker.
+            Whether to actually close the worker explicitly from here.
+            Otherwise, we expect some external job scheduler to finish off the worker.
         remove: bool (defaults to True)
-            Whether or not to remove the worker metadata immediately or else
-            wait for the worker to contact us.
+            Whether to remove the worker metadata immediately or else wait for the
+            worker to contact us.
             If close_workers=False and remove=False, this method just flushes the tasks
             in memory out of the workers and then returns.
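For context, here is a minimal usage sketch of the two flags documented above, going through the public Client.retire_workers API, which forwards these keyword arguments to Scheduler.retire_workers. The scheduler and worker addresses are made up, and the snippet is only an illustration of how close_workers and remove combine, not part of this commit.

    # Hypothetical sketch: retire two workers but leave both the processes and
    # their metadata alone, so the scheduler only flushes the tasks they hold
    # in memory onto other workers. Addresses below are made up.
    from distributed import Client

    client = Client("tcp://scheduler-host:8786")  # assumed running cluster

    client.retire_workers(
        workers=["tcp://10.0.0.1:40001", "tcp://10.0.0.2:40002"],
        close_workers=False,  # let an external job scheduler stop the processes
        remove=False,         # keep worker metadata until the workers contact us
    )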
distributed/worker.py (39 changes: 32 additions & 7 deletions)

@@ -1272,12 +1272,37 @@ async def heartbeat(self) -> None:
             self._update_latency(end - start)

             if response["status"] == "missing":
-                # Scheduler thought we left. Reconnection is not supported, so just shut down.
-                logger.error(
-                    f"Scheduler was unaware of this worker {self.address!r}. Shutting down."
-                )
-                # Something is out of sync; have the nanny restart us if possible.
-                await self.close(nanny=False)
+                # Scheduler thought we left.
+                # Reconnection is not supported, so just shut down.
+
+                if self.status == Status.closing_gracefully:
+                    # There is a race condition during graceful shutdown where the
+                    # Scheduler has already called remove_worker but the Nanny has not
+                    # yet received instruction to shut down the worker. This could lead
+                    # to a situation where the worker over-eagerly shuts itself down and
+                    # then the nanny restarts it.
+                    #
+                    # Below we're calling close(nanny=True), but that's just for the
+                    # sake of safety, in case anything happens to the Scheduler->Nanny
+                    # comms that doesn't also affect the Worker->Nanny comms and for
+                    # some reason lets the Worker->Scheduler->Worker round-trip of
+                    # the heartbeat go through.
+                    # If we didn't account for this edge case, we could simply return here.
+                    logger.info(
+                        f"Worker {self.address!r} is still retiring, but the scheduler "
+                        "has already forgotten it. Forcing shutdown."
+                    )
+                    await self.close(
+                        nanny=True,
+                        reason="worker-heartbeat-missing-while-closing-gracefully",
+                    )
+                else:
+                    logger.error(
+                        f"Scheduler was unaware of this worker {self.address!r}. "
+                        "Shutting down."
+                    )
+                    # Have the nanny restart us if possible
+                    await self.close(nanny=False, reason="worker-heartbeat-missing")
                 return

             self.scheduler_delay = response["time"] - middle
@@ -1290,7 +1315,7 @@ async def heartbeat(self) -> None:
logger.exception("Failed to communicate with scheduler during heartbeat.")
except Exception:
logger.exception("Unexpected exception during heartbeat. Closing worker.")
await self.close()
await self.close(reason="worker-heartbeat-error")
raise

@fail_hard
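To summarize the behavioural change, here is a simplified sketch of the new branch on the heartbeat's "missing" response, distilled from the diff above. It is not the actual Worker method; handle_missing_heartbeat is a made-up helper name, and the worker argument stands in for a distributed.Worker instance.

    # Simplified illustration of the logic added above; not the real Worker code.
    from distributed.core import Status


    async def handle_missing_heartbeat(worker) -> None:
        """React to the scheduler reporting this worker as "missing"."""
        if worker.status == Status.closing_gracefully:
            # Graceful-shutdown race: the scheduler already ran remove_worker,
            # but the nanny hasn't been told to stop the worker yet. Close the
            # nanny too, so it cannot restart a worker the scheduler has
            # already forgotten.
            await worker.close(
                nanny=True,
                reason="worker-heartbeat-missing-while-closing-gracefully",
            )
        else:
            # Genuine desync: shut this worker down but leave the nanny
            # running, so it can start a fresh worker process.
            await worker.close(nanny=False, reason="worker-heartbeat-missing")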
