Errors when scaling up cluster no longer propagate to client side #8309

Open
wence- opened this issue Oct 27, 2023 · 10 comments

@wence-
Contributor

wence- commented Oct 27, 2023

Describe the issue:

Since #8233, scaling up a cluster with a worker plugin that produces an error no longer propagates that error to the client side. Creating such a cluster with a non-zero initial number of workers does still produce an error.

Is this intended behaviour? And if so, is there a new blessed way of propagating errors when scaling up a cluster?

Minimal Complete Verifiable Example:

Running with distributed 2023.10.0-21-gb62b7005

from distributed import LocalCluster

class ErroringPlugin:
    # Worker plugin whose setup always fails, simulating a worker that cannot boot.
    def setup(self, worker=None):
        raise RuntimeError()

if __name__ == "__main__":
    # This does not error
    with LocalCluster(n_workers=0, plugins={ErroringPlugin()}) as cluster:
        cluster.scale(1)
        cluster.sync(cluster._correct_state)

    # This does
    with LocalCluster(n_workers=1, plugins={ErroringPlugin()}) as cluster:
        pass
@fjetter
Member

fjetter commented Oct 27, 2023

How was that error propagated to the client before? Cluster.scale typically works asynchronously in the background and does not block, so it is difficult to raise exceptions from it.

@fjetter
Member

fjetter commented Oct 27, 2023

So, the code snippet you posted shows what I would expect to happen. I'm surprised to hear that this worked differently before.

@wence-
Contributor Author

wence- commented Oct 27, 2023

I think it propagated through the cluster.sync(cluster._correct_state) call. At least without that, I don't see any errors raised (even before #8233).

@fjetter
Member

fjetter commented Oct 27, 2023

I think it propagated through the cluster.sync(cluster._correct_state) call.

That makes sense. What exactly is your expectation? The public API did not change, so I assume you are using the internal _correct_state yourself?

@wence-
Contributor Author

wence- commented Oct 27, 2023

What exactly is your expectation?

I expect:

    with LocalCluster(n_workers=0, plugins={ErroringPlugin()}) as cluster:
        cluster.scale(1)
        cluster.sync(cluster._correct_state)

To raise a RuntimeError. With current main it does not.

@wence-
Contributor Author

wence- commented Oct 27, 2023

I think I have a fix. The cluster's workers dict is updated before we wait on the futures that are booting the new workers. So _correct_state thinks it has succeeded (because the workers dict is "correct" even though the newly created worker is non-functional).
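
To illustrate the ordering problem concretely, here is a minimal, self-contained sketch in plain asyncio (not distributed's actual _correct_state code): if the bookkeeping dict is filled in before the start coroutines are awaited, their exceptions never reach the caller; awaiting first and re-raising restores propagation.

import asyncio

async def start_worker(name):
    # Stands in for booting a worker whose plugin setup fails.
    raise RuntimeError(f"{name} failed to start")

async def correct_state(workers, target):
    # Await the start coroutines *before* recording the workers, so that any
    # startup exception is re-raised instead of being silently swallowed.
    names = [f"w{i}" for i in range(len(workers), target)]
    results = await asyncio.gather(
        *(start_worker(n) for n in names), return_exceptions=True
    )
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            raise result
        workers[name] = result

asyncio.run(correct_state({}, 1))  # raises RuntimeError, as the client should see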

@fjetter
Member

fjetter commented Oct 27, 2023

To raise a RuntimeError. With current main it does not.

I would like us to find a way for the public API to make sense for you. I wouldn't even want to guarantee that _correct_state continues to exist mid-term (e.g. #5919).

It does sound like you want a public version, cluster.scale_sync or cluster.scale(..., block=True), that blocks until the cluster is in the expected state, raises exceptions, and ensures the state is indeed as expected.
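
For concreteness, the proposed usage might look like the following. This is a hypothetical sketch only; neither spelling exists in distributed today.

from distributed import LocalCluster

with LocalCluster(n_workers=0) as cluster:
    # Proposed: block until the target state is reached, re-raising any
    # worker startup errors.
    cluster.scale(8, block=True)
    # Or, equivalently, as a separate method with the same semantics:
    # cluster.scale_sync(8)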

@wence-
Contributor Author

wence- commented Oct 27, 2023

Ah, thanks. That issue does capture the kind of things I was observing.

It does sound like you want a public version, cluster.scale_sync or cluster.scale(..., block=True), that blocks until the cluster is in the expected state, raises exceptions, and ensures the state is indeed as expected.

Yes, I think so. The rationale is that in dask-cuda, when we boot the cluster we have to adjust the worker spec so that each worker binds to the appropriate GPU (if there is more than one GPU on a node). To do this, we start the cluster with n_workers=0 and then scale up to the target number of workers with the cluster.scale plus cluster.sync pattern, sketched below.
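
Schematically, the boot pattern looks like this. The sketch is not dask-cuda's actual code: the per-worker options are placeholders standing in for the real GPU-affinity settings, and the assumed GPU count is for illustration only.

from distributed import LocalCluster, Nanny

if __name__ == "__main__":
    with LocalCluster(n_workers=0) as cluster:
        n_gpus = 2  # assumed GPU count, for illustration
        for i in range(n_gpus):
            # dask-cuda customises each worker's options here so that worker i
            # binds to GPU i; "nthreads" is only a placeholder for those options.
            cluster.worker_spec[i] = {"cls": Nanny, "options": {"nthreads": 1}}
        cluster.scale(n_gpus)
        # Wait for the new workers to come up; this is the call whose error
        # propagation changed in #8233.
        cluster.sync(cluster._correct_state)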

In this scenario, we'd like to be able to report failures back to the user immediately. I think cluster.scale(..., block=True) would be OK, or even just a public blessed way of waiting for a scale up to complete and reporting any errors that occurred.

I agree that it makes sense that scale is not synchronous, and moreover that you might want to ignore exceptions when booting some workers (since perhaps you just want "about 50" and don't care if you only got 48).

@fjetter
Member

fjetter commented Oct 27, 2023

OK, or even just a public blessed way of waiting for a scale up to complete and reporting any errors that occurred.

I believe the "wait for scale-up" is typically done with Client.wait_for_workers and Cluster.wait_for_workers, but this is detached from the actual scale request and I don't believe it captures any exceptions.
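
For reference, a sketch of that existing path using only public API; it blocks on the worker count but, as noted, does not surface exceptions raised inside worker plugins.

from distributed import Client, LocalCluster

if __name__ == "__main__":
    with LocalCluster(n_workers=0) as cluster, Client(cluster) as client:
        cluster.scale(2)
        # Blocks until two workers have registered with the scheduler (or the
        # timeout elapses); an exception raised in a worker plugin during
        # startup is not re-raised here.
        client.wait_for_workers(2, timeout=60)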

even just a public blessed way of waiting

I don't have a strong preference for which API to use for this, but I suggest not using a method that is marked as internal with an underscore.

Are you interested in looking into a public version of this?

@wence-
Contributor Author

wence- commented Oct 30, 2023

Are you interested in looking into a public version of this?

I had a first minimal go at restoring the old behaviour (in #8314) and will endeavour to do something more sensible (after some annual leave, so in about two weeks). IIUC, an appropriate fix would be to address (at least some of) the problems raised in #5919, since just moving scale + _correct_state behind a scale(..., block=True) flag would merely advertise the availability of an API without addressing the underlying problems.
