Skip to content

Commit

Permalink
Explicit-comms-shuffle: fine control of task scheduling (#1025)
Browse files Browse the repository at this point in the history
In shuffle, use `Client.submit()` to control where tasks are executed and release temporary dataframes ASAP.

#### Context
In the final step in explicit-comms shuffle, we call `getitem()` to extract the final dataframe partitions from the result of the local shuffle. In some cases, these `getitem()` tasks would not run on the worker that ran the local shuffle, which would result in extra communication and spilling.
We now use `submit(..., worker=...)` to make sure that the worker running the local shuffle also runs the `getitem()` task afterwards.

Open question: is it possible to achieve this placement control without `submit()`, so as to avoid the overhead of creating a `Future` for each dataframe partition?

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #1025
  • Loading branch information
madsbk authored Oct 28, 2022
1 parent 91d605c commit 40bbfed
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
41 changes: 26 additions & 15 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import dask
import dask.dataframe
import distributed
from dask.base import compute_as_if_collection, tokenize
from dask.dataframe.core import DataFrame, _concat as dd_concat, new_dd_object
from dask.dataframe.shuffle import shuffle_group
Expand Down Expand Up @@ -325,25 +324,37 @@ def shuffle(
rank_to_out_part_ids,
ignore_index,
)
distributed.wait(list(result_futures.values()))
del df_groups
wait(list(result_futures.values()))

# Step (c): extract individual dataframe-partitions
# Release dataframes from step (a)
for fut in df_groups:
fut.release()

# Step (c): extract individual dataframe-partitions. We use `submit()`
# to control where the tasks are executed.
# TODO: can we do this without using `submit()` to avoid the overhead
# of creating a Future for each dataframe partition?
name = f"explicit-comms-shuffle-getitem-{tokenize(name)}"
dsk = {}
meta = None
for rank, parts in rank_to_out_part_ids.items():
for i, part_id in enumerate(parts):
dsk[(name, part_id)] = (getitem, result_futures[rank], i)
if meta is None:
# Get the meta from the first output partition
meta = delayed(make_meta)(
delayed(getitem)(result_futures[rank], i)
).compute()
assert meta is not None
for rank, worker in enumerate(c.worker_addresses):
if rank in workers:
for i, part_id in enumerate(rank_to_out_part_ids[rank]):
dsk[(name, part_id)] = c.client.submit(
getitem, result_futures[rank], i, workers=[worker]
)

# Get the meta from the first output partition
meta = delayed(make_meta)(next(iter(dsk.values()))).compute()

# Create a distributed Dataframe from all the pieces
divs = [None] * (len(dsk) + 1)
return new_dd_object(dsk, name, meta, divs).persist()
ret = new_dd_object(dsk, name, meta, divs).persist()
wait(ret)

# Release all temporary dataframes
for fut in [*result_futures.values(), *dsk.values()]:
fut.release()
return ret


def get_rearrange_by_column_tasks_wrapper(func):
Expand Down
5 changes: 3 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@
# -- Extension configuration -------------------------------------------------



def setup(app):
app.add_css_file("https://docs.rapids.ai/assets/css/custom.css")
app.add_js_file("https://docs.rapids.ai/assets/js/custom.js", loading_method="defer")
app.add_js_file(
"https://docs.rapids.ai/assets/js/custom.js", loading_method="defer"
)

0 comments on commit 40bbfed

Please sign in to comment.