SpecCluster correct state can cause inconsistencies #5919

Open
fjetter opened this issue Mar 9, 2022 · 2 comments

Labels
bug Something is broken

Comments


fjetter commented Mar 9, 2022

There are a bunch of problems in SpecCluster._correct_state_internal, to the point that I believe it should be rewritten:

  • If one worker fails during startup, all workers are rejected. This can cause the cluster to spin up too many workers.
  • Similarly, while closing, if one worker fails but the others shut down properly, they are not removed from the internal state.
  • _correct_state_internal is only called once, i.e. any kind of exception aborts the entire up-/downscaling without any further attempt to correct the state.
  • If the cluster is closing while _correct_state is running, nothing is actually cancelled.
  • self._correct_state_waiting is actually never cancelled.
  • SpecCluster.scale schedules a callback to self._correct_state. This can cause all sorts of race conditions, e.g. creating more futures even though the cluster is already closing.

This list could probably go on. Most issues could be addressed individually, but I believe we're better off rewriting this section.
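
For illustration of the first two points, the startup path could keep per-worker bookkeeping instead of failing the whole batch on the first error. The snippet below is only a rough sketch, assuming the workers are awaitables that are started by awaiting them; the helper name _start_workers and its signature are made up, not existing SpecCluster code:

import asyncio
import weakref


async def _start_workers(cluster, workers):
    # Rough sketch only: _start_workers is a hypothetical helper, not actual
    # SpecCluster code.  Start all pending workers concurrently;
    # return_exceptions=True means one failing worker does not discard the
    # workers that did come up.
    workers = list(workers)  # fix an iteration order for the zip below
    results = await asyncio.gather(*workers, return_exceptions=True)
    started, failed = [], []
    for w, result in zip(workers, results):
        if isinstance(result, BaseException):
            failed.append((w, result))
        else:
            w._cluster = weakref.ref(cluster)
            started.append(w)
    # Only the workers that actually started get recorded; failures can be
    # retried or reported without rejecting the whole batch.
    return started, failed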

cc @graingert

fjetter added the bug label on Mar 9, 2022

graingert commented Mar 9, 2022

If one worker fails during startup, all workers are rejected. This can cause the cluster to spin up too many workers

it's actually sort of worse: the workers get two chances to start up, once (concurrently) in asyncio.wait(workers) and once more (sequentially) in await w  # for tornado gen.coroutine support:

await asyncio.wait(workers)
for w in workers:
    w._cluster = weakref.ref(self)
    await w  # for tornado gen.coroutine support

I think this is supposed to be more like

tasks = [asyncio.create_task(worker) for worker in workers]
await asyncio.wait(tasks)
for t in tasks:
    w = await t
    w._cluster = weakref.ref(self)
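
One caveat, assuming the worker objects are awaitables with __await__ rather than plain coroutines: asyncio.create_task() only accepts coroutines and would raise TypeError, whereas asyncio.ensure_future() wraps any awaitable in a task. A variant under that assumption:

tasks = [asyncio.ensure_future(worker) for worker in workers]
await asyncio.wait(tasks)
for t in tasks:
    w = await t  # already finished; just retrieves the result or re-raises
    w._cluster = weakref.ref(self)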


fjetter commented Mar 30, 2022

Xref #4606

wence- added a commit to wence-/distributed that referenced this issue on Oct 30, 2023:
Prior to dask#8233, when correcting state after calling Cluster.scale, we
would wait until all futures had completed before updating the mapping
of workers that we knew about. This meant that failure to boot a
worker would propagate from a message on the worker side to an
exception on the cluster side. With dask#8233 this order was changed, so
that the workers we know about are updated before checking if the
worker successfully booted. With this change, any exception is not
propagated from the worker to the cluster, and so we cannot easily
tell if scaling our cluster was successful. While _correct_state has
issues (see dask#5919), until we can fix this properly, at least restore
the old behaviour of propagating any raised exceptions to the cluster.