Skip to content

Commit

Permalink
Update to Dask's shuffle_method kwarg (#14708)
Browse files Browse the repository at this point in the history
Authors:
  - Peter Andreas Entschev (https://github.com/pentschev)

Approvers:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

URL: #14708
  • Loading branch information
pentschev authored Jan 4, 2024
1 parent 4c01e95 commit fab5af2
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 46 deletions.
38 changes: 25 additions & 13 deletions python/dask_cudf/dask_cudf/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2023, NVIDIA CORPORATION.
# Copyright (c) 2018-2024, NVIDIA CORPORATION.

import math
import textwrap
Expand Down Expand Up @@ -26,7 +26,7 @@

from dask_cudf import sorting
from dask_cudf.accessors import ListMethods, StructMethods
from dask_cudf.sorting import _get_shuffle_type
from dask_cudf.sorting import _get_shuffle_method


class _Frame(dd.core._Frame, OperatorMethodMixin):
Expand Down Expand Up @@ -112,16 +112,19 @@ def do_apply_rows(df, func, incols, outcols, kwargs):
)

@_dask_cudf_nvtx_annotate
def merge(self, other, shuffle=None, **kwargs):
def merge(self, other, shuffle_method=None, **kwargs):
on = kwargs.pop("on", None)
if isinstance(on, tuple):
on = list(on)
return super().merge(
other, on=on, shuffle=_get_shuffle_type(shuffle), **kwargs
other,
on=on,
shuffle_method=_get_shuffle_method(shuffle_method),
**kwargs,
)

@_dask_cudf_nvtx_annotate
def join(self, other, shuffle=None, **kwargs):
def join(self, other, shuffle_method=None, **kwargs):
# CuDF doesn't support "right" join yet
how = kwargs.pop("how", "left")
if how == "right":
Expand All @@ -131,12 +134,21 @@ def join(self, other, shuffle=None, **kwargs):
if isinstance(on, tuple):
on = list(on)
return super().join(
other, how=how, on=on, shuffle=_get_shuffle_type(shuffle), **kwargs
other,
how=how,
on=on,
shuffle_method=_get_shuffle_method(shuffle_method),
**kwargs,
)

@_dask_cudf_nvtx_annotate
def set_index(
self, other, sorted=False, divisions=None, shuffle=None, **kwargs
self,
other,
sorted=False,
divisions=None,
shuffle_method=None,
**kwargs,
):

pre_sorted = sorted
Expand Down Expand Up @@ -172,7 +184,7 @@ def set_index(
divisions=divisions,
set_divisions=True,
ignore_index=True,
shuffle=shuffle,
shuffle_method=shuffle_method,
)

# Ignore divisions if its a dataframe
Expand All @@ -199,7 +211,7 @@ def set_index(
return super().set_index(
other,
sorted=pre_sorted,
shuffle=_get_shuffle_type(shuffle),
shuffle_method=_get_shuffle_method(shuffle_method),
divisions=divisions,
**kwargs,
)
Expand All @@ -216,7 +228,7 @@ def sort_values(
na_position="last",
sort_function=None,
sort_function_kwargs=None,
shuffle=None,
shuffle_method=None,
**kwargs,
):
if kwargs:
Expand All @@ -233,7 +245,7 @@ def sort_values(
ignore_index=ignore_index,
ascending=ascending,
na_position=na_position,
shuffle=shuffle,
shuffle_method=shuffle_method,
sort_function=sort_function,
sort_function_kwargs=sort_function_kwargs,
)
Expand Down Expand Up @@ -287,10 +299,10 @@ def var(
return _parallel_var(self, meta, skipna, split_every, out)

@_dask_cudf_nvtx_annotate
def shuffle(self, *args, shuffle=None, **kwargs):
def shuffle(self, *args, shuffle_method=None, **kwargs):
"""Wraps dask.dataframe DataFrame.shuffle method"""
return super().shuffle(
*args, shuffle=_get_shuffle_type(shuffle), **kwargs
*args, shuffle_method=_get_shuffle_method(shuffle_method), **kwargs
)

@_dask_cudf_nvtx_annotate
Expand Down
34 changes: 19 additions & 15 deletions python/dask_cudf/dask_cudf/groupby.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from functools import wraps
from typing import Set
Expand Down Expand Up @@ -211,7 +211,7 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
sep=self.sep,
sort=self.sort,
as_index=self.as_index,
shuffle=shuffle,
shuffle_method=shuffle,
**self.dropna,
)

Expand Down Expand Up @@ -364,7 +364,7 @@ def _shuffle_aggregate(
split_out,
token=None,
sort=None,
shuffle=None,
shuffle_method=None,
):
# Shuffle-based groupby aggregation
# NOTE: This function is the dask_cudf version of
Expand All @@ -391,7 +391,7 @@ def _shuffle_aggregate(
.sort_values(
gb_cols,
ignore_index=True,
shuffle=shuffle,
shuffle_method=shuffle_method,
)
.map_partitions(
aggregate,
Expand All @@ -405,7 +405,7 @@ def _shuffle_aggregate(
gb_cols,
npartitions=shuffle_npartitions,
ignore_index=True,
shuffle=shuffle,
shuffle_method=shuffle_method,
).map_partitions(
aggregate,
meta=aggregate(chunked._meta, **aggregate_kwargs),
Expand All @@ -429,7 +429,7 @@ def groupby_agg(
sep="___",
sort=False,
as_index=True,
shuffle=None,
shuffle_method=None,
):
"""Optimized groupby aggregation for Dask-CuDF.
Expand All @@ -450,7 +450,7 @@ def groupby_agg(
sort : bool
Sort the group keys, better performance is obtained when
not sorting.
shuffle : str (optional)
shuffle_method : str (optional)
Control how shuffling of the DataFrame is performed.
sep : str
Internal usage.
Expand Down Expand Up @@ -575,12 +575,12 @@ def groupby_agg(
"aggs_renames": aggs_renames,
}

# Use shuffle=True for split_out>1
if sort and split_out > 1 and shuffle is None:
shuffle = "tasks"
# Use shuffle_method=True for split_out>1
if sort and split_out > 1 and shuffle_method is None:
shuffle_method = "tasks"

# Check if we are using the shuffle-based algorithm
if shuffle:
if shuffle_method:
# Shuffle-based aggregation
return _shuffle_aggregate(
ddf,
Expand All @@ -593,7 +593,9 @@ def groupby_agg(
split_out,
token="cudf-aggregate",
sort=sort,
shuffle=shuffle if isinstance(shuffle, str) else None,
shuffle_method=shuffle_method
if isinstance(shuffle_method, str)
else None,
)

# Deal with sort/shuffle defaults
Expand All @@ -602,7 +604,7 @@ def groupby_agg(
"dask-cudf's groupby algorithm does not yet support "
"`sort=True` when `split_out>1`, unless a shuffle-based "
"algorithm is used. Please use `split_out=1`, group "
"with `sort=False`, or set `shuffle=True`."
"with `sort=False`, or set `shuffle_method=True`."
)

# Determine required columns to enable column projection
Expand All @@ -629,7 +631,9 @@ def groupby_agg(


@_dask_cudf_nvtx_annotate
def _make_groupby_agg_call(gb, aggs, split_every, split_out, shuffle=None):
def _make_groupby_agg_call(
gb, aggs, split_every, split_out, shuffle_method=None
):
"""Helper method to consolidate the common `groupby_agg` call for all
aggregations in one place
"""
Expand All @@ -643,7 +647,7 @@ def _make_groupby_agg_call(gb, aggs, split_every, split_out, shuffle=None):
sep=gb.sep,
sort=gb.sort,
as_index=gb.as_index,
shuffle=shuffle,
shuffle_method=shuffle_method,
**gb.dropna,
)

Expand Down
18 changes: 9 additions & 9 deletions python/dask_cudf/dask_cudf/sorting.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from collections.abc import Iterator

Expand Down Expand Up @@ -239,7 +239,7 @@ def sort_values(
ignore_index=False,
ascending=True,
na_position="last",
shuffle=None,
shuffle_method=None,
sort_function=None,
sort_function_kwargs=None,
):
Expand Down Expand Up @@ -295,7 +295,7 @@ def sort_values(
"_partitions",
max_branch=max_branch,
npartitions=len(divisions) - 1,
shuffle=_get_shuffle_type(shuffle),
shuffle_method=_get_shuffle_method(shuffle_method),
ignore_index=ignore_index,
).drop(columns=["_partitions"])
df3.divisions = (None,) * (df3.npartitions + 1)
Expand All @@ -320,14 +320,14 @@ def get_default_shuffle_method():
return default


def _get_shuffle_type(shuffle):
# Utility to set the shuffle-kwarg default
def _get_shuffle_method(shuffle_method):
# Utility to set the shuffle_method-kwarg default
# and to validate user-specified options
shuffle = shuffle or get_default_shuffle_method()
if shuffle not in _SHUFFLE_SUPPORT:
shuffle_method = shuffle_method or get_default_shuffle_method()
if shuffle_method not in _SHUFFLE_SUPPORT:
raise ValueError(
"Dask-cudf only supports the following shuffle "
f"methods: {_SHUFFLE_SUPPORT}. Got shuffle={shuffle}"
f"methods: {_SHUFFLE_SUPPORT}. Got shuffle_method={shuffle_method}"
)

return shuffle
return shuffle_method
6 changes: 3 additions & 3 deletions python/dask_cudf/dask_cudf/tests/test_core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.

import random

Expand Down Expand Up @@ -331,10 +331,10 @@ def test_rearrange_by_divisions(nelem, index):
divisions = (0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

expect = dd.shuffle.rearrange_by_divisions(
ddf1, "x", divisions=divisions, shuffle="tasks"
ddf1, "x", divisions=divisions, shuffle_method="tasks"
)
result = dd.shuffle.rearrange_by_divisions(
gdf1, "x", divisions=divisions, shuffle="tasks"
gdf1, "x", divisions=divisions, shuffle_method="tasks"
)
dd.assert_eq(expect, result)

Expand Down
6 changes: 3 additions & 3 deletions python/dask_cudf/dask_cudf/tests/test_distributed.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

import numba.cuda
import pytest
Expand Down Expand Up @@ -80,7 +80,7 @@ def test_str_series_roundtrip():


def test_p2p_shuffle():
# Check that we can use `shuffle="p2p"`
# Check that we can use `shuffle_method="p2p"`
with dask_cuda.LocalCUDACluster(n_workers=1) as cluster:
with Client(cluster):
ddf = (
Expand All @@ -93,7 +93,7 @@ def test_p2p_shuffle():
.to_backend("cudf")
)
dd.assert_eq(
ddf.sort_values("x", shuffle="p2p").compute(),
ddf.sort_values("x", shuffle_method="p2p").compute(),
ddf.compute().sort_values("x"),
check_index=False,
)
2 changes: 1 addition & 1 deletion python/dask_cudf/dask_cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.

import numpy as np
import pandas as pd
Expand Down
4 changes: 2 additions & 2 deletions python/dask_cudf/dask_cudf/tests/test_sort.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2023, NVIDIA CORPORATION.
# Copyright (c) 2019-2024, NVIDIA CORPORATION.

import cupy as cp
import numpy as np
Expand Down Expand Up @@ -123,5 +123,5 @@ def test_disk_shuffle():
pytest.skip("need a version of dask that has partd_encode_dispatch")
df = cudf.DataFrame({"a": [1, 2, 3] * 20, "b": [4, 5, 6, 7] * 15})
ddf = dd.from_pandas(df, npartitions=4)
got = dd.DataFrame.shuffle(ddf, "a", shuffle="disk")
got = dd.DataFrame.shuffle(ddf, "a", shuffle_method="disk")
dd.assert_eq(got, df)

0 comments on commit fab5af2

Please sign in to comment.