
Fix critical race condition in graceful shutdown #8522

Merged
merged 9 commits into dask:main from the heartbeat-close-gracefully branch on Feb 23, 2024

Conversation

@crusaderky (Collaborator) commented Feb 22, 2024

Executive summary

Coiled, as well as any other dask cluster that can dynamically shrink while there is data on the cluster, can accidentally lose vital data when scaling down. This will cause large parts of a computation to restart from scratch; scattered data will be lost forever, causing any computation that relies on it to fail.

Impact

Any cluster that calls SpecCluster.scale to shrink down while there is data on the workers is affected (a minimal sketch of the pattern follows this list); e.g.

  1. because the computation is finished, but the client didn't retrieve the output yet
  2. because the computation features an initial highly parallel phase followed by a long sequential "tail" that can't saturate the workers.
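A minimal sketch of the affected pattern (hypothetical example; LocalCluster is a SpecCluster subclass, so cluster.scale() exercises the same code path):

from distributed import Client, LocalCluster, wait

def inc(x):
    return x + 1

if __name__ == "__main__":
    # LocalCluster subclasses SpecCluster, so scale() goes through SpecCluster.scale
    cluster = LocalCluster(n_workers=8)
    client = Client(cluster)

    futures = client.map(inc, range(1000))
    wait(futures)     # computation is finished, but the client hasn't gathered the results

    cluster.scale(1)  # shrink while task outputs are still held on the workers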

Use case

Coiled is a cloud cluster which, to my understanding, handles scaling down as follows:

  1. it receives instruction to scale down, either manually or from adaptive
  2. it chooses workers to shut down
  3. it calls Scheduler.retire_workers(addresses, close_workers=False, remove=True)
  4. this in turn triggers graceful worker retirement, a.k.a. AMM RetireWorkers, which changes the workers to status=closing_gracefully and copies over to other workers all unique data they contain. The recipient workers must have status=running.
  5. once a worker being retired no longer contains any unique data AMM RetireWorkers ends and Scheduler.retire_workers calls Scheduler.remove_worker(close=False)
  6. a bespoke SchedulerPlugin installed by the cloud manager, or any other kind of event listener on the scheduler, is informed that the worker has been removed from the scheduler (a minimal plugin sketch follows this list)
  7. the cloud manager notices that the worker is on the list that it itself asked to retire in step 3, so it shuts down the EC2/GCP/whatever instance running the worker.
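For illustration, here is a minimal sketch of the kind of event listener meant in step 6. This is not Coiled's actual implementation; the plugin name and the shutdown_instance callable are made up for the example:

from distributed.diagnostics.plugin import SchedulerPlugin

class InstanceReaper(SchedulerPlugin):
    """Hypothetical plugin: tear down the cloud instance of a retired worker."""

    def __init__(self, shutdown_instance):
        # shutdown_instance is a user-provided callable that terminates the
        # EC2/GCP/whatever instance hosting the given worker address (step 7)
        self.shutdown_instance = shutdown_instance
        self.retiring = set()  # addresses passed to Scheduler.retire_workers (step 3)

    def remove_worker(self, scheduler, worker, **kwargs):
        # The scheduler calls this hook when a worker is removed (step 6)
        if worker in self.retiring:
            self.retiring.discard(worker)
            self.shutdown_instance(worker)

# registered on the scheduler with e.g. scheduler.add_plugin(InstanceReaper(...))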

This PR fixes a critical race condition that happens if there's a few seconds delay between steps 6 and 7:

6.1 the worker, which is still in status=closing_gracefully, heartbeats the scheduler
6.2 the scheduler responds {status: missing}: "I don't know you and I have no memory of you".
6.3 the worker interprets this as an irrecoverable desync (e.g. the scheduler process may have been restarted), so it shuts itself down
6.4 the nanny notices that the worker process died and restarts it
6.5 a new worker from the same host connects to the scheduler, with status=running
6.6 this brand-new worker is noticed and immediately used by the Active Memory Manager (AMM), which in the meantime is evacuating data from other workers on the cluster (AMM reconsiders the list of recipients every 2 seconds)
6.7 AMM instructs the new worker process to gather some data from the other workers being retired
6.8 the new worker obliges and acquires a replica of the data
6.9 the other workers are removed from the cluster. The born-again worker is now the sole holder of important task outputs or, even worse, scattered data

...and now we go back to step 7 of the numbered list above. The cloud manager finally reacts to the old worker being removed from the cluster, and shuts down the EC2 instance where the new worker is holding precious data.
All computed tasks on it are lost and need to be recomputed.
All scattered data on it is lost and cannot be recovered without the client sending it again explicitly.
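For illustration, here is a paraphrased sketch of the pre-PR logic in Worker.heartbeat that drives steps 6.2-6.4 (simplified, not the verbatim distributed source):

async def heartbeat(self):
    response = await self.scheduler.heartbeat_worker(address=self.contact_address)
    if response["status"] == "missing":
        # 6.2: the scheduler has no record of this worker
        # 6.3: close the worker process but deliberately leave the nanny alive...
        await self.close(nanny=False)
        # 6.4: ...so the nanny notices the dead process and restarts it, and the
        # brand-new status=running worker becomes an AMM recipient (6.5-6.8)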

Aside

Note that the above process is not the only way to retire a worker.
One could alternatively call Scheduler.retire_workers(addresses, close_workers=True, remove=True). This would safely shut down the worker process and send the nanny to status=close_gracefully, which will prevent it from restarting the worker process.
I'm not too happy that there are multiple ways to shut down a worker; I guess this has to do with the possibility that one may not use a nanny at all and use a different, bespoke watchdog instead.

Reproducer

import coiled

@coiled.function()
def foo(x):
    import time
    time.sleep(30)
    return x+1


X = foo.map(range(2000))

The above code starts 344 workers and, once there are 2000 tasks in memory, shrinks down to 1 worker, moving all the task outputs (Nones) to the only survivor.
It never finishes because of the above race condition. Halfway through the cluster shrink-down, tasks in memory are lost and are shunted back to the queued state, which causes the cluster to grow again.

The issue disappears when the peak number of workers gets lower; this happens if you send either less than 2000 tasks or spend less than 30s per task. I could only reproduce it with cluster.adapt(minimum=1, maximum=500); for some reason I failed to reproduce it when manually scaling down with cluster.scale(1).

@ntabris
@jrbourbeau best effort for 2024.2.1

github-actions bot commented Feb 22, 2024

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

27 files ±0   27 suites ±0   9h 54m 53s ⏱️ +1m 1s
3 995 tests ±0   3 883 ✅ ±0   110 💤 ±0   2 ❌ ±0
50 241 runs +18   47 946 ✅ +24   2 293 💤 -6   2 ❌ ±0

For more details on these failures, see this check.

Results for commit d967330. Comparison against base commit 1211e79.

This pull request removes 1 test and adds 1 test. Note that renamed tests count towards both.
distributed.tests.test_worker ‑ test_heartbeat_missing_restarts
distributed.tests.test_worker ‑ test_heartbeat_missing_doesnt_restart

♻️ This comment has been updated with latest results.

@crusaderky force-pushed the heartbeat-close-gracefully branch from ca62f3a to 67bb4bf on February 22, 2024 15:08
@crusaderky (Collaborator, Author) commented Feb 22, 2024

I think I found the code that calls retire_workers:

await self.scheduler_comm.retire_workers(workers=list(to_close))

The above uses the default parameters close_workers=False, remove=True.

I think it's not great that there are two different places in distributed.deploy that call retire_workers, and that they differ from each other:

await self.scheduler.retire_workers(
    names=workers,
    remove=True,
    close_workers=True,
)

As explained in my previous post, however, without the certainty that everyone that's using a SpecCluster is also using nannies I'm not comfortable changing spec.py.

@crusaderky marked this pull request as ready for review on February 22, 2024 18:25
@crusaderky requested a review from fjetter as a code owner on February 22, 2024 18:25
@crusaderky force-pushed the heartbeat-close-gracefully branch 3 times, most recently from 2f69e25 to 3d53fec on February 22, 2024 18:52
@crusaderky force-pushed the heartbeat-close-gracefully branch from 3d53fec to 0e329dd on February 22, 2024 18:54
@crusaderky added labels on Feb 22, 2024: p1 (Affects a large population and inhibits work), adaptive (All things relating to adaptive scaling), bug (Something is broken)
Comment on lines -1279 to -1280
# Something is out of sync; have the nanny restart us if possible.
await self.close(nanny=False)
Member:
If I read your description correctly, this is the bug. What does it even mean that "something is out of sync"? In which cases would we want this worker to be restarted like this?

The heartbeats only start once the worker is registered to the scheduler, see

await self._register_with_scheduler()
self.start_periodic_callbacks()

I don't see what kind of "desync" would justify a restart and adding more complexity to this logic feels like trouble. Your test also passes if we just shut down the nanny as well.

Collaborator Author:
The test is designed to verify that the worker is not accidentally restarted after it's retired. If something kills off the worker and the nanny, it will not perturb the test.

In which cases would we want this worker to be restarted like this?

I cannot come up with use cases. I'll remove the branch and see if anything breaks.

Comment on lines 980 to 981
# On Linux, it takes ~3.5s for the nanny to resuscitate a worker
await asyncio.sleep(5)
Member:
This will either be missed entirely on GitHub Actions or will make the test very flaky. If you are waiting for some condition to occur, please wait until that condition has occurred; just having a plain sleep in here is not sufficient. Besides, this also makes it much harder to understand the test logic.
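For example, such a wait could be a small polling loop (a hypothetical sketch; distributed's own test utilities may differ):

import asyncio
from time import monotonic

async def poll_for(condition, timeout=5.0, interval=0.05):
    """Poll condition() until it returns True, or fail after timeout seconds."""
    deadline = monotonic() + timeout
    while not condition():
        if monotonic() > deadline:
            raise TimeoutError("condition was not met in time")
        await asyncio.sleep(interval)

# usage, e.g.: await poll_for(lambda: worker.status == Status.closed)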

Collaborator Author:
This will either be missed entirely on GitHub Actions

Yes, getting a false negative on GH actions is a possibility.

or will make the test very flaky.

No, because if GH is slower than my local machine it will simply not test the use case properly and return a false negative.

If you are waiting for some condition to occur,

No, I am waiting for a condition NOT to occur.

I will rewrite the unit test to check that the heartbeat stops. It will no longer verify that the heartbeat does not call close().

@fjetter (Member) commented Feb 23, 2024

I'm not too happy that there are multiple ways to shut down a worker; I guess this has to do with the possibility that one may not use a nanny at all and use a different, bespoke watchdog instead.

I would be very surprised if this was intentional.

@fjetter (Member) commented Feb 23, 2024

As explained in my previous post, however, without the certainty that everyone that's using a SpecCluster is also using nannies I'm not comfortable changing spec.py.

I don't get what this has to do with nannies. This entire system should work without reliance on nannies. The only purpose of a nanny is to shut down a worker if it breaches the configured memory limit and restart it. Beyond that, the nanny should just do what it's been told, i.e. if the scheduler asks the worker to shut down, the nanny must oblige.

@crusaderky (Collaborator, Author) commented Feb 23, 2024

The only purpose of a nanny is to shut down a worker if it breaches the configured memory limit and restart it. Beyond that, the nanny should just do what it's been told, i.e. if the scheduler asks the worker to shut down, the nanny must oblige.

Not quite. The nanny has three purposes:

  1. to restart a (potentially unresponsive) worker upon Client.restart()
  2. to restart a worker that died unexpectedly; namely after
  • segmentation fault
  • termination by system-wide OOM killer
  • ulimits / resources exceeded
  • other, more exotic use cases
  3. to watch the worker memory and restart it if it exceeds its limit.

Only the first task is performed upon the scheduler's command; the other two happen autonomously.

I don't get what this has to do with nannies. This entire system should work without reliance on nannies.

If you change spec.py to call retire_workers(close_workers=True) on a cluster that doesn't use nannies, then you need to ask yourself what is there instead of a nanny. If, namely, you have a watchdog that just restarts the worker process whenever it terminates, then you have a problem. If the watchdog just prints a very ominous error message somewhere upon unexpected worker death, closing the worker this way is also undesirable.
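To make that concern concrete, a nanny-less deployment might run the worker under a watchdog like this (hypothetical sketch, not part of distributed; the scheduler address is made up):

import subprocess

# Blindly restart the worker process whenever it exits.  Such a supervisor
# cannot tell a graceful retirement apart from a crash, so with
# close_workers=True the retired worker would simply come straight back.
while True:
    proc = subprocess.Popen(["dask", "worker", "tcp://scheduler-host:8786"])
    proc.wait()
    print("worker process exited; restarting it")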

I would be very surprised if this was intentional.

The design is certainly overcomplicated. There are some benefits to it; namely a cluster manager could RPC into the worker and do some stuff. I would love to simplify it to a hardcoded close_workers=True, remove=True, but that's a fair amount of work that's out of scope for this PR and with a high risk of breaking third-party clusters. At the very least I would not do it without community engagement and an explicit OK from the dask-kubernetes people.

@crusaderky (Collaborator, Author)

@fjetter All review comments have been addressed.

Comment on lines 1274 to 1276
# This happens after one calls Scheduler.remove_worker(close=False).
# ***DO NOT call close()!***
# Read: https://github.com/dask/distributed/pull/8522
@crusaderky (Collaborator, Author) commented Feb 23, 2024

There is no longer a unit test that makes sure that future developers don't re-add self.close(nanny=False) and cause the nanny to restart the worker.
Let us hope that whoever touches this code next takes the time to read and understand the race condition that this PR resolves.

@fjetter (Member) commented Feb 23, 2024

Sorry, but I'm having a hard time following. The above description is very long and somewhat off topic. Why are we not closing a worker and its nanny if the scheduler says it's missing/unknown? The bug I'm seeing is the nanny=False. Disabling the heartbeat without closing the worker creates a zombie worker. This can't be the right behavior.

@crusaderky (Collaborator, Author)

For the follow-up:

Why are we not closing a worker and its nanny if the scheduler says it's missing/unknown?

Because if the worker does not use a nanny, then we do not know what is there instead of a nanny.
Namely, it could be a watchdog that simply says "if the worker dies, surely something went wrong and I must restart it".
In that case, calling retire_workers() without flags would simply restart the workers.

@crusaderky force-pushed the heartbeat-close-gracefully branch from fb6d2c4 to 1b4e6d2 on February 23, 2024 12:54
@crusaderky (Collaborator, Author) commented Feb 23, 2024

I can now see the following possible outcomes of this PR:

  1. as per my initial commit: the change is selective for closing_gracefully. There will be zombie workers if the cluster manager doesn't kill the instances afterwards. Note that this replaces a behaviour where there are resurrected workers instead. Before and after the PR, the worker is alive in the end; the difference is that it no longer restarts and reconnects.
  2. as of the current state: no more special-casing based on worker status. The impact is more widespread and carries a risk of creating zombie workers if a desync happens - but neither of us can think of a use case for a desync.
  3. call self.close(nanny=True) at all times. This makes no sense to me, as a simpler solution that would yield the exact same effect would be to remove the close=False option from the scheduler side. At that point, we could remove the whole "missing" message, hard-assert on the scheduler side that the worker must be known, and remove the close_workers=False and retire=False options from retire_workers and remove_worker. This would effectively close "Eliminate partially-removed-worker state on scheduler (comms open, state removed)" #6390.
     It would be by far the cleanest design, but it carries the risk of breaking adaptive deployments that don't use nannies. It is unfeasible to deliver in time for tonight's release.
  4. either (1) or (2) today, and then (3) as a follow-up in the next release.

@fjetter mentioned this pull request on Feb 23, 2024
@fjetter (Member) commented Feb 23, 2024

I did some research into the git history to explain why that close statement was/is there. It was intentionally added in #6505 to fix #6494; #6494 (comment) mentions this explicitly and suggests not closing the nanny in this situation.

@fjetter (Member) commented Feb 23, 2024

Since then, a lot has changed about how we deal internally with the status of our servers, and I'm not sure whether this condition could come back, since there was no proper test added for it.

@crusaderky (Collaborator, Author)
Since then, a lot has changed about how we deal internally with the status of our servers, and I'm not sure whether this condition could come back, since there was no proper test added for it.

Client.restart seems to call Nanny.kill, not remove_worker, so it should not be an issue.

# Close non-Nanny workers. We have no way to restart them, so we just let them go,
# and assume a deployment system is going to restart them for us.
await asyncio.gather(
    *(
        self.remove_worker(address=addr, stimulus_id=stimulus_id)
        for addr in self.workers
        if addr not in nanny_workers
    )
)

logger.debug("Send kill signal to nannies: %s", nanny_workers)
async with contextlib.AsyncExitStack() as stack:
    nannies = await asyncio.gather(
        *(
            stack.enter_async_context(
                rpc(nanny_address, connection_args=self.connection_args)
            )
            for nanny_address in nanny_workers.values()
        )
    )

    start = monotonic()
    resps = await asyncio.gather(
        *(
            wait_for(
                # FIXME does not raise if the process fails to shut down,
                # see https://github.com/dask/distributed/pull/6427/files#r894917424
                # NOTE: Nanny will automatically restart worker process when it's killed
                nanny.kill(
                    reason="scheduler-restart",
                    timeout=timeout,
                ),
                timeout,
            )
            for nanny in nannies
        ),
        return_exceptions=True,
    )

@crusaderky (Collaborator, Author)

@fjetter PR has been changed to call close(nanny=True) and is ready for review again.
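For reference, a paraphrased sketch of the final behaviour (simplified, not the verbatim diff): when the scheduler reports the worker as missing, the worker now closes itself together with its nanny, so the process is not resurrected:

async def heartbeat(self):
    response = await self.scheduler.heartbeat_worker(address=self.contact_address)
    if response["status"] == "missing":
        # The scheduler has no record of this worker and reconnection is not
        # supported, so shut down the worker AND its nanny: the process is not
        # restarted and can no longer be picked up again by AMM.
        await self.close(nanny=True)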

@fjetter merged commit a61e92e into dask:main on Feb 23, 2024 (31 of 34 checks passed)
@crusaderky deleted the heartbeat-close-gracefully branch on February 23, 2024 23:07