From 8bfff643cdb8274c32788b9ba9dff5a877d8c1d5 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 25 Oct 2022 16:32:16 +0200 Subject: [PATCH 1/5] Explicitly release `df_groups` --- dask_cuda/explicit_comms/dataframe/shuffle.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index b1f99869e..36c81cc49 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -325,8 +325,11 @@ def shuffle( rank_to_out_part_ids, ignore_index, ) - distributed.wait(list(result_futures.values())) - del df_groups + wait(list(result_futures.values())) + + # Release dataframes from step (a) + for fut in df_groups: + fut.release() # Step (c): extract individual dataframe-partitions name = f"explicit-comms-shuffle-getitem-{tokenize(name)}" From 4f494417b13b05707cc5b48adff07034ef327838 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 26 Oct 2022 09:02:19 +0200 Subject: [PATCH 2/5] style --- docs/source/conf.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/source/conf.py b/docs/source/conf.py index 2f7825a32..08d8bfdfd 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -189,7 +189,8 @@ # -- Extension configuration ------------------------------------------------- - def setup(app): app.add_css_file("https://docs.rapids.ai/assets/css/custom.css") - app.add_js_file("https://docs.rapids.ai/assets/js/custom.js", loading_method="defer") + app.add_js_file( + "https://docs.rapids.ai/assets/js/custom.js", loading_method="defer" + ) From 9abc3163b4ab99cd92170bd19b9128270890365a Mon Sep 17 00:00:00 2001 From: "Mads R. B. 
Kristensen" Date: Wed, 26 Oct 2022 09:03:08 +0200 Subject: [PATCH 3/5] Explicitly specify worker when submitting getitem tasks --- dask_cuda/explicit_comms/dataframe/shuffle.py | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 36c81cc49..7b115a7ca 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -11,7 +11,6 @@ import dask import dask.dataframe -import distributed from dask.base import compute_as_if_collection, tokenize from dask.dataframe.core import DataFrame, _concat as dd_concat, new_dd_object from dask.dataframe.shuffle import shuffle_group @@ -332,21 +331,31 @@ def shuffle( fut.release() # Step (c): extract individual dataframe-partitions + # We use `submit()` to control where the tasks are + # executed. name = f"explicit-comms-shuffle-getitem-{tokenize(name)}" dsk = {} - meta = None - for rank, parts in rank_to_out_part_ids.items(): - for i, part_id in enumerate(parts): - dsk[(name, part_id)] = (getitem, result_futures[rank], i) - if meta is None: - # Get the meta from the first output partition - meta = delayed(make_meta)( - delayed(getitem)(result_futures[rank], i) - ).compute() - assert meta is not None + for rank, worker in enumerate(c.worker_addresses): + if rank in workers: + for i, part_id in enumerate(rank_to_out_part_ids[rank]): + dsk[(name, part_id)] = c.client.submit( + getitem, result_futures[rank], i, workers=[worker] + ) + # Get the meta from the first output partition + meta = delayed(make_meta)(next(iter(dsk.values()))).compute() + + # Create a distributed Dataframe from all the pieces divs = [None] * (len(dsk) + 1) - return new_dd_object(dsk, name, meta, divs).persist() + ret = new_dd_object(dsk, name, meta, divs).persist() + wait(ret) + + # Release all temporary dataframes + for fut in result_futures.values(): + fut.release() + for fut in 
dsk.values(): + fut.release() + return ret def get_rearrange_by_column_tasks_wrapper(func): From 6c6b9168a8a42fb80404e16562210ccd81b94d35 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 26 Oct 2022 00:22:32 -0700 Subject: [PATCH 4/5] doc --- dask_cuda/explicit_comms/dataframe/shuffle.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index 7b115a7ca..d945a5c9a 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -330,9 +330,10 @@ def shuffle( for fut in df_groups: fut.release() - # Step (c): extract individual dataframe-partitions - # We use `submit()` to control where the tasks are - # executed. + # Step (c): extract individual dataframe-partitions. We use `submit()` + # to control where the tasks are executed. + # TODO: can we do this without using `submit()` to avoid the overhead + # of creating a Future for each dataframe partition? name = f"explicit-comms-shuffle-getitem-{tokenize(name)}" dsk = {} for rank, worker in enumerate(c.worker_addresses): From 658c3dade34af25f78f468da8ae4848bbc60b2f5 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 28 Oct 2022 08:10:45 +0200 Subject: [PATCH 5/5] Apply suggestions from code review Co-authored-by: Peter Andreas Entschev --- dask_cuda/explicit_comms/dataframe/shuffle.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py index d945a5c9a..294a8efd7 100644 --- a/dask_cuda/explicit_comms/dataframe/shuffle.py +++ b/dask_cuda/explicit_comms/dataframe/shuffle.py @@ -352,9 +352,7 @@ def shuffle( wait(ret) # Release all temporary dataframes - for fut in result_futures.values(): - fut.release() - for fut in dsk.values(): + for fut in [*result_futures.values(), *dsk.values()]: fut.release() return ret