[DNM] P2P shuffle skeleton - scheduler plugin #5524
Conversation
Force-pushed from 1df7a61 to b56d15d.
Beyond whether or not we want to use the scheduler plugin, I like the code in this more than #5520. Overall the style is a little simpler, which is a nicer starting point to build other things off of. #5520 feels slightly over-engineered in comparison, with the `ShuffleExtension` containing `Shuffle`s containing `ShuffleMetadata`s.
The problem with sibling shuffles noted at the end of `test_graph.py` also will probably require more complex task-traversing logic on the scheduler to pick the right worker per shuffle. That makes me think we'll have a scheduler plugin regardless, whether it offers an RPC or sets the restrictions itself.
), f"Removed {id}, which still has data for output partitions {list(data)}" | ||
|
||
async def get_shuffle(self, id: ShuffleId) -> ShuffleState: | ||
"Get the `ShuffleState`, blocking until it's been received from the scheduler." |
@fjetter mentioned that this whole method and `waiting_for_metadata` may be unnecessary. Since messages from scheduler to workers remain ordered in `BatchedSend` (and TCP preserves ordering), we can probably count on the `shuffle_init` hitting the worker before the `add_partition` does, so long as we trust the transition logic of our plugin.
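A minimal sketch of what that simplification could look like, assuming a hypothetical `self.shuffles` dict on the worker extension holding the received `ShuffleState`s (the attribute name is made up):

```python
def get_shuffle(self, id: ShuffleId) -> ShuffleState:
    # If scheduler->worker messages are ordered, `shuffle_init` must have
    # arrived before any `add_partition` for the same shuffle, so a plain
    # lookup suffices; a missing entry would indicate a plugin bug.
    try:
        return self.shuffles[id]  # hypothetical attribute name
    except KeyError:
        raise RuntimeError(f"shuffle_init for {id!r} never arrived") from None
```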
        del self.output_keys[k]

def transition(self, key: str, start: str, finish: str, *args, **kwargs):
    "Watch transitions for keys we care about"
Having this run for every key is not ideal. I've tried to make it return as fast as possible for irrelevant keys.
    # FIXME this feels very hacky/brittle.
    # For example, after `df.set_index(...).to_delayed()`, you could create
    # keys that don't have indices in them, and get fused (because they should!).
    m = re.match(r"\(.+, (\d+)\)$", key)
This is my least favorite part of the implementation. Keys are supposed to be opaque to the scheduler (as far as I understand); we're inferring a lot of meaning from them.
Parsing the IDs out for `transfer`/`barrier` is okay, because we control those key names when we generate the graph. The downstream tasks could theoretically be named anything though.
- perf nitpick:

      pattern = re.compile("<pattern>")  # global var

      def foo(...):
          pattern.match(key)

- I might want to see a unit test for this. I generally don't trust regular expressions, even though this one looks straightforward... (see the sketch below)
- I don't feel comfortable with using such logic at all. I don't think we should parse keys to infer logic. We do control them, but this is not a well-defined API and it is very hard to control whether or not keys are mutated at some point in time. The fusing you mentioned is the best example. Any kind of optimization or reordering would have the potential to break this contract easily, and I think such systems should be disentangled.
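A minimal pytest sketch of the kind of unit test asked for above, assuming the regex stays exactly as written in the diff (the key strings are made up for illustration):

```python
import re

KEY_PATTERN = re.compile(r"\(.+, (\d+)\)$")  # same pattern as in the diff above


def test_matches_stringified_tuple_keys():
    m = KEY_PATTERN.match("('unpack-abc123', 3)")
    assert m is not None
    assert m.group(1) == "3"


def test_rejects_keys_without_an_output_index():
    # e.g. keys produced after `.to_delayed()` + fusion, as the FIXME warns
    assert KEY_PATTERN.match("finalize-abc123") is None
    assert KEY_PATTERN.match("('unpack-abc123', last)") is None
```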
- I figured `re` caching would handle it, but sure
- Agreed, a test would be good
- I don't like this logic at all. As noted though, I can't think of any other options right now to have shuffling still work in the face of output task fusion, since HLG annotations are somewhat broken.
It's also important to understand that this key-parsing isn't here because this PR uses a scheduler plugin and #5520 doesn't.
This parsing is here because this PR supports Blockwise fusion of output tasks, and the other doesn't. Whether or not we used a scheduler plugin, the only way I can come up with right now to support fusion of output tasks is to parse key names.
Annotations are the ideal, proper way to deal with this. But if you use annotations, then fusion won't happen, so it defeats the purpose. Therefore, if there's no way to attach additional information to the graph (and there's no way to embed information in the tasks themselves and have it transmitted at runtime to the scheduler, because by the time the task is running, it's already too late), the only information you have is the key names.
    prefix, group, id = parts

    if prefix == TASK_PREFIX:
        if start == "waiting" and finish in ("processing", "memory"):
Suggested change:

-        if start == "waiting" and finish in ("processing", "memory"):
+        if start == "waiting" and finish == "processing":
It should be impossible for the `transfer` task to go to `memory` before we've taken some action, since that task will block on us (the plugin) broadcasting shuffle info to all workers. And if the barrier task is going to memory, that's bad news, because the dependents are about to run and we need to set restrictions on them.
I was just concerned that `transition_waiting_memory` exists. I'm not sure what triggers that case.
If in doubt, I prefer raising in these situations. If it is a valid case, we should know about it. Either way, an explicit exception protects us from corrupted state
All pass with dask/dask#8392. Rather crude; needs unit testing.
Surprisingly, blockwise decides to merge the two output layers. This really throws things off. The test passes right now by disabling an aggressive assertion, but we need more robust validation here.
Whenever I forget to switch to dask#5520, the errors are confusing.
See dask#5524 (comment). Since messages from scheduler to workers remain ordered in `BatchedSend` (and TCP preserves ordering), we should be able to count on the `shuffle_init` always hitting the worker before the `add_partition` does, so long as we trust the transition logic of our plugin.
Force-pushed from b56d15d to 04833a3.
I think hooking into the transition engine is fine. I'm not overly concerned about this but I'm not convinced about the other solutions/problems introduced here. Most notably how the worker restrictions are computed.
One different way coming to mind: instead of worker restrictions, we could use resource restrictions. We could define every output partition as a unique resource. The shuffle init would then assign an output partition resource to every worker. The scheduler heuristics would then take care of the rest.
class ShuffleScheduler:
    def transfer(self, id, key):
        # Shuffle init part
        shuffle_ix = 0
        while shuffle_ix < npartition_out:
            for w in self.scheduler.workers:
                scheduler.add_resources(
                    worker=w,
                    resources={f"shuffle-{shuffle_ix}": 1},
                )
                shuffle_ix += 1
                if shuffle_ix == npartition_out:
                    break
This would require us to encode this resource during graph construction, but the total number of output partitions is known during graph construction, so I don't see a conceptual problem, maybe a technical one, though. I'm not sure if we can assign resources in the blockwise layer. If not, adding this feature to blockwise or dropping blockwise might be an option (is this actually a blockwise operation or are we just abusing this for some optimization hack??).
Either way, arguing about resources and arguing about out-of-band output partitions feels semantically well aligned. Even when talking about task fusion, I would argue that whatever semantics hold for resources should hold for the unpack tasks as well.
thoughts?
    # TODO if these checks fail, we need to error the task!
    # Otherwise it'll still run, and maybe even succeed, but just produce wrong data?

    dts._worker_restrictions = restrictions
There is a scheduler API for this, isn't there?
There is, but it's a little bit more overhead when we already have the TaskState here (distributed/scheduler.py, lines 7168 to 7174 in d0b40d3):
def set_restrictions(self, comm=None, worker=None):
    ts: TaskState
    for key, restrictions in worker.items():
        ts = self.tasks[key]
        if isinstance(restrictions, str):
            restrictions = {restrictions}
        ts._worker_restrictions = set(restrictions)
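For comparison, a hypothetical helper showing the two options side by side, using the names from this PR (`dts` is the dependent `TaskState`, `restrictions` the set of worker addresses):

```python
from distributed.scheduler import Scheduler, TaskState


def restrict_output_task(scheduler: Scheduler, dts: TaskState, restrictions: set) -> None:
    # Option A (what the diff above does): mutate the TaskState we already hold.
    dts._worker_restrictions = restrictions
    # Option B: call the handler quoted above, which re-resolves the key
    # through `scheduler.tasks` and normalizes a bare str into a set, which is
    # a bit of extra overhead when we already have the TaskState in hand.
    # scheduler.set_restrictions(worker={dts.key: restrictions})
```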
class ShuffleSchedulerPlugin(SchedulerPlugin):
    name: ClassVar[str] = "ShuffleSchedulerPlugin"
    output_keys: dict[str, ShuffleId]
Shouldn't this rather be something like `dict[ShuffleId, Set[str]]`?
See distributed/shuffle/shuffle_scheduler.py, lines 175 to 184 in 04833a3:
# Task completed
if start in ("waiting", "processing") and finish in (
    "memory",
    "released",
    "erred",
):
    try:
        id = self.output_keys[key]
    except KeyError:
        return
This lets us check which shuffle (if any) a given key is a part of.
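For illustration, the shape that lookup assumes is a plain reverse mapping from each output key to its shuffle (the key strings and ID below are made up):

```python
output_keys = {
    "('unpack-abc123', 0)": "abc123",  # output key -> ShuffleId it belongs to
    "('unpack-abc123', 1)": "abc123",
}
# In `transition`, one dict lookup answers "is this key part of a shuffle,
# and if so, which one?" without re-parsing the key.
shuffle_id = output_keys.get("('unpack-abc123', 1)")
```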
Also note that this whole thing isn't actually necessary for the proper operation of a shuffle:
distributed/shuffle/shuffle_scheduler.py, lines 114 to 120 in 04833a3:
# Check if all output keys are done
# NOTE: we don't actually need this `unpack` step or tracking output keys;
# we could just delete the state in `barrier`.
# But we do it so we can detect duplicate shuffles, where a `transfer` task
# tries to reuse a shuffle ID that we're unpacking.
# (It does also allow us to clean up worker restrictions on error)
I added it just for sanity checking and validation right now.
    return addr


def parse_key(key: str) -> list[str] | None:
Maybe premature optimization, but I guess this should have at least an LRU cache.
I like that idea, but this is going to run on every single transition, so I think that cache would get blown out. We could implement an internal LRU cache just for positives I guess.
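A rough sketch of what a positives-only cache could look like, wrapping the `parse_key` helper from this PR (the class name and size bound are made up):

```python
from collections import OrderedDict


class PositiveParseCache:
    """LRU cache that only stores successful parses, so the flood of
    unrelated keys on a busy scheduler can't evict the shuffle keys."""

    def __init__(self, maxsize: int = 4096) -> None:
        self.maxsize = maxsize
        self._cache: "OrderedDict[str, list[str]]" = OrderedDict()

    def __call__(self, key: str) -> "list[str] | None":
        try:
            parts = self._cache[key]
        except KeyError:
            parts = parse_key(key)  # the un-cached helper defined in this PR
            if parts is None:
                return None  # don't cache misses
            self._cache[key] = parts
            if len(self._cache) > self.maxsize:
                self._cache.popitem(last=False)
        else:
            self._cache.move_to_end(key)
        return parts
```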
def transition(self, key: str, start: str, finish: str, *args, **kwargs):
    "Watch transitions for keys we care about"
    parts = parse_key(key)
I don't really like us parsing the key to do this logic. I would likely prefer us using a task attribute or similar
I would rather even consider us adapting how tasks are submitted to the scheduler. I'm not too familiar with HLG, but shouldn't it be somehow "easy" to tell the scheduler what keys should be considered for this? Isn't there some way during unpacking to let the scheduler know that the keys are "special", such that we just keep a set of "shuffle keys" on board instead of parsing them?
Annotations seem like the mechanism for this. Unfortunately there are two blockers to using them:
- Graph optimization loses annotations (dask#7036)
- Blockwise fusion does not fuse across layers with different annotations. We need blockwise fusion to happen on the output tasks to prevent #5223 (workers run twice as many root tasks as they should, causing memory pressure). Maybe we'd need some meta-annotation about whether an annotation can be fused?
So yes, it should be easy, but it will require a bit of fixing and changing blockwise for it to actually be easy.
I'm also thinking about getting rid of this transition-watching logic entirely, and having `transfer` tasks call an RPC on the scheduler to register themselves and get the list of peer workers (which would then also cause the scheduler to broadcast a message to all workers telling them the shuffle has started). This would eliminate the need for `parse_key` entirely. The only keys we'd have to parse are the output keys (for reasons mentioned in https://github.com/dask/distributed/pull/5524/files#r765203083). That would be more of a hybrid approach with #5520.
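A rough sketch of that hybrid, with made-up handler and message names (`shuffle_register`, `shuffle_init`), just to show where the pieces would live:

```python
from distributed import get_worker
from distributed.diagnostics.plugin import SchedulerPlugin


class ShuffleRPCPlugin(SchedulerPlugin):
    # Hypothetical plugin: no transition-watching, just an RPC endpoint.
    def __init__(self, scheduler):
        self.scheduler = scheduler
        scheduler.handlers["shuffle_register"] = self.shuffle_register

    async def shuffle_register(self, comm=None, id=None):
        workers = list(self.scheduler.workers)
        # Tell every worker the shuffle has started (the worker extension
        # would handle the "shuffle_init" message)...
        await self.scheduler.broadcast(
            msg={"op": "shuffle_init", "id": id, "workers": workers}
        )
        # ...and hand the peer list back to the calling `transfer` task.
        return workers


async def register_from_transfer_task(id):
    # Inside a `transfer` task: `worker.scheduler` is an rpc to the scheduler,
    # so attribute access dispatches to the handler installed above.
    # (In practice this coroutine would need to run on the worker's event loop.)
    worker = get_worker()
    return await worker.scheduler.shuffle_register(id=id)
```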
        return self.erred(ShuffleId(id), key)

# Task completed
if start in ("waiting", "processing") and finish in (
This condition feels brittle. What motivates the selective start states?
Checking the start state is probably unnecessary. All we really care about is that the key is in a terminal state.
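In code, the simplification amounts to something like this (terminal states copied from the condition quoted above):

```python
TERMINAL_STATES = ("memory", "released", "erred")


def reached_terminal_state(finish: str) -> bool:
    # Only the finish state matters; where the task transitioned from doesn't.
    return finish in TERMINAL_STATES
```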
I like the spirit of this, because you're embedding some information at graph-generation time that lets you identify the partition number of an output task through something else than parsing its key. However, as noted in https://github.com/dask/distributed/pull/5524/files#r765203083 and #5524 (comment) this isn't possible right now:
Whether we use resource restriction annotations, or some custom shuffle annotation that then gets translated into worker restrictions, it's basically the same thing. (The scheduling mechanism for worker restrictions is slightly simpler than for resource restrictions, so I'd prefer sticking with that path.) It's all just a way of passing along auxiliary data so we have more information to go off of than just the key names.
This is definitely a proper blockwise optimization. And we really, really want subsequent blockwise optimizations to fuse onto it, otherwise we get root-task overproduction.
I don't think so. Consider:

    df = dd.read_parquet(...)
    df_pre = df.map_partitions(preprocess)
    with dask.annotate(resources={"GPU": 1}):
        inferred = df_pre.map_partitions(run_ml_model)
    df_post = inferred.map_partitions(post_process)

Without the resource annotations, this whole thing would Blockwise-fuse into one layer. But our case would use resources in basically the same way:

    transfer = df.map_partitions(transfer)
    barrier = delayed(barrier)(transfer)
    with dask.annotate(resources={f"shuffle-{id}-{i}": 1 for i in range(transfer.npartitions)}):
        unpack = dd.map_partitions(unpack, BlockwiseRange(transfer.npartitions), barrier)
    downstream = unpack.map_partitions(user_code)

However, we do want `downstream` to fuse onto `unpack`. I think we'll end up needing something like the meta-annotation idea mentioned earlier.
Thanks for your replies. I don't think what we would like to do in this PR is actually very special. I could see similar applications down the road that require us to slightly change scheduling logic for specific keys. Therefore I think this PR is a nice example and it should be considered as a requirement for future iterations of the HLE/HLG/annotation engine. I'll have another peek at the other PR shortly, but right now it feels like we should postpone the scheduler plugin until we have a more robust annotation / HLG / HLE engine.
I think what you're really talking about is postponing handling of Blockwise fusion of the output tasks, since handling fused output tasks is what requires us to parse keys right now, and parsing keys is what makes it more reasonable to use a scheduler plugin, instead of RPCs to the scheduler from within tasks like the other PR (because the scheduler would have to send a bunch of key names to the task, which would then parse them and send them back to the scheduler—might as well just do that all on the scheduler and save the communication). I get this from a simplicity perspective, but I disagree for a few reasons:
(Also, the logic in the other PR is even more egregious than parsing keys, because it just generates the key names it expects based on the hardcoded name of the unpack function. This is highly brittle.)
All this said, I could still see the rationale in merging the other PR first, then making this a new PR onto that (though the diff would be very hard to read) in order to have a place for more discussion about why we're moving logic to the scheduler. Or even for doing so incrementally. It would add a bit of extra work, but maybe that's worth it for the clearer process.
We've decided to merge #5520 as the initial skeleton instead of this one. |
Alternative to #5520: a peer-to-peer shuffle skeleton, based on a scheduler plugin to handle much of the synchronization.
There are some hacky things, but generally I think this shows more promise than #5520. Things this can do that the other PR cannot:
This also doesn't match the design in #5435 (in reality, a bit of it needed to change to work with the scheduler driving things), but overall, I like this design more than #5520. It also feels a little easier to build off of.
This needs unit tests, especially for the more exciting logic around concurrent shuffles, waiting for the scheduler, etc. But in the basic tests, it seems to shuffle correctly, including sequential and concurrent shuffles, and also passes `test_shuffle.py`.
Since this design differs a little from #5435, here's a rough diagram of the flow:
cc @fjetter
- Passes `pre-commit run --all-files`