Update to Dask's shuffle_method kwarg #14708

Merged
merged 4 commits on Jan 4, 2024
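This PR renames the `shuffle` keyword argument to `shuffle_method` across dask_cudf's DataFrame, sorting, and groupby code paths, matching Dask's own `shuffle_method` kwarg. A minimal usage sketch of the renamed keyword follows; the data and the choice of `"tasks"` as the backend are illustrative, not part of this PR.

```python
import cudf
import dask_cudf

# Illustrative data; any dask_cudf DataFrame works the same way.
ddf = dask_cudf.from_cudf(
    cudf.DataFrame({"x": [3, 1, 2] * 4, "y": list(range(12))}),
    npartitions=3,
)

# The keyword selecting the shuffle backend is now `shuffle_method`.
result = ddf.sort_values("x", shuffle_method="tasks").compute()
```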
38 changes: 25 additions & 13 deletions python/dask_cudf/dask_cudf/core.py
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2023, NVIDIA CORPORATION.
# Copyright (c) 2018-2024, NVIDIA CORPORATION.

import math
import textwrap
@@ -26,7 +26,7 @@

from dask_cudf import sorting
from dask_cudf.accessors import ListMethods, StructMethods
from dask_cudf.sorting import _get_shuffle_type
from dask_cudf.sorting import _get_shuffle_method


class _Frame(dd.core._Frame, OperatorMethodMixin):
@@ -112,16 +112,19 @@ def do_apply_rows(df, func, incols, outcols, kwargs):
)

@_dask_cudf_nvtx_annotate
def merge(self, other, shuffle=None, **kwargs):
def merge(self, other, shuffle_method=None, **kwargs):
on = kwargs.pop("on", None)
if isinstance(on, tuple):
on = list(on)
return super().merge(
other, on=on, shuffle=_get_shuffle_type(shuffle), **kwargs
other,
on=on,
shuffle_method=_get_shuffle_method(shuffle_method),
**kwargs,
)

@_dask_cudf_nvtx_annotate
def join(self, other, shuffle=None, **kwargs):
def join(self, other, shuffle_method=None, **kwargs):
# CuDF doesn't support "right" join yet
how = kwargs.pop("how", "left")
if how == "right":
@@ -131,12 +134,21 @@ def join(self, other, shuffle=None, **kwargs):
if isinstance(on, tuple):
on = list(on)
return super().join(
other, how=how, on=on, shuffle=_get_shuffle_type(shuffle), **kwargs
other,
how=how,
on=on,
shuffle_method=_get_shuffle_method(shuffle_method),
**kwargs,
)

@_dask_cudf_nvtx_annotate
def set_index(
self, other, sorted=False, divisions=None, shuffle=None, **kwargs
self,
other,
sorted=False,
divisions=None,
shuffle_method=None,
**kwargs,
):

pre_sorted = sorted
@@ -172,7 +184,7 @@ def set_index(
divisions=divisions,
set_divisions=True,
ignore_index=True,
shuffle=shuffle,
shuffle_method=shuffle_method,
)

# Ignore divisions if it's a dataframe
@@ -199,7 +211,7 @@
return super().set_index(
other,
sorted=pre_sorted,
shuffle=_get_shuffle_type(shuffle),
shuffle_method=_get_shuffle_method(shuffle_method),
divisions=divisions,
**kwargs,
)
@@ -216,7 +228,7 @@ def sort_values(
na_position="last",
sort_function=None,
sort_function_kwargs=None,
shuffle=None,
shuffle_method=None,
**kwargs,
):
if kwargs:
@@ -233,7 +245,7 @@
ignore_index=ignore_index,
ascending=ascending,
na_position=na_position,
shuffle=shuffle,
shuffle_method=shuffle_method,
sort_function=sort_function,
sort_function_kwargs=sort_function_kwargs,
)
@@ -287,10 +299,10 @@ def var(
return _parallel_var(self, meta, skipna, split_every, out)

@_dask_cudf_nvtx_annotate
def shuffle(self, *args, shuffle=None, **kwargs):
def shuffle(self, *args, shuffle_method=None, **kwargs):
"""Wraps dask.dataframe DataFrame.shuffle method"""
return super().shuffle(
*args, shuffle=_get_shuffle_type(shuffle), **kwargs
*args, shuffle_method=_get_shuffle_method(shuffle_method), **kwargs
)

@_dask_cudf_nvtx_annotate
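The core.py changes above thread `shuffle_method` through the DataFrame-level wrappers (`merge`, `join`, `set_index`, `sort_values`, and `shuffle`), each of which validates the value with `_get_shuffle_method` before calling the upstream Dask method. A small sketch of the renamed keyword on two of those methods, with made-up frames:

```python
import cudf
import dask_cudf

left = dask_cudf.from_cudf(
    cudf.DataFrame({"key": [0, 1, 2, 3], "a": [10, 20, 30, 40]}), npartitions=2
)
right = dask_cudf.from_cudf(
    cudf.DataFrame({"key": [1, 2, 3, 4], "b": [5, 6, 7, 8]}), npartitions=2
)

# `merge` and `set_index` both accept the renamed keyword after this change.
joined = left.merge(right, on="key", shuffle_method="tasks")
indexed = joined.set_index("key", shuffle_method="tasks")
```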
34 changes: 19 additions & 15 deletions python/dask_cudf/dask_cudf/groupby.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from functools import wraps
from typing import Set
@@ -211,7 +211,7 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
sep=self.sep,
sort=self.sort,
as_index=self.as_index,
shuffle=shuffle,
shuffle_method=shuffle,
**self.dropna,
)

@@ -364,7 +364,7 @@ def _shuffle_aggregate(
split_out,
token=None,
sort=None,
shuffle=None,
shuffle_method=None,
):
# Shuffle-based groupby aggregation
# NOTE: This function is the dask_cudf version of
@@ -391,7 +391,7 @@
.sort_values(
gb_cols,
ignore_index=True,
shuffle=shuffle,
shuffle_method=shuffle_method,
)
.map_partitions(
aggregate,
@@ -405,7 +405,7 @@
gb_cols,
npartitions=shuffle_npartitions,
ignore_index=True,
shuffle=shuffle,
shuffle_method=shuffle_method,
).map_partitions(
aggregate,
meta=aggregate(chunked._meta, **aggregate_kwargs),
@@ -429,7 +429,7 @@ def groupby_agg(
sep="___",
sort=False,
as_index=True,
shuffle=None,
shuffle_method=None,
):
"""Optimized groupby aggregation for Dask-CuDF.

@@ -450,7 +450,7 @@
sort : bool
Sort the group keys, better performance is obtained when
not sorting.
shuffle : str (optional)
shuffle_method : str (optional)
Control how shuffling of the DataFrame is performed.
sep : str
Internal usage.
@@ -575,12 +575,12 @@
"aggs_renames": aggs_renames,
}

# Use shuffle=True for split_out>1
if sort and split_out > 1 and shuffle is None:
shuffle = "tasks"
# Use shuffle_method=True for split_out>1
if sort and split_out > 1 and shuffle_method is None:
shuffle_method = "tasks"

# Check if we are using the shuffle-based algorithm
if shuffle:
if shuffle_method:
# Shuffle-based aggregation
return _shuffle_aggregate(
ddf,
@@ -593,7 +593,9 @@
split_out,
token="cudf-aggregate",
sort=sort,
shuffle=shuffle if isinstance(shuffle, str) else None,
shuffle_method=shuffle_method
if isinstance(shuffle_method, str)
else None,
)

# Deal with sort/shuffle defaults
@@ -602,7 +604,7 @@
"dask-cudf's groupby algorithm does not yet support "
"`sort=True` when `split_out>1`, unless a shuffle-based "
"algorithm is used. Please use `split_out=1`, group "
"with `sort=False`, or set `shuffle=True`."
"with `sort=False`, or set `shuffle_method=True`."
)

# Determine required columns to enable column projection
@@ -629,7 +631,9 @@


@_dask_cudf_nvtx_annotate
def _make_groupby_agg_call(gb, aggs, split_every, split_out, shuffle=None):
def _make_groupby_agg_call(
gb, aggs, split_every, split_out, shuffle_method=None
):
"""Helper method to consolidate the common `groupby_agg` call for all
aggregations in one place
"""
@@ -643,7 +647,7 @@ def _make_groupby_agg_call(gb, aggs, split_every, split_out, shuffle=None):
sep=gb.sep,
sort=gb.sort,
as_index=gb.as_index,
shuffle=shuffle,
shuffle_method=shuffle_method,
**gb.dropna,
)

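In groupby.py, the public `aggregate` signature shown above still takes `shuffle=`; the rename applies to what it forwards into `groupby_agg` and `_shuffle_aggregate`. A hedged sketch of triggering the shuffle-based aggregation path (data and column names are made up):

```python
import cudf
import dask_cudf

ddf = dask_cudf.from_cudf(
    cudf.DataFrame({"g": [0, 1, 0, 1] * 5, "v": list(range(20))}), npartitions=4
)

# With split_out > 1 and a shuffle backend requested, the shuffle-based
# groupby_agg path runs; internally the value is passed on as `shuffle_method`.
agg = ddf.groupby("g").aggregate({"v": "mean"}, split_out=2, shuffle="tasks")
```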
18 changes: 9 additions & 9 deletions python/dask_cudf/dask_cudf/sorting.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from collections.abc import Iterator

@@ -239,7 +239,7 @@ def sort_values(
ignore_index=False,
ascending=True,
na_position="last",
shuffle=None,
shuffle_method=None,
sort_function=None,
sort_function_kwargs=None,
):
@@ -295,7 +295,7 @@
"_partitions",
max_branch=max_branch,
npartitions=len(divisions) - 1,
shuffle=_get_shuffle_type(shuffle),
shuffle_method=_get_shuffle_method(shuffle_method),
ignore_index=ignore_index,
).drop(columns=["_partitions"])
df3.divisions = (None,) * (df3.npartitions + 1)
@@ -320,14 +320,14 @@ def get_default_shuffle_method():
return default


def _get_shuffle_type(shuffle):
# Utility to set the shuffle-kwarg default
def _get_shuffle_method(shuffle_method):
# Utility to set the shuffle_method-kwarg default
# and to validate user-specified options
shuffle = shuffle or get_default_shuffle_method()
if shuffle not in _SHUFFLE_SUPPORT:
shuffle_method = shuffle_method or get_default_shuffle_method()
if shuffle_method not in _SHUFFLE_SUPPORT:
raise ValueError(
"Dask-cudf only supports the following shuffle "
f"methods: {_SHUFFLE_SUPPORT}. Got shuffle={shuffle}"
f"methods: {_SHUFFLE_SUPPORT}. Got shuffle_method={shuffle_method}"
)

return shuffle
return shuffle_method
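The renamed helper in sorting.py, `_get_shuffle_method`, fills in Dask's default when no value is given and raises for unsupported values. A short sketch of that behavior; note the helper is private, and the exact contents of `_SHUFFLE_SUPPORT` are not shown in this diff:

```python
from dask_cudf.sorting import _get_shuffle_method

# No value given: falls back to Dask's configured default shuffle method.
default = _get_shuffle_method(None)

# Unsupported values raise a ValueError listing the supported methods.
try:
    _get_shuffle_method("not-a-real-method")
except ValueError as err:
    print(err)
```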
6 changes: 3 additions & 3 deletions python/dask_cudf/dask_cudf/tests/test_core.py
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.

import random

@@ -331,10 +331,10 @@ def test_rearrange_by_divisions(nelem, index):
divisions = (0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

expect = dd.shuffle.rearrange_by_divisions(
ddf1, "x", divisions=divisions, shuffle="tasks"
ddf1, "x", divisions=divisions, shuffle_method="tasks"
)
result = dd.shuffle.rearrange_by_divisions(
gdf1, "x", divisions=divisions, shuffle="tasks"
gdf1, "x", divisions=divisions, shuffle_method="tasks"
)
dd.assert_eq(expect, result)

6 changes: 3 additions & 3 deletions python/dask_cudf/dask_cudf/tests/test_distributed.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

import numba.cuda
import pytest
@@ -80,7 +80,7 @@ def test_str_series_roundtrip():


def test_p2p_shuffle():
# Check that we can use `shuffle="p2p"`
# Check that we can use `shuffle_method="p2p"`
with dask_cuda.LocalCUDACluster(n_workers=1) as cluster:
with Client(cluster):
ddf = (
@@ -93,7 +93,7 @@ def test_p2p_shuffle():
.to_backend("cudf")
)
dd.assert_eq(
ddf.sort_values("x", shuffle="p2p").compute(),
ddf.sort_values("x", shuffle_method="p2p").compute(),
ddf.compute().sort_values("x"),
check_index=False,
)
2 changes: 1 addition & 1 deletion python/dask_cudf/dask_cudf/tests/test_groupby.py
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.

import numpy as np
import pandas as pd
4 changes: 2 additions & 2 deletions python/dask_cudf/dask_cudf/tests/test_sort.py
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2023, NVIDIA CORPORATION.
# Copyright (c) 2019-2024, NVIDIA CORPORATION.

import cupy as cp
import numpy as np
@@ -123,5 +123,5 @@ def test_disk_shuffle():
pytest.skip("need a version of dask that has partd_encode_dispatch")
df = cudf.DataFrame({"a": [1, 2, 3] * 20, "b": [4, 5, 6, 7] * 15})
ddf = dd.from_pandas(df, npartitions=4)
got = dd.DataFrame.shuffle(ddf, "a", shuffle="disk")
got = dd.DataFrame.shuffle(ddf, "a", shuffle_method="disk")
dd.assert_eq(got, df)