Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add checks for HLG layers in dask-cudf groupby tests #10853

Merged
Merged
Changes from 5 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
6571499
Add checks for dask-cudf groupby HLG layers
charlesbluca May 13, 2022
9bc6645
Remove some tests that are now superfluous
charlesbluca May 13, 2022
1115f1c
Merge remote-tracking branch 'origin/branch-22.10' into checkout-grou…
charlesbluca Sep 13, 2022
1c700cb
Update check to use new layer names
charlesbluca Sep 13, 2022
2bec53a
Merge remote-tracking branch 'origin/branch-22.12' into checkout-grou…
charlesbluca Nov 1, 2022
543f8db
Update python/dask_cudf/dask_cudf/tests/test_groupby.py
charlesbluca Nov 2, 2022
b77a14a
Rename layer assertion function, improve error message
charlesbluca Nov 2, 2022
9d458f8
Parametrize nulls in pdf fixtures, use in test_groupby_agg
charlesbluca Nov 2, 2022
c7eaad1
Merge cumulative agg tests into main groupby tests
charlesbluca Nov 2, 2022
069fc65
Add back in test_groupby_first_last with some changes
charlesbluca Nov 2, 2022
711470f
Revert "Merge cumulative agg tests into main groupby tests"
charlesbluca Nov 2, 2022
2112fea
xfail cumulative test for null dataframes
charlesbluca Nov 3, 2022
60649ba
Remove cumulative aggs from SUPPORTED_AGGS
charlesbluca Nov 3, 2022
5b2fe76
Merge remote-tracking branch 'origin/branch-22.12' into checkout-grou…
charlesbluca Nov 3, 2022
2001ffe
Rename util groupby functions from supported -> optimized
charlesbluca Nov 3, 2022
6eddbcf
Merge remote-tracking branch 'origin/branch-22.12' into checkout-grou…
charlesbluca Nov 3, 2022
cc64ebc
Wrap groupby decorator with functools.wraps
charlesbluca Nov 3, 2022
78be977
Merge remote-tracking branch 'origin/branch-22.12' into checkout-grou…
charlesbluca Nov 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 39 additions & 84 deletions python/dask_cudf/dask_cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import dask
from dask import dataframe as dd
from dask.utils_test import hlg_layer

import cudf
from cudf.core._compat import PANDAS_GE_120
Expand All @@ -14,6 +15,18 @@
from dask_cudf.groupby import AGGS, CUMULATIVE_AGGS, _aggs_supported


def check_groupby_result(ddf):
    """Assert that groupby result is using dask-cudf's codepath.

    Looks up the ``cudf-aggregate-chunk`` and ``cudf-aggregate-agg`` layers
    in ``ddf.dask`` with ``hlg_layer``.

    Parameters
    ----------
    ddf
        Object whose ``.dask`` graph is inspected.

    Raises
    ------
    AssertionError
        If either layer lookup raises ``KeyError``.
    """
    try:
        hlg_layer(ddf.dask, "cudf-aggregate-chunk")
        hlg_layer(ddf.dask, "cudf-aggregate-agg")
    except KeyError as err:
        # Chain explicitly so the failing layer lookup stays visible in the
        # traceback as the direct cause of the assertion failure.
        raise AssertionError(
            "Dask dataframe does not contain dask-cudf groupby layer"
        ) from err


@pytest.fixture
def pdf():
np.random.seed(0)
Expand Down Expand Up @@ -42,21 +55,21 @@ def test_groupby_basic(series, aggregation, pdf):
gdf_grouped = gdf_grouped.xx
ddf_grouped = ddf_grouped.xx

a = getattr(gdf_grouped, aggregation)()
b = getattr(ddf_grouped, aggregation)().compute()
check_dtype = False if aggregation == "count" else True
charlesbluca marked this conversation as resolved.
Show resolved Hide resolved

if aggregation == "count":
dd.assert_eq(a, b, check_dtype=False)
else:
dd.assert_eq(a, b)
expect = getattr(gdf_grouped, aggregation)()
actual = getattr(ddf_grouped, aggregation)()

a = gdf_grouped.agg({"xx": aggregation})
b = ddf_grouped.agg({"xx": aggregation}).compute()
check_groupby_result(actual)

if aggregation == "count":
dd.assert_eq(a, b, check_dtype=False)
else:
dd.assert_eq(a, b)
dd.assert_eq(expect, actual, check_dtype=check_dtype)

expect = gdf_grouped.agg({"xx": aggregation})
actual = ddf_grouped.agg({"xx": aggregation})

check_groupby_result(actual)

dd.assert_eq(expect, actual, check_dtype=check_dtype)


@pytest.mark.parametrize("series", [True, False])
Expand All @@ -82,17 +95,19 @@ def test_groupby_cumulative(aggregation, pdf, series):
dd.assert_eq(a, b)


@pytest.mark.parametrize("aggregation", AGGS)
@pytest.mark.parametrize(
"func",
[
lambda df: df.groupby("x").agg({"y": "max"}),
lambda df: df.groupby("x").agg(["sum", "max"]),
lambda df: df.groupby("x").y.agg(["sum", "max"]),
lambda df: df.groupby("x").agg("sum"),
lambda df: df.groupby("x").y.agg("sum"),
lambda df, agg: df.groupby("x").agg({"y": agg}),
lambda df, agg: df.groupby("x").y.agg({"y": agg}),
lambda df, agg: df.groupby("x").agg([agg]),
lambda df, agg: df.groupby("x").y.agg([agg]),
lambda df, agg: df.groupby("x").agg(agg),
lambda df, agg: df.groupby("x").y.agg(agg),
],
)
def test_groupby_agg(func):
def test_groupby_agg(func, aggregation):
pdf = pd.DataFrame(
{
"x": np.random.randint(0, 5, size=10000),
Expand All @@ -104,15 +119,14 @@ def test_groupby_agg(func):

ddf = dask_cudf.from_cudf(gdf, npartitions=5)

a = func(gdf).to_pandas()
b = func(ddf).compute().to_pandas()
actual = func(ddf, aggregation)
expect = func(gdf, aggregation)

a.index.name = None
a.name = None
b.index.name = None
b.name = None
check_dtype = False if aggregation == "count" else True

dd.assert_eq(a, b)
check_groupby_result(actual)

dd.assert_eq(expect, actual, check_names=False, check_dtype=check_dtype)


@pytest.mark.parametrize("split_out", [1, 3])
Expand All @@ -136,28 +150,6 @@ def test_groupby_agg_empty_partition(tmpdir, split_out):
dd.assert_eq(gb.compute().sort_index(), expect)


@pytest.mark.parametrize(
    "func",
    [lambda df: df.groupby("x").std(), lambda df: df.groupby("x").y.std()],
)
def test_groupby_std(func):
    """Compare ``func`` applied to a cudf frame and to its dask_cudf copy.

    ``func`` is a groupby-``std`` callable; the dask_cudf collection is
    built from the same cudf frame with 5 partitions.
    """
    host_frame = pd.DataFrame(
        {
            "x": np.random.randint(0, 5, size=10000),
            "y": np.random.normal(size=10000),
        }
    )
    device_frame = cudf.DataFrame.from_pandas(host_frame)
    partitioned = dask_cudf.from_cudf(device_frame, npartitions=5)

    # Both sides are converted to pandas before the comparison.
    expect = func(device_frame).to_pandas()
    actual = func(partitioned).compute().to_pandas()

    dd.assert_eq(expect, actual)
charlesbluca marked this conversation as resolved.
Show resolved Hide resolved


# reason getattr in cudf
@pytest.mark.parametrize(
"func",
Expand Down Expand Up @@ -728,43 +720,6 @@ def test_groupby_unique_lists():
)


@pytest.mark.parametrize(
    "data",
    [
        {"a": [], "b": []},
        {"a": [2, 1, 2, 1, 1, 3], "b": [None, 1, 2, None, 2, None]},
        {"a": [None], "b": [None]},
        {"a": [2, 1, 1], "b": [None, 1, 0], "c": [None, 0, 1]},
    ],
)
@pytest.mark.parametrize("agg", ["first", "last"])
def test_groupby_first_last(data, agg):
    """Compare ``first``/``last`` groupby results across backends.

    The dask_cudf result is compared with the dask (pandas-backed) result
    and with the plain cudf result, both through ``.agg(agg)`` and through
    the aggregation method looked up by name on the groupby object.
    """
    host_frame = pd.DataFrame(data)
    device_frame = cudf.DataFrame.from_pandas(host_frame)

    dask_frame = dd.from_pandas(host_frame, npartitions=2)
    dask_cudf_frame = dask_cudf.from_cudf(device_frame, npartitions=2)

    # dask (pandas) vs. dask_cudf, via .agg(...)
    expect = dask_frame.groupby("a").agg(agg).compute()
    actual = dask_cudf_frame.groupby("a").agg(agg).compute()
    dd.assert_eq(expect, actual)

    # dask (pandas) vs. dask_cudf, via the named groupby method
    expect = getattr(dask_frame.groupby("a"), agg)().compute()
    actual = getattr(dask_cudf_frame.groupby("a"), agg)().compute()
    dd.assert_eq(expect, actual)

    # cudf vs. dask_cudf, via .agg(...)
    expect = device_frame.groupby("a").agg(agg)
    actual = dask_cudf_frame.groupby("a").agg(agg).compute()
    dd.assert_eq(expect, actual)

    # cudf vs. dask_cudf, via the named groupby method
    expect = getattr(device_frame.groupby("a"), agg)()
    actual = getattr(dask_cudf_frame.groupby("a"), agg)().compute()
    dd.assert_eq(expect, actual)


def test_groupby_with_list_of_series():
df = cudf.DataFrame({"a": [1, 2, 3, 4, 5]})
gdf = dask_cudf.from_cudf(df, npartitions=2)
Expand Down