Skip to content

Commit

Permalink
Merge pull request #11806 from rapidsai/branch-22.10
Browse files Browse the repository at this point in the history
[gpuCI] Forward-merge branch-22.10 to branch-22.12 [skip gpuci]
  • Loading branch information
GPUtester authored Sep 28, 2022
2 parents 479514e + 9e9ba6e commit 5cf7fdf
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 77 deletions.
30 changes: 6 additions & 24 deletions python/dask_cudf/dask_cudf/sorting.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dask.base import tokenize
from dask.dataframe import methods
from dask.dataframe.core import DataFrame, Index, Series
from dask.dataframe.shuffle import rearrange_by_column
from dask.highlevelgraph import HighLevelGraph
from dask.utils import M

Expand Down Expand Up @@ -237,12 +238,6 @@ def sort_values(
):
"""Sort by the given list/tuple of column names."""

shuffle = _get_shuffle_type(shuffle)
# Note that we cannot import `rearrange_by_column` in
# the header, because we need to allow dask-cuda to
# patch this function before we import it here
from dask.dataframe.shuffle import rearrange_by_column

if not isinstance(ascending, bool):
raise ValueError("ascending must be either True or False")
if na_position not in ("first", "last"):
Expand Down Expand Up @@ -293,7 +288,7 @@ def sort_values(
"_partitions",
max_branch=max_branch,
npartitions=len(divisions) - 1,
shuffle=shuffle,
shuffle=_get_shuffle_type(shuffle),
ignore_index=ignore_index,
).drop(columns=["_partitions"])
df3.divisions = (None,) * (df3.npartitions + 1)
Expand All @@ -309,26 +304,13 @@ def sort_values(

def _get_shuffle_type(shuffle):
# Utility to set the shuffle-kwarg default
# and to validate user-specified options
#
# Supported Options:
# - "tasks"
# - "explicit-comms" (requires dask_cuda)
#
# and to validate user-specified options.
# The only supported option is currently "tasks"
shuffle = shuffle or dask.config.get("shuffle", "tasks")
if shuffle not in {"tasks", "explicit-comms"}:
if shuffle != "tasks":
raise ValueError(
f"Dask-cudf only supports in-memory shuffling with "
f"'tasks' or 'explicit-comms'. Got shuffle={shuffle}"
f"'tasks'. Got shuffle={shuffle}"
)

if shuffle == "explicit-comms":
try:
import dask_cuda # noqa: F401
except ImportError:
raise ValueError(
"shuffle='explicit-comms' requires dask_cuda. "
"Please install dask_cuda, or use shuffle='tasks'."
)

return shuffle
53 changes: 0 additions & 53 deletions python/dask_cudf/dask_cudf/tests/test_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import dask
from dask import dataframe as dd
from dask.distributed import Client
from dask.utils_test import hlg_layer
from distributed.utils_test import cleanup, loop, loop_in_thread # noqa: F401

import cudf
Expand Down Expand Up @@ -78,55 +77,3 @@ def test_str_series_roundtrip():

actual = dask_series.compute()
assert_eq(actual, expected)


def test_shuffle_explicit_comms():
    """Compare ``shuffle="explicit-comms"`` with ``shuffle="tasks"``.

    Inside a two-worker ``dask_cuda.LocalCUDACluster``, the test runs
    ``sort_values``, ``set_index``, ``shuffle``, ``merge`` and ``join``
    with ``shuffle="explicit-comms"``. For every call it asserts that
    ``hlg_layer(<graph>, "explicit")`` returns a truthy layer. Except for
    the plain ``shuffle`` call (which only checks the row count), the
    computed result is also compared with the ``shuffle="tasks"`` result.
    """
    with dask_cuda.LocalCUDACluster(n_workers=2) as cluster, Client(cluster):
        source = cudf.DataFrame({"a": [1, 2, 3, 4], "b": [3, 1, 2, 4]})
        ddf = dask_cudf.from_cudf(source, npartitions=4)

        # --- sort_values ---
        ec_sorted = ddf.sort_values(["b"], shuffle="explicit-comms")
        tasks_sorted = ddf.sort_values(["b"], shuffle="tasks")
        assert hlg_layer(ec_sorted.dask, "explicit")
        assert_eq(ec_sorted.compute(), tasks_sorted.compute())

        # --- set_index (divisions must agree as well) ---
        ec_indexed = ddf.set_index("b", shuffle="explicit-comms")
        tasks_indexed = ddf.set_index("b", shuffle="tasks")
        assert ec_indexed.divisions == tasks_indexed.divisions
        assert hlg_layer(ec_indexed.dask, "explicit")
        assert_eq(ec_indexed.compute(), tasks_indexed.compute())

        # --- shuffle (only the row count is checked) ---
        ec_shuffled = ddf.shuffle(["b"], shuffle="explicit-comms")
        assert hlg_layer(ec_shuffled.dask, "explicit")
        assert len(ec_shuffled) == len(ddf)

        # --- merge ---
        ec_merged = ddf.merge(ddf.copy(), on="b", shuffle="explicit-comms")
        tasks_merged = ddf.merge(ddf.copy(), on="b", shuffle="tasks")
        assert hlg_layer(ec_merged.dask, "explicit")
        assert_eq(ec_merged.compute(), tasks_merged.compute())

        # --- join ---
        # Both joins share every keyword except the shuffle backend.
        join_kwargs = {"on": "b", "lsuffix": "_l", "rsuffix": "_r"}
        ec_joined = ddf.join(
            ddf.set_index("b"), shuffle="explicit-comms", **join_kwargs
        )
        tasks_joined = ddf.join(
            ddf.set_index("b"), shuffle="tasks", **join_kwargs
        )
        assert hlg_layer(ec_joined.dask, "explicit")
        # Results are sorted by index before comparing, as in the original.
        assert_eq(
            ec_joined.compute().sort_index(),
            tasks_joined.compute().sort_index(),
        )

0 comments on commit 5cf7fdf

Please sign in to comment.