Worker assignment for split-shuffle tasks #4962
If shuffle split tasks are not blacklisted from work stealing, this can have catastrophic effects on performance. See also dask#4962
I'll try this on #4925 and see if it changes anything. Theoretically, if our estimate of the costs from … I wonder if …
We could also add a similar heuristic of "don't move really fast tasks" to #4925, and only consider dependency-less workers if the task's average duration is > 5ms, though the above penalties would probably enforce something like that in practice.
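To make that gate concrete, here is a minimal sketch (names are assumed, not the actual decide_worker code):

def pick_candidates(workers_with_deps, idle_workers, avg_duration, min_duration=0.005):
    # Only widen the pool beyond dependency-holders for tasks slow enough
    # that a transfer could plausibly pay for itself.
    candidates = set(workers_with_deps)
    if avg_duration > min_duration:
        candidates |= set(idle_workers)
    return candidates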
So #4925 makes this really bad 😓

As expected, the worker_objective estimate makes it look like at best a toss-up—and in some cases, like a no-brainer—to transfer the group-shuffle key and run split-shuffle on a different worker.

I added this to decide_worker on my branch to get a log of the decisions:

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 5d10e55a..340c228c 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -7525,6 +7525,9 @@ def decide_worker(
         for ws in candidates:
             break
     else:
+        if "split-shuffle" in ts.key and len(all_workers) > 1:
+            objectives = dict(sorted(((ws.name, objective(ws)[0]) for ws in candidates), key=lambda kv: kv[1]))
+            print({wws.name for dts in deps for wws in dts._who_has}, objectives)
         ws = min(candidates, key=objective)
     return ws

scaling to 10 workers
{0} {13: 0.08823027999999998, 5: 0.08823027999999998, 17: 0.08823027999999998, 10: 0.08823027999999998, 4: 0.08823027999999998, 0: 20.733723118831517}
{0} {5: 0.08823027999999998, 17: 0.08823027999999998, 10: 0.08823027999999998, 4: 0.08823027999999998, 13: 0.1769286684050997, 0: 20.733723118831517}
{0} {17: 0.08823027999999998, 10: 0.08823027999999998, 4: 0.08823027999999998, 13: 0.1769286684050997, 5: 0.1769286684050997, 0: 20.733723118831517}
...

So the dependency lives on worker 0, and we picked 13, 5, 17, etc. instead. Scroll all the way to the right and you'll see that worker 0's estimate is 20sec! (Because its occupancy is huge, since every task up until now is already queued there). There's no baseline communication penalty that would overcome that. And only looking at those metrics, it's kinda hard to argue that we even *shouldn't* move those tasks. So we need different metrics?

Then the new workers get going, and the tasks are still moved, but we see that it's a much closer competition (~.1 sec):

...
{1} {13: 0.5331303904522746, 12: 0.5544517075890987, 7: 0.5560758518534369, 18: 0.5593241403821131, 11: 0.5593241403821131, 14: 0.5609482846464513, 15: 0.5741733246531499, 2: 0.5971187860543121, 8: 0.6003670745829883, 19: 0.6037313060576612, 1: 0.6307006389845927}
{1} {12: 0.5544517075890987, 7: 0.5560758518534369, 18: 0.5593241403821131, 11: 0.5593241403821131, 14: 0.5609482846464513, 15: 0.5741733246531499, 2: 0.5971187860543121, 8: 0.6003670745829883, 19: 0.6037313060576612, 13: 0.6135921145896869, 1: 0.6307006389845927}
{1} {7: 0.5560758518534369, 18: 0.5593241403821131, 11: 0.5593241403821131, 14: 0.5609482846464513, 15: 0.5741733246531499, 2: 0.5971187860543121, 8: 0.6003670745829883, 19: 0.6037313060576612, 13: 0.6135921145896869, 1: 0.6307006389845927, 12: 0.6349134317265109}
...

And actually most of the rest looks like this, where we don't transfer the task:

{10} {10: 1.215171096590307, 5: 1.3515733953241507, 15: 1.3865404377693613, 9: 1.4441341760252921, 8: 1.4619327413785106, 3: 1.4698834858064056, 16: 1.6275828667341654, 12: 1.6849934314041595, 7: 1.7309365546100495}
{10} {10: 1.2156094609378076, 5: 1.3515733953241507, 19: 1.5010363349941767, 16: 1.6275828667341654, 18: 1.717431345264229, 6: 1.7251503801086199, 7: 1.7309365546100495, 14: 1.7380236908036772}
...
{13} {13: 0.9894290725058903, 19: 1.225090831846512, 7: 1.2347089909794402, 6: 1.2792799217482789, 15: 1.2866044296804542, 17: 1.306529883194976, 8: 1.3235009598130638, 0: 7.246138411365685}
{13} {13: 0.9899081580390164, 1: 1.0576689393300158, 4: 1.146884504580281, 5: 1.1907214280860343, 19: 1.225090831846512, 3: 1.22564715003349, 7: 1.2347089909794402, 8: 1.3235009598130638}
...
{1} {1: 0.8129096572085968, 5: 1.1697874239600665, 9: 1.1919375910934176, 13: 1.213012706371855, 19: 1.2389138104501627, 3: 1.2394701286371408, 2: 1.2717672574518704, 6: 1.2931029003519297, 8: 1.3373239384167146}
{1} {1: 0.8132188095246882, 4: 1.1607074831839317, 9: 1.1919375910934176, 7: 1.248531969583091, 11: 1.2651144899098326, 2: 1.2717672574518704, 8: 1.3373239384167146, 18: 1.424596210464172, 14: 1.449497343373671}

I also added this to the end of your script to print the total number of transfers by key:

logs = client.run(lambda dask_worker: dask_worker.incoming_transfer_log)
transfer_counts = collections.Counter(
distributed.utils.key_split(k)
for log in logs.values()
for entry in log
for k in entry["keys"]
)
print(transfer_counts)

Remember these are the keys being transferred, so we expect split-shuffle to be transferred a lot (it's the input to shuffle). group-shuffle is what we don't want transferred.

Main:
Counter({'split-shuffle': 53458, 'dataframe-sum-chunk': 1021, 'shuffle': 477, 'getitem': 53, 'group-shuffle': 5, 'make-timeseries': 1})

Main with stealing fix (shuffle-split -> split-shuffle):
Counter({'split-shuffle': 45834, 'dataframe-sum-chunk': 1009, 'shuffle': 498, 'getitem': 81, 'group-shuffle': 1})

My PR:
Counter({'split-shuffle': 59427, 'group-shuffle': 3307, 'dataframe-sum-chunk': 762, 'shuffle': 505, 'getitem': 220, 'make-timeseries': 148})

So we could easily make #4925 ignore tasks with a very low average duration, and probably that would solve this issue. But I'm more curious about why transferring split-shuffle is so much more expensive in reality than the objective function estimates it's going to be, and what we could do to make the objective function more accurate. If we are going to do #4925, a good objective function becomes very important.
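For intuition, the per-worker comparison the scheduler makes looks roughly like this (a simplified sketch, not the actual Scheduler.worker_objective code; bandwidth and partition size are assumed round numbers):

def estimated_start_time(missing_dep_bytes, occupancy, bandwidth=100e6, latency=0.001):
    # Time until the task could start on this worker: queued work plus the cost
    # of fetching whatever dependencies the worker does not already hold.
    comm_cost = missing_dep_bytes / bandwidth + latency
    return occupancy + comm_cost

# Worker 0 holds the ~8 MB group-shuffle output but has ~20s of queued work;
# an idle worker would have to fetch it but has an empty queue.
print(estimated_start_time(0, 20.7))   # ~20.7 on the dependency holder
print(estimated_start_time(8e6, 0.0))  # ~0.08 on an idle worker, so a transfer looks like a win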
Can we check to see if the sizes of the intermediate results are large? These are dicts or tuples. It may be that we're misjudging them, resulting in too-frequent steals.
I'd love to see what the computed size of all of the elements in the TaskGroups is. Fortunately the recent task group viz should have this information.
I looked into the error distribution of our sizeof estimations on the group results. For this I patched dask with the diff below and parsed the resulting logs.

diff --git a/dask/dataframe/shuffle.py b/dask/dataframe/shuffle.py
index a2195b0d..2f439d6f 100644
--- a/dask/dataframe/shuffle.py
+++ b/dask/dataframe/shuffle.py
@@ -787,6 +787,25 @@ def shuffle_group_get(g_head, i):
 def shuffle_group(df, cols, stage, k, npartitions, ignore_index, nfinal):
+    res = _shuffle_group(df, cols, stage, k, npartitions, ignore_index, nfinal)
+    size = sizeof(res)
+    size_actual = 0
+    for df in res.values():
+        size_actual += df.memory_usage(deep=True, index=True).sum()
+    if size_actual:
+        logger.critical(
+            {
+                "dask_sizeof": size,
+                "pandas_memory_usage": size_actual,
+                "diff": size - size_actual,
+                "diff_relative": size / size_actual,
+                "nsplits": len(res),
+            }
+        )
+    return res
+
+
+def _shuffle_group(df, cols, stage, k, npartitions, ignore_index, nfinal):
"""Splits dataframe into groups
The group is determined by their final partition, and which stage we are in and parsed the resulting logs. I didn't let the entire run finish (at some point my log pipe broke :)) but I don't think the statistics wouldn't have changed much The test dataframe we're running on is made up of about 1k partitions with slightly below 8MB of data per partition. As expected, the size of the group statistics don't deviate a lot from this since I was only measuring roughly the first half of the run. I'm primarily interested in our sizeof error. Note that the splits are dicts as Matt already indicated and this particular test run turned out to use 11 intermediate splits. our dask_sizeof will start sampling at 10 so we're in the soft sampling range. What we can see from these results already is that, on average, we are overestimating the size which is typically not a bad thing and should work in our favour. The distribution is skewed to the right with a very long tail (tail not shown in the histogram below) since underestimating would be our biggest problem, I looked at the lower tail and most of the severe under estimations stem from groupings with empty splits. the sampling would then pick the empty dataframes only and in the worst case scenarios ignore all splits with actual payload data. This case is shown in row four of the following screenshot since the value 23199 corresponds to a dict with empty dataframes I will try to correlate the lower tail with the stealing decisions and will perform a few experiments with disabled sampling to see if this impacts the shuffle performance significantly. On a related note: I'm now wondering how often we typically have empty splits. I'm currently on the fence thinking this is a systematic problem since we're reusing the same number of splits every stage (and not even necessarily a prime number) and this is causing systematic hash/bucket collisions. the other side of me is wondering if this is a feature we abuse to make our staged shuffle algorithm work. I'm inclined to think the algorithm should work as long as we use the same number of splits per stage but I haven't put enough thought into it to be sure. If somebody else has thoughts on this, please stop me from going down the rabbit hole :) |
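The skew is easy to reproduce outside the scheduler. Here is a minimal sketch (not the instrumentation above) comparing dask's sampled sizeof against pandas' own accounting for a dict of 11 splits where only one split holds data:

import numpy as np
import pandas as pd
from dask.sizeof import sizeof

# 10 empty splits plus one split holding all the data, mimicking the
# pathological groupings described above.
splits = {i: pd.DataFrame({"x": np.zeros(0)}) for i in range(10)}
splits[10] = pd.DataFrame({"x": np.random.random(1_000_000)})

estimated = sizeof(splits)  # samples 10 of the 11 values and extrapolates
actual = sum(df.memory_usage(deep=True, index=True).sum() for df in splits.values())
print(estimated, actual, estimated / actual)  # wildly low whenever the big split is not sampled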
A much shorter rabbit hole, courtesy of @ncclementi: I was able to see which of the groups was of dict type via the Python icon. Then I looked at the memory label to see that it was similar to the others, around 250MB. The whole thing took less than a minute :)
That's really nice. The dashboard doesn't show the unsampled size, though, and that's what I wanted to compare, since I suspect the sampling to be our problem.
Ah, I was concerned that it would come back as 28 bytes or something similar. I agree with your assessment that if it's overcounting (or counting anywhere close to normal) then we should be ok.
I patched a few things to also collect the actual size when a task is finished. In fact, I collected both, submitted them to the scheduler, and logged all stealing metric calculations. In the table below you can find the result of …

I collected about 136k of these events and all were eligible for stealing (compute time larger than 5ms, cost_multiplier below 100). That's way too much, and I see constant stealing on the dashboard; maybe related, but for now I'll ignore this since this analysis focuses on the size measurement.

I am defining the relative difference as the ratio of actual to sampled costs. Note that this is the inverse of the definition above, but it is much more descriptive. In particular, this means that for values larger than one we are underestimating the actual cost, or in other words, the actual cost is higher. Big numbers are bad. One is perfect. Smaller than one is OK.

In the last histogram (y-axis is log scale) we can clearly see that there are a few tasks which are much more costly in reality than what the sampled size suggests. While the statistics still confirm that we are on average overestimating (which is good), there are a few very extreme cases where we're underestimating. In particular, all underestimated tasks are splits.

Well, this shows at least that the splits are systematically affected by wrong measurements. I'm not convinced that this is the entire story; the transfer costs still seem to be too small to actually prohibit transfer. I'll collect a bit more data.
My gut says "check to make sure that sizes aren't in the kB range, and then forget this line of inquiry".
The incorrect measurement of the splits has two significant impacts which magnify the stealing problem. Above, I was ignoring the long tail of overestimated sizes, but this actually plays a role when measuring bandwidths, causing us to overestimate our bandwidth, which can amplify errors in the stealing calculation. (Perf reports incoming; preview: 300s vs 900s.)

On top of this, I noticed that we are systematically overestimating our bandwidth since we only account for measurement points with data above 1MiB. I guess this cut was built in to not introduce bias due to comm overhead. Below you can see the …
I will follow up with a PR to …

Regarding the size measurements of the shuffle splits, here are perf reports:

with sampling (runtime ~900s); this is the default
without sampling (runtime ~300s)

In this example, I'm simply iterating over all splits with …
The size calculation for shuffle group results is very sensitive to sampling since there may be empty splits skewing the result. See also dask/distributed#4962
I don't think that sizeof comes up frequently in profiles, so I'd be comfortable increasing the scale here. It sounds like a specific/non-general fix would be to increase the maximum number of samples to above the size of our default splitting in dask.dataframe.shuffle. I'll admit though that I am surprised that getting ten samples out of the dict's values isn't enough to get a decent understanding of the size.
In my analysis, I've seen groupby split results where 10 out of 11 splits were empty and all data was in that one split. As I mentioned above, I suspect that this is a systematic problem in our algorithm, but I haven't had time to dive into this further. See dask/dask#7834
A dumb hack, but maybe we can do the following?

diff --git a/dask/sizeof.py b/dask/sizeof.py
index 570b6251..a4075750 100644
--- a/dask/sizeof.py
+++ b/dask/sizeof.py
@@ -46,15 +46,15 @@ def sizeof_array(o):
 def sizeof_python_collection(seq):
     num_items = len(seq)
     num_samples = 10
-    if num_items > num_samples:
+    if num_items > 32:
         if isinstance(seq, (set, frozenset)):
             # As of Python v3.9, it is deprecated to call random.sample() on
             # sets but since sets are unordered anyways we can simply pick
             # the first `num_samples` items.
-            samples = itertools.islice(seq, num_samples)
+            samples = itertools.islice(seq, 10)
         else:
-            samples = random.sample(seq, num_samples)
-        return getsizeof(seq) + int(num_items / num_samples * sum(map(sizeof, samples)))
+            samples = random.sample(seq, 10)
+        return getsizeof(seq) + int(num_items / 10 * sum(map(sizeof, samples)))
     else:
         return getsizeof(seq) + sum(map(sizeof, seq))

We tend not to allow more than 32 splits by default (and I've never seen anyone increase beyond this default). This doesn't solve the general problem, but solves the specific one.
Yeah, so this is interesting and maybe something that we should look into. Maybe we're running into a case where the unknown task duration is incorrect, and so lots of tiny tasks are piling up on a worker and giving us a surprisingly high cost? It looks like we have the following in our default config:

distributed:
  scheduler:
    default-task-durations:
      shuffle-split: 1us

This should probably be switched around. So maybe a bunch of split tasks land on some worker, and then future shuffling operations on that worker seem like they're not going to happen for a long time, so the scheduler decides to make a copy.
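If the prefix really should be split-shuffle, a hedged way to try the corrected default locally (assuming the dotted config path above; it has to be set in the scheduler's process) would be:

import dask

# Assumption: the prefix ought to be "split-shuffle", not "shuffle-split".
dask.config.set({"distributed.scheduler.default-task-durations.split-shuffle": "1us"})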
FWIW dask/dask#7834 resolves the problem. It's also kind of a hack, but very targeted. All measurements for bandwidth and size get much better with that fix and the system behaves much more stably. Increasing the sampling threshold would be equally possible.
Note that the default duration is also wrong, similar to #4964. I opened dask/dask#7844 for visibility, since this issue is currently spread over too many PRs/issues.
…#7834) The size calculation for shuffle group results is very sensitive to sampling since there may be empty splits skewing the result. See also dask/distributed#4962
@gjoseph92 now that a couple of Florian's PRs are in around avoiding stealing and properly assessing size, would it make sense to try this again?
FYI: the avoid-stealing PR (#4920) never left the draft stage; I wanted to wait for Gabe's changes and reevaluate. Sizing is much better now, though, and that has a big impact. Another thing is that the behaviour will be different based on the size of the tasks you're shuffling. I expect large-ish partitions to perform better due to better bandwidth measurement, but I haven't tested this thoroughly yet.
TL;DR

We currently allow stealing of split-shuffle tasks. That's a really bad idea...

Intro
Our distributed shuffle algorithm currently employs three kinds of tasks which are repeated a few times in multiple stages:

group-shuffle
split-shuffle
shuffle

where group-shuffle performs an actual groupby per chunk and outputs this to a tuple or list. split-shuffle simply performs a getitem on that list. It is therefore incredibly cheap and benefits greatly from data locality. Not only is a replica of group-shuffle useless, the dependent shuffle can usually not benefit greatly from data locality itself; therefore, moving a split-shuffle, or assigning it to a suboptimal worker, is a horrible decision.
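To make the cost asymmetry concrete, here is a toy illustration (stand-in names only, not the actual dask graph code):

import pandas as pd

def toy_group_shuffle(df, k):
    # The "group-shuffle" step: real work that touches every row and
    # returns a dict of k splits.
    return {name: part for name, part in df.groupby(df.index % k)}

df = pd.DataFrame({"x": range(1000)})
groups = toy_group_shuffle(df, k=4)
split_0 = groups[0]  # the "split-shuffle" step: a bare getitem, essentially free,
                     # so it only pays off where `groups` already lives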
Problem

We currently have two mechanisms which (re-)assign a task to a worker:

A) decide_worker

This is the first stop for a task, and we typically choose a worker which holds a dependency of the task. However, we are currently opening this logic up to allow for more flexible assignments (#4925). In particular, we'll include idle workers in the candidate list, which exposes us to the risk of moving split-shuffle tasks to the wrong worker, iiuc.

cc @gjoseph92
B) Work stealing

There are two mechanisms in work stealing which serve to protect us from assigning tasks like split-shuffle to the wrong worker.

B1) There is a dedicated blacklist for this specific task, see distributed/stealing.py, line 455 at 9f4165a. The problem, as you can see, is that the blacklisted task prefix is shuffle-split, the reverse of the actual name. This is a regression which was introduced at some point (I remember fixing this one or two years ago; I apparently wrote a bad unit test :))

B2) There is a heuristic which blacklists tasks that compute in under 5ms or are relatively costly to move compared to their compute time, see distributed/stealing.py, lines 120 to 126 at 9f4165a. I'm currently investigating an issue where this logic might need to be removed or relaxed (see #4920 and #4471), which would remove the last layer of protection.
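For reference, the B2 heuristic boils down to roughly the following (a simplified sketch, not the exact stealing.py code; thresholds taken from the description above):

def steal_cost_multiplier(compute_time, dep_nbytes, bandwidth, latency=0.001):
    # Returns None if the task should not be considered for stealing at all.
    if compute_time < 0.005:  # computing below ~5ms: never worth moving
        return None
    transfer_time = dep_nbytes / bandwidth + latency
    ratio = transfer_time / compute_time
    if ratio > 100:  # far too costly to move relative to its compute time
        return None
    return ratio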
To see how bad this can become, I executed a shuffle computation on my local machine, see perf report below. My OS kernel seemed to be pretty busy with process management, such that it slowed down everything. Great: this led to the split being whitelisted for stealing and caused incredibly costly traffic.
https://gistcdn.githack.com/fjetter/c444f51e378331fb70aa8dd09d66ff63/raw/80bc5796687eccfa3ad328b63cae81f97bd70b4b/gh4471_dataframe_shuffle_main.html
Future

Short term, I'll add split-shuffle to the blacklist of tasks to be stolen. However, we should ensure that this is respected both by decide_worker and by future work stealing. I would like to come up with a heuristic to identify tasks like this. While split-shuffles are probably the most prominent (and maybe most impactful) examples of this, I would imagine that this pattern is not unusual.

cc @madsbk I believe you have been looking into shuffle operations in the past and might be interested in this.
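As a rough sketch of that short-term blacklist fix (illustrative only; the real set lives in distributed/stealing.py and the names here are assumed):

from distributed.utils import key_split  # "split-shuffle-<token>" -> "split-shuffle"

FAST_TASKS = {"split-shuffle"}  # note: the regression above had the reversed prefix "shuffle-split"

def is_stealable(task_key):
    # Tasks whose prefix is blacklisted should never be considered for stealing.
    return key_split(task_key) not in FAST_TASKS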
Related PRs contributing to a fix
decide_worker
#4925