Add checks for HLG layers in dask-cudf groupby tests #10853

Merged
Changes from 10 commits

Commits (18):
6571499
Add checks for dask-cudf groupby HLG layers
charlesbluca May 13, 2022
9bc6645
Remove some tests that are now superfluous
charlesbluca May 13, 2022
1115f1c
Merge remote-tracking branch 'origin/branch-22.10' into checkout-grou…
charlesbluca Sep 13, 2022
1c700cb
Update check to use new layer names
charlesbluca Sep 13, 2022
2bec53a
Merge remote-tracking branch 'origin/branch-22.12' into checkout-grou…
charlesbluca Nov 1, 2022
543f8db
Update python/dask_cudf/dask_cudf/tests/test_groupby.py
charlesbluca Nov 2, 2022
b77a14a
Rename layer assertion function, improve error message
charlesbluca Nov 2, 2022
9d458f8
Parametrize nulls in pdf fixtures, use in test_groupby_agg
charlesbluca Nov 2, 2022
c7eaad1
Merge cumulative agg tests into main groupby tests
charlesbluca Nov 2, 2022
069fc65
Add back in test_groupby_first_last with some changes
charlesbluca Nov 2, 2022
711470f
Revert "Merge cumulative agg tests into main groupby tests"
charlesbluca Nov 2, 2022
2112fea
xfail cumulative test for null dataframes
charlesbluca Nov 3, 2022
60649ba
Remove cumulative aggs from SUPPORTED_AGGS
charlesbluca Nov 3, 2022
5b2fe76
Merge remote-tracking branch 'origin/branch-22.12' into checkout-grou…
charlesbluca Nov 3, 2022
2001ffe
Rename util groupby functions from supported -> optimized
charlesbluca Nov 3, 2022
6eddbcf
Merge remote-tracking branch 'origin/branch-22.12' into checkout-grou…
charlesbluca Nov 3, 2022
cc64ebc
Wrap groupby decorator with functools.wraps
charlesbluca Nov 3, 2022
78be977
Merge remote-tracking branch 'origin/branch-22.12' into checkout-grou…
charlesbluca Nov 7, 2022
60 changes: 60 additions & 0 deletions python/dask_cudf/dask_cudf/groupby.py
@@ -243,6 +243,36 @@ def last(self, split_every=None, split_out=1):
**self.dropna,
)

@_dask_cudf_nvtx_annotate
@_check_groupby_supported
def cumsum(self, split_every=None, split_out=1):
return groupby_agg(
self.obj,
self.by,
self._make_groupby_method_aggs("cumsum"),
split_every=split_every,
split_out=split_out,
sep=self.sep,
sort=self.sort,
as_index=self.as_index,
**self.dropna,
)

@_dask_cudf_nvtx_annotate
@_check_groupby_supported
def cumcount(self, split_every=None, split_out=1):
return groupby_agg(
self.obj,
self.by,
self._make_groupby_method_aggs("cumcount"),
split_every=split_every,
split_out=split_out,
sep=self.sep,
sort=self.sort,
as_index=self.as_index,
**self.dropna,
)

@_dask_cudf_nvtx_annotate
def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
if arg == "size":
@@ -436,6 +466,36 @@ def last(self, split_every=None, split_out=1):
**self.dropna,
)[self._slice]

@_dask_cudf_nvtx_annotate
@_check_groupby_supported
def cumsum(self, split_every=None, split_out=1):
return groupby_agg(
self.obj,
self.by,
{self._slice: "cumsum"},
split_every=split_every,
split_out=split_out,
sep=self.sep,
sort=self.sort,
as_index=self.as_index,
**self.dropna,
)[self._slice]

@_dask_cudf_nvtx_annotate
@_check_groupby_supported
def cumcount(self, split_every=None, split_out=1):
return groupby_agg(
self.obj,
self.by,
{self._slice: "cumcount"},
split_every=split_every,
split_out=split_out,
sep=self.sep,
sort=self.sort,
as_index=self.as_index,
**self.dropna,
)[self._slice]

@_dask_cudf_nvtx_annotate
def aggregate(self, arg, split_every=None, split_out=1, shuffle=None):
if arg == "size":
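
The new cumulative methods above reuse the same decorator pair as the existing aggregations: _dask_cudf_nvtx_annotate for profiling ranges and _check_groupby_supported to gate on whether dask-cudf's optimized groupby path applies. A minimal sketch of that gating pattern follows; _groupby_supported is a hypothetical stand-in for the library's internal predicate, and the real decorator may fall back to the upstream Dask implementation rather than raising. The functools.wraps wrapping corresponds to commit cc64ebc ("Wrap groupby decorator with functools.wraps"), which keeps the decorated method's name and docstring intact.

import functools


def _groupby_supported(gb):
    # Hypothetical stand-in for dask-cudf's internal predicate; the real
    # check inspects the groupby's keys and kwargs to decide whether the
    # optimized cuDF codepath applies.
    return True


def _check_groupby_supported(func):
    # functools.wraps preserves func.__name__ and func.__doc__ on the
    # wrapper, so decorated methods keep useful names in tracebacks.
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if _groupby_supported(self):
            return func(self, *args, **kwargs)
        raise NotImplementedError(
            f"{func.__name__} is not supported for this groupby object"
        )

    return wrapper
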
145 changes: 59 additions & 86 deletions python/dask_cudf/dask_cudf/tests/test_groupby.py
@@ -6,16 +6,30 @@

import dask
from dask import dataframe as dd
from dask.utils_test import hlg_layer

import cudf
from cudf.core._compat import PANDAS_GE_120

import dask_cudf
from dask_cudf.groupby import AGGS, CUMULATIVE_AGGS, _aggs_supported

ALL_AGGS = AGGS + CUMULATIVE_AGGS

@pytest.fixture
def pdf():

def assert_cudf_groupby_layers(ddf):
for prefix in ("cudf-aggregate-chunk", "cudf-aggregate-agg"):
try:
hlg_layer(ddf.dask, prefix)
except KeyError:
raise AssertionError(
"Expected Dask dataframe to contain groupby layer with "
f"prefix {prefix}"
)
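
For reference, hlg_layer from dask.utils_test returns the first layer of a HighLevelGraph whose name starts with the given prefix and raises KeyError when none matches; the helper above simply converts that KeyError into a clearer AssertionError. A small sketch of the lookup, using a plain dask dataframe rather than dask-cudf for portability:

import pandas as pd
from dask import dataframe as dd
from dask.utils_test import hlg_layer

ddf = dd.from_pandas(pd.DataFrame({"x": [1, 2, 3]}), npartitions=2)
# Layer names are prefixed by the operation that created them, so
# "from_pandas" matches here; a prefix with no matching layer raises KeyError.
hlg_layer(ddf.dask, "from_pandas")
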


@pytest.fixture(params=["non_null", "null"])
def pdf(request):
np.random.seed(0)

# note that column name "x" is a substring of the groupby key;
@@ -27,92 +41,75 @@ def pdf():
"y": np.random.normal(size=10000),
}
)
return pdf

# insert nulls into dataframe at random
if request.param == "null":
pdf = pdf.mask(np.random.choice([True, False], size=pdf.shape))

return pdf
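
The "null" fixture variant relies on DataFrame.mask, which replaces entries where the boolean condition is True with NaN, so the random choice above nulls out roughly half of the frame. A tiny illustration with hypothetical values:

import numpy as np
import pandas as pd

np.random.seed(0)
df = pd.DataFrame({"a": [1.0, 2.0, 3.0, 4.0]})
# Entries where the mask is True are replaced with NaN; False entries
# pass through unchanged.
nulled = df.mask(np.random.choice([True, False], size=df.shape))
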

@pytest.mark.parametrize("aggregation", AGGS)
@pytest.mark.parametrize("series", [False, True])
def test_groupby_basic(series, aggregation, pdf):

@pytest.mark.parametrize("aggregation", ALL_AGGS)
@pytest.mark.parametrize("type", ["frame", "series"])
def test_groupby_basic(type, aggregation, pdf):
gdf = cudf.DataFrame.from_pandas(pdf)
gdf_grouped = gdf.groupby("xx")
ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby("xx")

if series:
if type == "series":
gdf_grouped = gdf_grouped.xx
ddf_grouped = ddf_grouped.xx

a = getattr(gdf_grouped, aggregation)()
b = getattr(ddf_grouped, aggregation)().compute()
check_dtype = aggregation != "count"

if aggregation == "count":
dd.assert_eq(a, b, check_dtype=False)
else:
dd.assert_eq(a, b)
expect = getattr(gdf_grouped, aggregation)()
actual = getattr(ddf_grouped, aggregation)()

a = gdf_grouped.agg({"xx": aggregation})
b = ddf_grouped.agg({"xx": aggregation}).compute()
assert_cudf_groupby_layers(actual)

if aggregation == "count":
dd.assert_eq(a, b, check_dtype=False)
if aggregation == "cumsum" and type == "series":
with pytest.xfail(reason="https://github.com/dask/dask/issues/9313"):
dd.assert_eq(expect, actual, check_dtype=check_dtype)
else:
dd.assert_eq(a, b)
dd.assert_eq(expect, actual, check_dtype=check_dtype)

expect = gdf_grouped.agg({"xx": aggregation})
actual = ddf_grouped.agg({"xx": aggregation})

@pytest.mark.parametrize("series", [True, False])
@pytest.mark.parametrize("aggregation", CUMULATIVE_AGGS)
def test_groupby_cumulative(aggregation, pdf, series):
gdf = cudf.DataFrame.from_pandas(pdf)
ddf = dask_cudf.from_cudf(gdf, npartitions=5)
assert_cudf_groupby_layers(actual)

gdf_grouped = gdf.groupby("xx")
ddf_grouped = ddf.groupby("xx")

if series:
gdf_grouped = gdf_grouped.xx
ddf_grouped = ddf_grouped.xx

a = getattr(gdf_grouped, aggregation)()
b = getattr(ddf_grouped, aggregation)().compute()

if aggregation == "cumsum" and series:
if aggregation == "cumsum" and type == "series":
with pytest.xfail(reason="https://github.com/dask/dask/issues/9313"):
dd.assert_eq(a, b)
dd.assert_eq(expect, actual, check_dtype=check_dtype)
else:
dd.assert_eq(a, b)
dd.assert_eq(expect, actual, check_dtype=check_dtype)


@pytest.mark.parametrize("aggregation", ALL_AGGS)
@pytest.mark.parametrize(
"func",
[
lambda df: df.groupby("x").agg({"y": "max"}),
lambda df: df.groupby("x").agg(["sum", "max"]),
lambda df: df.groupby("x").y.agg(["sum", "max"]),
lambda df: df.groupby("x").agg("sum"),
lambda df: df.groupby("x").y.agg("sum"),
lambda df, agg: df.groupby("xx").agg({"y": agg}),
lambda df, agg: df.groupby("xx").y.agg({"y": agg}),
lambda df, agg: df.groupby("xx").agg([agg]),
lambda df, agg: df.groupby("xx").y.agg([agg]),
lambda df, agg: df.groupby("xx").agg(agg),
lambda df, agg: df.groupby("xx").y.agg(agg),
],
)
def test_groupby_agg(func):
pdf = pd.DataFrame(
{
"x": np.random.randint(0, 5, size=10000),
"y": np.random.normal(size=10000),
}
)

def test_groupby_agg(func, aggregation, pdf):
gdf = cudf.DataFrame.from_pandas(pdf)

ddf = dask_cudf.from_cudf(gdf, npartitions=5)

a = func(gdf).to_pandas()
b = func(ddf).compute().to_pandas()
actual = func(ddf, aggregation)
expect = func(gdf, aggregation)

a.index.name = None
a.name = None
b.index.name = None
b.name = None
check_dtype = aggregation != "count"

dd.assert_eq(a, b)
assert_cudf_groupby_layers(actual)

dd.assert_eq(expect, actual, check_names=False, check_dtype=check_dtype)
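
Two details of dd.assert_eq do the heavy lifting in the rewritten tests: it computes lazy dask collections itself (which is why the explicit .compute() calls disappear throughout this diff), and check_names=False replaces the old manual clearing of name/index.name seen above. A minimal sketch of both behaviors:

import pandas as pd
from dask import dataframe as dd

expect = pd.Series([1.0, 2.0], name="y")
actual = dd.from_pandas(pd.Series([1.0, 2.0], name=None), npartitions=1)
# assert_eq computes the dask side itself, and check_names=False ignores
# the mismatched Series names.
dd.assert_eq(expect, actual, check_names=False)
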


@pytest.mark.parametrize("split_out", [1, 3])
@@ -136,28 +133,6 @@ def test_groupby_agg_empty_partition(tmpdir, split_out):
dd.assert_eq(gb.compute().sort_index(), expect)


@pytest.mark.parametrize(
"func",
[lambda df: df.groupby("x").std(), lambda df: df.groupby("x").y.std()],
)
def test_groupby_std(func):
pdf = pd.DataFrame(
{
"x": np.random.randint(0, 5, size=10000),
"y": np.random.normal(size=10000),
}
)

gdf = cudf.DataFrame.from_pandas(pdf)

ddf = dask_cudf.from_cudf(gdf, npartitions=5)

a = func(gdf).to_pandas()
b = func(ddf).compute().to_pandas()

dd.assert_eq(a, b)


# reason: getattr in cudf
@pytest.mark.parametrize(
"func",
@@ -746,22 +721,20 @@ def test_groupby_first_last(data, agg):
gddf = dask_cudf.from_cudf(gdf, npartitions=2)

dd.assert_eq(
ddf.groupby("a").agg(agg).compute(),
gddf.groupby("a").agg(agg).compute(),
ddf.groupby("a").agg(agg),
gddf.groupby("a").agg(agg),
)

dd.assert_eq(
getattr(ddf.groupby("a"), agg)().compute(),
getattr(gddf.groupby("a"), agg)().compute(),
getattr(ddf.groupby("a"), agg)(),
getattr(gddf.groupby("a"), agg)(),
)

dd.assert_eq(
gdf.groupby("a").agg(agg), gddf.groupby("a").agg(agg).compute()
)
dd.assert_eq(gdf.groupby("a").agg(agg), gddf.groupby("a").agg(agg))

dd.assert_eq(
getattr(gdf.groupby("a"), agg)(),
getattr(gddf.groupby("a"), agg)().compute(),
getattr(gddf.groupby("a"), agg)(),
)

