Shuffle service resilience #6105
Scheduler plugins already support all these events. Why not just use that interface? In practice, there's just as much horrible horrible freedom with plugins as extensions, since they also give you access to the `Scheduler` object to do whatever you want with. That's why I used a plugin in #5524 instead of an extension: https://github.com/dask/distributed/pull/5524/files#diff-bbcf2e505bf2f9dd0dc25de4582115ee4ed4a6e80997affc7b22122912cc6591R191-R194

The only upside I see to extensions is that they generally have at-most-one semantics, whereas plugins don't assume they're singletons.

An interface change we could consider is allowing plugins to return recommendations after these events. (In fact I once made this change a few years ago in a private fork for similar reasons.) This would be a slightly more well-structured way for plugins to affect things than calling `transition` or manipulating state directly.

Also, your benchmark doesn't include the function call (because `object()` has no attribute `transition`). Adding that in more than doubles the time on my machine (533µs -> 1.42ms). When you consider that that function might actually... do something (😱) (mostly just to check there's nothing serious to do), I might actually worry a little about adding that overhead on every transition.
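For concreteness, a minimal sketch of the plugin shape this suggests, using the existing `SchedulerPlugin` hooks. The return-recommendations part is the hypothetical interface change (today the return value of `transition` is ignored by the scheduler), and the bookkeeping and key matching are made up for illustration:

```python
from distributed.diagnostics.plugin import SchedulerPlugin


class ShuffleResiliencePlugin(SchedulerPlugin):
    """Sketch: react to scheduler events through the existing plugin hooks."""

    def __init__(self):
        # Hypothetical bookkeeping: barrier key -> keys of that shuffle's tasks
        self.shuffles = {}

    def transition(self, key, start, finish, *args, **kwargs):
        # Hypothetical interface change: a returned dict of
        # {key: recommended state} would be fed back into the scheduler's
        # transition engine, instead of the plugin mutating state itself.
        if finish == "memory" and key in self.shuffles:
            return {k: "released" for k in self.shuffles.pop(key)}

    def remove_worker(self, scheduler, worker):
        # Plugins already receive this event, along with the full Scheduler
        # object to do whatever they want with -- the "horrible freedom".
        ...
```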
Most extensions won't have such a method, but you're right that it would be good to include it on one of them. A plugin might make sense though. It's certainly nicer in that it doesn't require any new changes to core.
@gjoseph92 do you see obvious flaws with the plan above? Do you think that it would do the job? (although possibly not as nicely as desired)
A couple of questions/comments, mostly for my understanding.

Is this something like the following...

```python
df = ...
x = df.shuffle(on="x")
y = x.partitions[x.npartitions // 2].persist()
sleep(0.1)
z = x.persist()
```

This currently fails, and there are actually a couple of different error cases depending on how data is distributed on the cluster. (Sometimes an assert, sometimes a KeyError; I wouldn't be surprised if there is something else.)

That's basically

```python
y = x.persist()
z = y.partitions[y.npartitions // 2].persist()
del y
```

which currently does not clean up stuff / close an existing shuffle properly. This is already written in ...

Did I understand these error cases properly?
Based on my (bad) memory, yes.
In many cases I think that the short term answer is to restart the affected shuffle entirely when something goes wrong, and to get good at cleaning up shuffle state afterwards.

If we get good at both then that gives us a not ideal, but totally workable, approach, I think.
I would suggest a slightly different approach.

If we look at this example

```python
df = ...
x = df.shuffle(on="x")
y = x.partitions[x.npartitions // 2].persist()
sleep(0.1)
z = x.persist()
```

I suggest that ...

This is pretty easy if we just define a ...
In #5520 and #5976 and #6007 we've started a shuffle service. This has better memory characteristics, but is not resilient. In particular, it can break in a few ways:

1. A worker holding shuffle output dies
2. Tasks from an existing shuffle show up again in a newly submitted graph
3. Shuffle state is never cleaned up once the shuffle's outputs are done

There are a few ways to solve this problem. One way I'd like to discuss here is opening up scheduler events to extensions, and letting them trigger transitions. In particular, both scenarios 1 and 2 can be handled by letting the extension track `remove_worker` and `update_graph` events and restart all shuffle tasks if an output-holding worker dies, or if any of the existing shuffles occur in a new graph. Scenario 3 can be handled by letting the extension track `transition` events, and clean things up when the barrier task transitions out of memory. A rough sketch of this shape follows.
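Here the method names, the bookkeeping, and the `Scheduler.transitions`-style recommendations entry point are illustrative rather than a worked-out design:

```python
class ShuffleSchedulerExtension:
    """Sketch: a scheduler extension reacting to the three scenarios above."""

    def __init__(self, scheduler):
        self.scheduler = scheduler
        # Illustrative bookkeeping: shuffle id -> participating workers,
        # task keys, and the barrier task's key
        self.shuffles = {}

    def remove_worker(self, worker):
        # Scenario 1: an output-holding worker died. Restart every task in
        # the affected shuffles by recommending them back to "released".
        for shuffle in self.shuffles.values():
            if worker in shuffle["workers"]:
                self.scheduler.transitions(
                    {key: "released" for key in shuffle["tasks"]}
                )

    def update_graph(self, keys):
        # Scenario 2: tasks from an existing shuffle appear in a newly
        # submitted graph. Restart that shuffle from scratch.
        # (Matching logic omitted.)
        ...

    def transition(self, key, start, finish):
        # Scenario 3: the barrier task transitioned out of memory; clean up
        # that shuffle's state.
        for id_, shuffle in list(self.shuffles.items()):
            if key == shuffle["barrier"] and start == "memory":
                del self.shuffles[id_]
```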
So far, I think that this can solve all of the resilience issues in shuffling (at least everything I've come across). However, it introduces two possible concerns:
1 - Scheduler performance
Maybe it doesn't make sense for every transition to cycle through every extension to see if they care about transitions.
This doesn't seem to be that expensive in reality
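A minimal sketch of the kind of micro-benchmark behind this estimate (the extension count and the bare `object()` stand-ins are assumptions):

```python
import timeit

# Bare object() instances stand in for extensions with no `transition`
# method, so this measures only the per-extension attribute check.
extensions = [object() for _ in range(10)]


def check_all():
    for ext in extensions:
        handler = getattr(ext, "transition", None)
        if handler is not None:
            handler()


n = 100_000
total = timeit.timeit(check_all, number=n)
print(f"{total / (n * len(extensions)) * 1e9:.1f} ns per extension per transition")
```

As noted above, the bare `object()`s never actually get called; giving them a real (even no-op) `transition` method more than doubles the measured time.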
So around 40ns per extension per transition. It's well under a microsecond for all of our extensions.
2 - Complexity
Now any extension can inject transitions. Horrible horrible freedom!
My guess is that this is OK as long as we maintain some hygiene around, for example, always using the transitions system rather than mucking about with state directly.
This is also a somewhat systemic change for what is, today, a single feature.
cc @gjoseph92 @fjetter for feedback