Fix critical race condition in graceful shutdown #8522

Merged: 9 commits, Feb 23, 2024

Changes from 1 commit
9 changes: 4 additions & 5 deletions distributed/scheduler.py
@@ -7091,12 +7091,11 @@ async def retire_workers(
             If neither ``workers`` nor ``names`` are provided, we call
             ``workers_to_close`` which finds a good set.
         close_workers: bool (defaults to False)
-            Whether or not to actually close the worker explicitly from here.
-            Otherwise we expect some external job scheduler to finish off the
-            worker.
+            Whether to actually close the worker explicitly from here.
+            Otherwise, we expect some external job scheduler to finish off the worker.
         remove: bool (defaults to True)
-            Whether or not to remove the worker metadata immediately or else
-            wait for the worker to contact us.
+            Whether to remove the worker metadata immediately or else wait for the
+            worker to contact us.

             If close_workers=False and remove=False, this method just flushes the tasks
             in memory out of the workers and then returns.
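For context, here is a minimal, hypothetical sketch of the downscaling pattern these flags support (the names `downscale_one`, `scheduler`, `worker_address`, and `external_teardown` are illustrative, not part of this PR): the cluster manager retires the worker without closing it, then lets an external system tear the process down.

    # Illustrative sketch only; names are placeholders, not from the PR.
    async def downscale_one(scheduler, worker_address, external_teardown):
        # Move in-memory tasks off the worker and drop it from the scheduler's
        # metadata, but do not close the worker process from here.
        await scheduler.retire_workers(
            [worker_address], close_workers=False, remove=True
        )
        # Some external job scheduler is now expected to finish off the worker
        # (and its nanny).
        await external_teardown(worker_address)

With close_workers=False and remove=True the scheduler forgets the worker right away, which is exactly the state the new test below exercises.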
30 changes: 30 additions & 0 deletions distributed/tests/test_nanny.py
@@ -956,3 +956,33 @@ async def test_nanny_plugin_register_nanny_killed(c, s, restart):
    finally:
        proc.kill()
    assert await register == {}


@pytest.mark.slow
@gen_cluster(
    client=True,
    Worker=Nanny,
    nthreads=[("", 1)],
    worker_kwargs={"heartbeat_interval": "10ms"},
)
async def test_nanny_does_not_restart_worker_on_graceful_retirement(c, s, a):
    """Tests https://github.com/dask/distributed/pull/8522

    Some clusters (e.g. SpecCluster) implement downscaling by calling
    `Scheduler.retire_workers()` without arguments, which defaults to
    `remove=True, close_workers=False`, and then use an external system to tear
    down the worker and the nanny. In these cases, make sure that the worker
    doesn't kill itself and that the nanny doesn't restart it after the
    heartbeat to the scheduler fails.
    """
    await s.retire_workers([a.worker_address], stimulus_id="test")
    # On Linux, it takes ~3.5s for the nanny to resuscitate a worker
    await asyncio.sleep(5)
Review thread on the `await asyncio.sleep(5)` line:

Member:

This will either be missed entirely on GitHub Actions or it will make the test very flaky. If you are waiting for some condition to occur, please wait until that condition has occurred; just having a plain sleep in here is not sufficient. Besides, this also makes the test logic much harder to understand.

Collaborator Author:

> This will either be missed entirely on GitHub Actions

Yes, getting a false negative on GH Actions is a possibility.

> or will make the test very flaky.

No, because if GH is slower than my local machine, the test will simply not exercise the use case properly and return a false negative.

> If you are waiting for some condition to occur,

No, I am waiting for a condition NOT to occur.

I will rewrite the unit test to check for the heartbeat stop. It will no longer verify that the heartbeat does not call close().

    assert not s.workers
    events = [
        ev
        for _, ev in s.events["all"]
        if isinstance(ev, dict) and ev.get("action") == "add-worker"
    ]
    assert len(events) == 1
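In the spirit of the reviewer's request above, waiting on an explicit condition rather than sleeping for a fixed interval could be done with a small polling helper along these lines; `wait_until` and the usage shown are an illustrative sketch, not code from the PR.

    # Illustrative polling helper; not part of the PR.
    import asyncio
    from time import monotonic


    async def wait_until(predicate, timeout=5, period=0.05):
        """Poll ``predicate`` until it returns True or ``timeout`` seconds elapse."""
        deadline = monotonic() + timeout
        while not predicate():
            if monotonic() > deadline:
                raise TimeoutError("condition was not met in time")
            await asyncio.sleep(period)


    # Hypothetical usage inside the test, instead of the bare asyncio.sleep(5):
    #     await wait_until(lambda: a.worker_address not in s.workers)

Waiting for something not to happen, as this test does, still needs some grace period, which is why the author proposed checking that the heartbeat callback is stopped instead.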
26 changes: 19 additions & 7 deletions distributed/worker.py
@@ -1272,12 +1272,24 @@
             self._update_latency(end - start)

             if response["status"] == "missing":
-                # Scheduler thought we left. Reconnection is not supported, so just shut down.
-                logger.error(
-                    f"Scheduler was unaware of this worker {self.address!r}. Shutting down."
-                )
-                # Something is out of sync; have the nanny restart us if possible.
-                await self.close(nanny=False)
Review thread on removed lines 1279-1280:

Member:

If I read your description correctly, this is the bug. What does it even mean that "something is out of sync"? In which cases would we want this worker to be restarted like this?

The heartbeats only start once the worker is registered to the scheduler, see

    await self._register_with_scheduler()
    self.start_periodic_callbacks()

I don't see what kind of "desync" would justify a restart, and adding more complexity to this logic feels like trouble. Your test also passes if we just shut down the nanny as well.

Collaborator Author:

The test is designed to verify that the worker is not accidentally restarted after it's retired. If something kills off the worker and the nanny, it will not perturb the test.

> In which cases would we want this worker to be restarted like this?

I cannot come up with use cases. I'll remove the branch and see if anything breaks.

+                # Scheduler thought we left.
+                # Reconnection is not supported, so just shut down.
+
+                if self.status == Status.closing_gracefully:
+                    # Called Scheduler.retire_workers(remove=True, close_workers=False)
+                    # The worker will remain indefinitely in this state, unknown to the
+                    # scheduler, until something else shuts it down.
+                    # Stopping the heartbeat is just a nice-to-have to reduce
+                    # unnecessary warnings on the scheduler log.
+                    logger.info("Stopping heartbeat to the scheduler")
+                    self.periodic_callbacks["heartbeat"].stop()
+                else:
+                    logger.error(
+                        f"Scheduler was unaware of this worker {self.address!r}. "
+                        "Shutting down."
+                    )
+                    # Have the nanny restart us if possible
+                    await self.close(nanny=False, reason="worker-heartbeat-missing")
                 return

             self.scheduler_delay = response["time"] - middle

@@ -1290,7 +1302,7 @@
logger.exception("Failed to communicate with scheduler during heartbeat.")
except Exception:
logger.exception("Unexpected exception during heartbeat. Closing worker.")
await self.close()
fjetter marked this conversation as resolved.
Show resolved Hide resolved
await self.close(reason="worker-heartbeat-error")

[Codecov (codecov/patch) annotation: added line distributed/worker.py#L1305 was not covered by tests.]
             raise

     @fail_hard