From 6571499395821b51cb58a56f603f10eefc8daf53 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 13 May 2022 13:01:08 -0700 Subject: [PATCH 01/13] Add checks for dask-cudf groupby HLG layers --- .../dask_cudf/dask_cudf/tests/test_groupby.py | 63 +++++++++++-------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 2b7f2bdae36..3e3d6d9e22f 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -6,6 +6,7 @@ import dask from dask import dataframe as dd +from dask.utils_test import hlg_layer import cudf from cudf.core._compat import PANDAS_GE_120 @@ -14,6 +15,17 @@ from dask_cudf.groupby import SUPPORTED_AGGS, _aggs_supported +def check_groupby_result(ddf): + """Assert that groupby result is using dask-cudf's codepath""" + + try: + hlg_layer(ddf.dask, "groupby_agg") + except KeyError: + raise AssertionError( + "Dask dataframe does not contain dask-cudf groupby layer" + ) + + @pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) @pytest.mark.parametrize("series", [False, True]) def test_groupby_basic(series, aggregation): @@ -37,34 +49,36 @@ def test_groupby_basic(series, aggregation): gdf_grouped = gdf_grouped.xx ddf_grouped = ddf_grouped.xx - a = getattr(gdf_grouped, aggregation)() - b = getattr(ddf_grouped, aggregation)().compute() + check_dtype = False if aggregation == "count" else True - if aggregation == "count": - dd.assert_eq(a, b, check_dtype=False) - else: - dd.assert_eq(a, b) + expect = getattr(gdf_grouped, aggregation)() + actual = getattr(ddf_grouped, aggregation)() - a = gdf_grouped.agg({"xx": aggregation}) - b = ddf_grouped.agg({"xx": aggregation}).compute() + check_groupby_result(actual) - if aggregation == "count": - dd.assert_eq(a, b, check_dtype=False) - else: - dd.assert_eq(a, b) + dd.assert_eq(expect, actual, check_dtype=check_dtype) + + expect = gdf_grouped.agg({"xx": aggregation}) + actual = ddf_grouped.agg({"xx": aggregation}) + + check_groupby_result(actual) + + dd.assert_eq(expect, actual, check_dtype=check_dtype) +@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) @pytest.mark.parametrize( "func", [ - lambda df: df.groupby("x").agg({"y": "max"}), - lambda df: df.groupby("x").agg(["sum", "max"]), - lambda df: df.groupby("x").y.agg(["sum", "max"]), - lambda df: df.groupby("x").agg("sum"), - lambda df: df.groupby("x").y.agg("sum"), + lambda df, agg: df.groupby("x").agg({"y": agg}), + lambda df, agg: df.groupby("x").y.agg({"y": agg}), + lambda df, agg: df.groupby("x").agg([agg]), + lambda df, agg: df.groupby("x").y.agg([agg]), + lambda df, agg: df.groupby("x").agg(agg), + lambda df, agg: df.groupby("x").y.agg(agg), ], ) -def test_groupby_agg(func): +def test_groupby_agg(func, aggregation): pdf = pd.DataFrame( { "x": np.random.randint(0, 5, size=10000), @@ -76,15 +90,14 @@ def test_groupby_agg(func): ddf = dask_cudf.from_cudf(gdf, npartitions=5) - a = func(gdf).to_pandas() - b = func(ddf).compute().to_pandas() + actual = func(ddf, aggregation) + expect = func(gdf, aggregation) - a.index.name = None - a.name = None - b.index.name = None - b.name = None + check_dtype = False if aggregation == "count" else True - dd.assert_eq(a, b) + check_groupby_result(actual) + + dd.assert_eq(expect, actual, check_names=False, check_dtype=check_dtype) @pytest.mark.parametrize("split_out", [1, 3]) From 9bc66456c17502e2c359db99dce592d303d9965b Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 13 May 2022 13:03:44 -0700 Subject: [PATCH 02/13] Remove some tests that are now superfluous --- .../dask_cudf/dask_cudf/tests/test_groupby.py | 59 ------------------- 1 file changed, 59 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 3e3d6d9e22f..fae21e75e98 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -121,28 +121,6 @@ def test_groupby_agg_empty_partition(tmpdir, split_out): dd.assert_eq(gb.compute().sort_index(), expect) -@pytest.mark.parametrize( - "func", - [lambda df: df.groupby("x").std(), lambda df: df.groupby("x").y.std()], -) -def test_groupby_std(func): - pdf = pd.DataFrame( - { - "x": np.random.randint(0, 5, size=10000), - "y": np.random.normal(size=10000), - } - ) - - gdf = cudf.DataFrame.from_pandas(pdf) - - ddf = dask_cudf.from_cudf(gdf, npartitions=5) - - a = func(gdf).to_pandas() - b = func(ddf).compute().to_pandas() - - dd.assert_eq(a, b) - - # reason gotattr in cudf @pytest.mark.parametrize( "func", @@ -710,43 +688,6 @@ def test_groupby_unique_lists(): ) -@pytest.mark.parametrize( - "data", - [ - {"a": [], "b": []}, - {"a": [2, 1, 2, 1, 1, 3], "b": [None, 1, 2, None, 2, None]}, - {"a": [None], "b": [None]}, - {"a": [2, 1, 1], "b": [None, 1, 0], "c": [None, 0, 1]}, - ], -) -@pytest.mark.parametrize("agg", ["first", "last"]) -def test_groupby_first_last(data, agg): - pdf = pd.DataFrame(data) - gdf = cudf.DataFrame.from_pandas(pdf) - - ddf = dd.from_pandas(pdf, npartitions=2) - gddf = dask_cudf.from_cudf(gdf, npartitions=2) - - dd.assert_eq( - ddf.groupby("a").agg(agg).compute(), - gddf.groupby("a").agg(agg).compute(), - ) - - dd.assert_eq( - getattr(ddf.groupby("a"), agg)().compute(), - getattr(gddf.groupby("a"), agg)().compute(), - ) - - dd.assert_eq( - gdf.groupby("a").agg(agg), gddf.groupby("a").agg(agg).compute() - ) - - dd.assert_eq( - getattr(gdf.groupby("a"), agg)(), - getattr(gddf.groupby("a"), agg)().compute(), - ) - - def test_groupby_with_list_of_series(): df = cudf.DataFrame({"a": [1, 2, 3, 4, 5]}) gdf = dask_cudf.from_cudf(df, npartitions=2) From 1c700cb1e7c076af0becb2117df6c077348910f5 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 13 Sep 2022 11:28:50 -0700 Subject: [PATCH 03/13] Update check to use new layer names --- python/dask_cudf/dask_cudf/tests/test_groupby.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 65c7890b059..3ac17422297 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -19,7 +19,8 @@ def check_groupby_result(ddf): """Assert that groupby result is using dask-cudf's codepath""" try: - hlg_layer(ddf.dask, "groupby_agg") + hlg_layer(ddf.dask, "cudf-aggregate-chunk") + hlg_layer(ddf.dask, "cudf-aggregate-agg") except KeyError: raise AssertionError( "Dask dataframe does not contain dask-cudf groupby layer" From 543f8db6df9fa45f4d3fe3e5a7a2e84dbbad7553 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 2 Nov 2022 09:51:20 -0400 Subject: [PATCH 04/13] Update python/dask_cudf/dask_cudf/tests/test_groupby.py Co-authored-by: Lawrence Mitchell --- python/dask_cudf/dask_cudf/tests/test_groupby.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 428e0eeda30..b6832de40fc 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -55,7 +55,7 @@ def test_groupby_basic(series, aggregation, pdf): gdf_grouped = gdf_grouped.xx ddf_grouped = ddf_grouped.xx - check_dtype = False if aggregation == "count" else True + check_dtype = aggregation != "count" expect = getattr(gdf_grouped, aggregation)() actual = getattr(ddf_grouped, aggregation)() From b77a14a4745539a0882496e088d06318a84668d9 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 2 Nov 2022 07:26:01 -0700 Subject: [PATCH 05/13] Rename layer assertion function, improve error message --- .../dask_cudf/dask_cudf/tests/test_groupby.py | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index b6832de40fc..93c1f741928 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -15,16 +15,15 @@ from dask_cudf.groupby import AGGS, CUMULATIVE_AGGS, _aggs_supported -def check_groupby_result(ddf): - """Assert that groupby result is using dask-cudf's codepath""" - - try: - hlg_layer(ddf.dask, "cudf-aggregate-chunk") - hlg_layer(ddf.dask, "cudf-aggregate-agg") - except KeyError: - raise AssertionError( - "Dask dataframe does not contain dask-cudf groupby layer" - ) +def assert_cudf_groupby_layers(ddf): + for prefix in ("cudf-aggregate-chunk", "cudf-aggregate-agg"): + try: + hlg_layer(ddf.dask, prefix) + except KeyError: + raise AssertionError( + "Expected Dask dataframe to contain groupby layer with " + f"prefix {prefix}" + ) @pytest.fixture @@ -60,14 +59,14 @@ def test_groupby_basic(series, aggregation, pdf): expect = getattr(gdf_grouped, aggregation)() actual = getattr(ddf_grouped, aggregation)() - check_groupby_result(actual) + assert_cudf_groupby_layers(actual) dd.assert_eq(expect, actual, check_dtype=check_dtype) expect = gdf_grouped.agg({"xx": aggregation}) actual = ddf_grouped.agg({"xx": aggregation}) - check_groupby_result(actual) + assert_cudf_groupby_layers(actual) dd.assert_eq(expect, actual, check_dtype=check_dtype) @@ -124,7 +123,7 @@ def test_groupby_agg(func, aggregation): check_dtype = False if aggregation == "count" else True - check_groupby_result(actual) + assert_cudf_groupby_layers(actual) dd.assert_eq(expect, actual, check_names=False, check_dtype=check_dtype) From 9d458f8f241c994fb2b51c37fddd34b74ced9102 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 2 Nov 2022 10:07:40 -0700 Subject: [PATCH 06/13] Parametrize nulls in pdf fixtures, use in test_groupby_agg --- .../dask_cudf/dask_cudf/tests/test_groupby.py | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 93c1f741928..3afdb7ca270 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -26,8 +26,8 @@ def assert_cudf_groupby_layers(ddf): ) -@pytest.fixture -def pdf(): +@pytest.fixture(params=["non_null", "null"]) +def pdf(request): np.random.seed(0) # note that column name "x" is a substring of the groupby key; @@ -39,13 +39,17 @@ def pdf(): "y": np.random.normal(size=10000), } ) + + # insert nulls into dataframe at random + if request.param == "null": + pdf = pdf.mask(np.random.choice([True, False], size=pdf.shape)) + return pdf @pytest.mark.parametrize("aggregation", AGGS) @pytest.mark.parametrize("series", [False, True]) def test_groupby_basic(series, aggregation, pdf): - gdf = cudf.DataFrame.from_pandas(pdf) gdf_grouped = gdf.groupby("xx") ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby("xx") @@ -98,22 +102,15 @@ def test_groupby_cumulative(aggregation, pdf, series): @pytest.mark.parametrize( "func", [ - lambda df, agg: df.groupby("x").agg({"y": agg}), - lambda df, agg: df.groupby("x").y.agg({"y": agg}), - lambda df, agg: df.groupby("x").agg([agg]), - lambda df, agg: df.groupby("x").y.agg([agg]), - lambda df, agg: df.groupby("x").agg(agg), - lambda df, agg: df.groupby("x").y.agg(agg), + lambda df, agg: df.groupby("xx").agg({"y": agg}), + lambda df, agg: df.groupby("xx").y.agg({"y": agg}), + lambda df, agg: df.groupby("xx").agg([agg]), + lambda df, agg: df.groupby("xx").y.agg([agg]), + lambda df, agg: df.groupby("xx").agg(agg), + lambda df, agg: df.groupby("xx").y.agg(agg), ], ) -def test_groupby_agg(func, aggregation): - pdf = pd.DataFrame( - { - "x": np.random.randint(0, 5, size=10000), - "y": np.random.normal(size=10000), - } - ) - +def test_groupby_agg(func, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) ddf = dask_cudf.from_cudf(gdf, npartitions=5) @@ -121,7 +118,7 @@ def test_groupby_agg(func, aggregation): actual = func(ddf, aggregation) expect = func(gdf, aggregation) - check_dtype = False if aggregation == "count" else True + check_dtype = aggregation != "count" assert_cudf_groupby_layers(actual) From c7eaad11f63450b9970f7572c96bcf64e007b78f Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 2 Nov 2022 10:44:57 -0700 Subject: [PATCH 07/13] Merge cumulative agg tests into main groupby tests --- python/dask_cudf/dask_cudf/groupby.py | 60 +++++++++++++++++++ .../dask_cudf/dask_cudf/tests/test_groupby.py | 43 +++++-------- 2 files changed, 75 insertions(+), 28 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index f5258e6cab8..1ffa1f8b36a 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -243,6 +243,36 @@ def last(self, split_every=None, split_out=1): **self.dropna, ) + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def cumsum(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + self._make_groupby_method_aggs("cumsum"), + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def cumcount(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + self._make_groupby_method_aggs("cumcount"), + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + ) + @_dask_cudf_nvtx_annotate def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): if arg == "size": @@ -436,6 +466,36 @@ def last(self, split_every=None, split_out=1): **self.dropna, )[self._slice] + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def cumsum(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "cumsum"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + + @_dask_cudf_nvtx_annotate + @_check_groupby_supported + def cumcount(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.by, + {self._slice: "cumcount"}, + split_every=split_every, + split_out=split_out, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + **self.dropna, + )[self._slice] + @_dask_cudf_nvtx_annotate def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): if arg == "size": diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 3afdb7ca270..50cd0ad9e0a 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -14,6 +14,8 @@ import dask_cudf from dask_cudf.groupby import AGGS, CUMULATIVE_AGGS, _aggs_supported +ALL_AGGS = AGGS + CUMULATIVE_AGGS + def assert_cudf_groupby_layers(ddf): for prefix in ("cudf-aggregate-chunk", "cudf-aggregate-agg"): @@ -47,14 +49,14 @@ def pdf(request): return pdf -@pytest.mark.parametrize("aggregation", AGGS) -@pytest.mark.parametrize("series", [False, True]) -def test_groupby_basic(series, aggregation, pdf): +@pytest.mark.parametrize("aggregation", ALL_AGGS) +@pytest.mark.parametrize("type", ["frame", "series"]) +def test_groupby_basic(type, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) gdf_grouped = gdf.groupby("xx") ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby("xx") - if series: + if type == "series": gdf_grouped = gdf_grouped.xx ddf_grouped = ddf_grouped.xx @@ -65,40 +67,25 @@ def test_groupby_basic(series, aggregation, pdf): assert_cudf_groupby_layers(actual) - dd.assert_eq(expect, actual, check_dtype=check_dtype) + if aggregation == "cumsum" and type == "series": + with pytest.xfail(reason="https://github.com/dask/dask/issues/9313"): + dd.assert_eq(expect, actual, check_dtype=check_dtype) + else: + dd.assert_eq(expect, actual, check_dtype=check_dtype) expect = gdf_grouped.agg({"xx": aggregation}) actual = ddf_grouped.agg({"xx": aggregation}) assert_cudf_groupby_layers(actual) - dd.assert_eq(expect, actual, check_dtype=check_dtype) - - -@pytest.mark.parametrize("series", [True, False]) -@pytest.mark.parametrize("aggregation", CUMULATIVE_AGGS) -def test_groupby_cumulative(aggregation, pdf, series): - gdf = cudf.DataFrame.from_pandas(pdf) - ddf = dask_cudf.from_cudf(gdf, npartitions=5) - - gdf_grouped = gdf.groupby("xx") - ddf_grouped = ddf.groupby("xx") - - if series: - gdf_grouped = gdf_grouped.xx - ddf_grouped = ddf_grouped.xx - - a = getattr(gdf_grouped, aggregation)() - b = getattr(ddf_grouped, aggregation)().compute() - - if aggregation == "cumsum" and series: + if aggregation == "cumsum" and type == "series": with pytest.xfail(reason="https://github.com/dask/dask/issues/9313"): - dd.assert_eq(a, b) + dd.assert_eq(expect, actual, check_dtype=check_dtype) else: - dd.assert_eq(a, b) + dd.assert_eq(expect, actual, check_dtype=check_dtype) -@pytest.mark.parametrize("aggregation", AGGS) +@pytest.mark.parametrize("aggregation", ALL_AGGS) @pytest.mark.parametrize( "func", [ From 069fc65e68d86eea31f8ac21478b29d34c4cac94 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 2 Nov 2022 10:48:16 -0700 Subject: [PATCH 08/13] Add back in test_groupby_first_last with some changes --- .../dask_cudf/dask_cudf/tests/test_groupby.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 50cd0ad9e0a..57f7af0f312 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -703,6 +703,41 @@ def test_groupby_unique_lists(): ) +@pytest.mark.parametrize( + "data", + [ + {"a": [], "b": []}, + {"a": [2, 1, 2, 1, 1, 3], "b": [None, 1, 2, None, 2, None]}, + {"a": [None], "b": [None]}, + {"a": [2, 1, 1], "b": [None, 1, 0], "c": [None, 0, 1]}, + ], +) +@pytest.mark.parametrize("agg", ["first", "last"]) +def test_groupby_first_last(data, agg): + pdf = pd.DataFrame(data) + gdf = cudf.DataFrame.from_pandas(pdf) + + ddf = dd.from_pandas(pdf, npartitions=2) + gddf = dask_cudf.from_cudf(gdf, npartitions=2) + + dd.assert_eq( + ddf.groupby("a").agg(agg), + gddf.groupby("a").agg(agg), + ) + + dd.assert_eq( + getattr(ddf.groupby("a"), agg)(), + getattr(gddf.groupby("a"), agg)(), + ) + + dd.assert_eq(gdf.groupby("a").agg(agg), gddf.groupby("a").agg(agg)) + + dd.assert_eq( + getattr(gdf.groupby("a"), agg)(), + getattr(gddf.groupby("a"), agg)(), + ) + + def test_groupby_with_list_of_series(): df = cudf.DataFrame({"a": [1, 2, 3, 4, 5]}) gdf = dask_cudf.from_cudf(df, npartitions=2) From 711470fabb1cad7d5013412a11e8a8963bedfafc Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 2 Nov 2022 12:38:00 -0700 Subject: [PATCH 09/13] Revert "Merge cumulative agg tests into main groupby tests" This reverts commit c7eaad11f63450b9970f7572c96bcf64e007b78f. --- python/dask_cudf/dask_cudf/groupby.py | 60 ------------------- .../dask_cudf/dask_cudf/tests/test_groupby.py | 43 ++++++++----- 2 files changed, 28 insertions(+), 75 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 1ffa1f8b36a..f5258e6cab8 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -243,36 +243,6 @@ def last(self, split_every=None, split_out=1): **self.dropna, ) - @_dask_cudf_nvtx_annotate - @_check_groupby_supported - def cumsum(self, split_every=None, split_out=1): - return groupby_agg( - self.obj, - self.by, - self._make_groupby_method_aggs("cumsum"), - split_every=split_every, - split_out=split_out, - sep=self.sep, - sort=self.sort, - as_index=self.as_index, - **self.dropna, - ) - - @_dask_cudf_nvtx_annotate - @_check_groupby_supported - def cumcount(self, split_every=None, split_out=1): - return groupby_agg( - self.obj, - self.by, - self._make_groupby_method_aggs("cumcount"), - split_every=split_every, - split_out=split_out, - sep=self.sep, - sort=self.sort, - as_index=self.as_index, - **self.dropna, - ) - @_dask_cudf_nvtx_annotate def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): if arg == "size": @@ -466,36 +436,6 @@ def last(self, split_every=None, split_out=1): **self.dropna, )[self._slice] - @_dask_cudf_nvtx_annotate - @_check_groupby_supported - def cumsum(self, split_every=None, split_out=1): - return groupby_agg( - self.obj, - self.by, - {self._slice: "cumsum"}, - split_every=split_every, - split_out=split_out, - sep=self.sep, - sort=self.sort, - as_index=self.as_index, - **self.dropna, - )[self._slice] - - @_dask_cudf_nvtx_annotate - @_check_groupby_supported - def cumcount(self, split_every=None, split_out=1): - return groupby_agg( - self.obj, - self.by, - {self._slice: "cumcount"}, - split_every=split_every, - split_out=split_out, - sep=self.sep, - sort=self.sort, - as_index=self.as_index, - **self.dropna, - )[self._slice] - @_dask_cudf_nvtx_annotate def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): if arg == "size": diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 57f7af0f312..8a94b96ff2f 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -14,8 +14,6 @@ import dask_cudf from dask_cudf.groupby import AGGS, CUMULATIVE_AGGS, _aggs_supported -ALL_AGGS = AGGS + CUMULATIVE_AGGS - def assert_cudf_groupby_layers(ddf): for prefix in ("cudf-aggregate-chunk", "cudf-aggregate-agg"): @@ -49,14 +47,14 @@ def pdf(request): return pdf -@pytest.mark.parametrize("aggregation", ALL_AGGS) -@pytest.mark.parametrize("type", ["frame", "series"]) -def test_groupby_basic(type, aggregation, pdf): +@pytest.mark.parametrize("aggregation", AGGS) +@pytest.mark.parametrize("series", [False, True]) +def test_groupby_basic(series, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) gdf_grouped = gdf.groupby("xx") ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby("xx") - if type == "series": + if series: gdf_grouped = gdf_grouped.xx ddf_grouped = ddf_grouped.xx @@ -67,25 +65,40 @@ def test_groupby_basic(type, aggregation, pdf): assert_cudf_groupby_layers(actual) - if aggregation == "cumsum" and type == "series": - with pytest.xfail(reason="https://github.com/dask/dask/issues/9313"): - dd.assert_eq(expect, actual, check_dtype=check_dtype) - else: - dd.assert_eq(expect, actual, check_dtype=check_dtype) + dd.assert_eq(expect, actual, check_dtype=check_dtype) expect = gdf_grouped.agg({"xx": aggregation}) actual = ddf_grouped.agg({"xx": aggregation}) assert_cudf_groupby_layers(actual) - if aggregation == "cumsum" and type == "series": + dd.assert_eq(expect, actual, check_dtype=check_dtype) + + +@pytest.mark.parametrize("series", [True, False]) +@pytest.mark.parametrize("aggregation", CUMULATIVE_AGGS) +def test_groupby_cumulative(aggregation, pdf, series): + gdf = cudf.DataFrame.from_pandas(pdf) + ddf = dask_cudf.from_cudf(gdf, npartitions=5) + + gdf_grouped = gdf.groupby("xx") + ddf_grouped = ddf.groupby("xx") + + if series: + gdf_grouped = gdf_grouped.xx + ddf_grouped = ddf_grouped.xx + + a = getattr(gdf_grouped, aggregation)() + b = getattr(ddf_grouped, aggregation)().compute() + + if aggregation == "cumsum" and series: with pytest.xfail(reason="https://github.com/dask/dask/issues/9313"): - dd.assert_eq(expect, actual, check_dtype=check_dtype) + dd.assert_eq(a, b) else: - dd.assert_eq(expect, actual, check_dtype=check_dtype) + dd.assert_eq(a, b) -@pytest.mark.parametrize("aggregation", ALL_AGGS) +@pytest.mark.parametrize("aggregation", AGGS) @pytest.mark.parametrize( "func", [ From 2112fea59e8efad1c3b0a36ebd1d1be82e2685ba Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 3 Nov 2022 07:11:11 -0700 Subject: [PATCH 10/13] xfail cumulative test for null dataframes --- python/dask_cudf/dask_cudf/tests/test_groupby.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 8a94b96ff2f..38bb23ab1df 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -75,13 +75,21 @@ def test_groupby_basic(series, aggregation, pdf): dd.assert_eq(expect, actual, check_dtype=check_dtype) +# TODO: explore adding support with `.agg()` @pytest.mark.parametrize("series", [True, False]) @pytest.mark.parametrize("aggregation", CUMULATIVE_AGGS) def test_groupby_cumulative(aggregation, pdf, series): gdf = cudf.DataFrame.from_pandas(pdf) ddf = dask_cudf.from_cudf(gdf, npartitions=5) - gdf_grouped = gdf.groupby("xx") + if pdf.isna().sum().any(): + with pytest.xfail( + reason="https://github.com/rapidsai/cudf/issues/12055" + ): + gdf_grouped = gdf.groupby("xx") + else: + gdf_grouped = gdf.groupby("xx") + ddf_grouped = ddf.groupby("xx") if series: @@ -89,7 +97,7 @@ def test_groupby_cumulative(aggregation, pdf, series): ddf_grouped = ddf_grouped.xx a = getattr(gdf_grouped, aggregation)() - b = getattr(ddf_grouped, aggregation)().compute() + b = getattr(ddf_grouped, aggregation)() if aggregation == "cumsum" and series: with pytest.xfail(reason="https://github.com/dask/dask/issues/9313"): From 60649ba3db2b86ace015af1b4349e99c950d0953 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 3 Nov 2022 07:39:52 -0700 Subject: [PATCH 11/13] Remove cumulative aggs from SUPPORTED_AGGS --- python/dask_cudf/dask_cudf/groupby.py | 9 +-------- python/dask_cudf/dask_cudf/tests/test_groupby.py | 10 +++++----- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index f5258e6cab8..3c7b0e14505 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -16,12 +16,7 @@ import cudf from cudf.utils.utils import _dask_cudf_nvtx_annotate -CUMULATIVE_AGGS = ( - "cumsum", - "cumcount", -) - -AGGS = ( +SUPPORTED_AGGS = ( "count", "mean", "std", @@ -34,8 +29,6 @@ "last", ) -SUPPORTED_AGGS = (*AGGS, *CUMULATIVE_AGGS) - def _check_groupby_supported(func): """ diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 38bb23ab1df..fbf94b9e4a8 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -12,7 +12,7 @@ from cudf.core._compat import PANDAS_GE_120 import dask_cudf -from dask_cudf.groupby import AGGS, CUMULATIVE_AGGS, _aggs_supported +from dask_cudf.groupby import SUPPORTED_AGGS, _aggs_supported def assert_cudf_groupby_layers(ddf): @@ -47,7 +47,7 @@ def pdf(request): return pdf -@pytest.mark.parametrize("aggregation", AGGS) +@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) @pytest.mark.parametrize("series", [False, True]) def test_groupby_basic(series, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) @@ -77,7 +77,7 @@ def test_groupby_basic(series, aggregation, pdf): # TODO: explore adding support with `.agg()` @pytest.mark.parametrize("series", [True, False]) -@pytest.mark.parametrize("aggregation", CUMULATIVE_AGGS) +@pytest.mark.parametrize("aggregation", ["cumsum", "cumcount"]) def test_groupby_cumulative(aggregation, pdf, series): gdf = cudf.DataFrame.from_pandas(pdf) ddf = dask_cudf.from_cudf(gdf, npartitions=5) @@ -106,7 +106,7 @@ def test_groupby_cumulative(aggregation, pdf, series): dd.assert_eq(a, b) -@pytest.mark.parametrize("aggregation", AGGS) +@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) @pytest.mark.parametrize( "func", [ @@ -706,7 +706,7 @@ def test_groupby_agg_redirect(aggregations): ], ) def test_is_supported(arg, supported): - assert _aggs_supported(arg, AGGS) is supported + assert _aggs_supported(arg, SUPPORTED_AGGS) is supported def test_groupby_unique_lists(): From 2001ffeed892d8ac6d431e06a0fa9985222a7c9d Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 3 Nov 2022 08:02:41 -0700 Subject: [PATCH 12/13] Rename util groupby functions from supported -> optimized --- python/dask_cudf/dask_cudf/groupby.py | 67 ++++++++++--------- .../dask_cudf/dask_cudf/tests/test_groupby.py | 8 +-- 2 files changed, 38 insertions(+), 37 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 3c7b0e14505..7ae7651e689 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -16,7 +16,8 @@ import cudf from cudf.utils.utils import _dask_cudf_nvtx_annotate -SUPPORTED_AGGS = ( +# aggregations that are dask-cudf optimized +OPTIMIZED_AGGS = ( "count", "mean", "std", @@ -30,16 +31,16 @@ ) -def _check_groupby_supported(func): +def _check_groupby_optimized(func): """ Decorator for dask-cudf's groupby methods that returns the dask-cudf - method if the groupby object is supported, otherwise reverting to the - upstream Dask method + optimized method if the groupby object is supported, otherwise + reverting to the upstream Dask method """ def wrapper(*args, **kwargs): gb = args[0] - if _groupby_supported(gb): + if _groupby_optimized(gb): return func(*args, **kwargs) # note that we use upstream Dask's default kwargs for this call if # none are specified; this shouldn't be an issue as those defaults are @@ -87,7 +88,7 @@ def _make_groupby_method_aggs(self, agg_name): return {c: agg_name for c in self.obj.columns if c != self.by} @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def count(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -102,7 +103,7 @@ def count(self, split_every=None, split_out=1): ) @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def mean(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -117,7 +118,7 @@ def mean(self, split_every=None, split_out=1): ) @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def std(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -132,7 +133,7 @@ def std(self, split_every=None, split_out=1): ) @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def var(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -147,7 +148,7 @@ def var(self, split_every=None, split_out=1): ) @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def sum(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -162,7 +163,7 @@ def sum(self, split_every=None, split_out=1): ) @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def min(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -177,7 +178,7 @@ def min(self, split_every=None, split_out=1): ) @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def max(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -192,7 +193,7 @@ def max(self, split_every=None, split_out=1): ) @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def collect(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -207,7 +208,7 @@ def collect(self, split_every=None, split_out=1): ) @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def first(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -222,7 +223,7 @@ def first(self, split_every=None, split_out=1): ) @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def last(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -243,7 +244,7 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): arg = _redirect_aggs(arg) - if _groupby_supported(self) and _aggs_supported(arg, SUPPORTED_AGGS): + if _groupby_optimized(self) and _aggs_optimized(arg, OPTIMIZED_AGGS): if isinstance(self._meta.grouping.keys, cudf.MultiIndex): keys = self._meta.grouping.keys.names else: @@ -280,7 +281,7 @@ def __init__(self, *args, sort=None, **kwargs): super().__init__(*args, sort=sort, **kwargs) @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def count(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -295,7 +296,7 @@ def count(self, split_every=None, split_out=1): )[self._slice] @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def mean(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -310,7 +311,7 @@ def mean(self, split_every=None, split_out=1): )[self._slice] @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def std(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -325,7 +326,7 @@ def std(self, split_every=None, split_out=1): )[self._slice] @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def var(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -340,7 +341,7 @@ def var(self, split_every=None, split_out=1): )[self._slice] @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def sum(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -355,7 +356,7 @@ def sum(self, split_every=None, split_out=1): )[self._slice] @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def min(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -370,7 +371,7 @@ def min(self, split_every=None, split_out=1): )[self._slice] @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def max(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -385,7 +386,7 @@ def max(self, split_every=None, split_out=1): )[self._slice] @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def collect(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -400,7 +401,7 @@ def collect(self, split_every=None, split_out=1): )[self._slice] @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def first(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -415,7 +416,7 @@ def first(self, split_every=None, split_out=1): )[self._slice] @_dask_cudf_nvtx_annotate - @_check_groupby_supported + @_check_groupby_optimized def last(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -439,7 +440,7 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): if not isinstance(arg, dict): arg = {self._slice: arg} - if _groupby_supported(self) and _aggs_supported(arg, SUPPORTED_AGGS): + if _groupby_optimized(self) and _aggs_optimized(arg, OPTIMIZED_AGGS): return groupby_agg( self.obj, self.by, @@ -562,9 +563,9 @@ def groupby_agg( """ # Assert that aggregations are supported aggs = _redirect_aggs(aggs_in) - if not _aggs_supported(aggs, SUPPORTED_AGGS): + if not _aggs_optimized(aggs, OPTIMIZED_AGGS): raise ValueError( - f"Supported aggs include {SUPPORTED_AGGS} for groupby_agg API. " + f"Supported aggs include {OPTIMIZED_AGGS} for groupby_agg API. " f"Aggregations must be specified with dict or list syntax." ) @@ -728,7 +729,7 @@ def _redirect_aggs(arg): @_dask_cudf_nvtx_annotate -def _aggs_supported(arg, supported: set): +def _aggs_optimized(arg, supported: set): """Check that aggregations in `arg` are a subset of `supported`""" if isinstance(arg, (list, dict)): if isinstance(arg, dict): @@ -750,8 +751,8 @@ def _aggs_supported(arg, supported: set): @_dask_cudf_nvtx_annotate -def _groupby_supported(gb): - """Check that groupby input is supported by dask-cudf""" +def _groupby_optimized(gb): + """Check that groupby input can use dask-cudf optimized codepath""" return isinstance(gb.obj, DaskDataFrame) and ( isinstance(gb.by, str) or (isinstance(gb.by, list) and all(isinstance(x, str) for x in gb.by)) @@ -823,7 +824,7 @@ def _tree_node_agg(df, gb_cols, dropna, sort, sep): agg = col.split(sep)[-1] if agg in ("count", "sum"): agg_dict[col] = ["sum"] - elif agg in SUPPORTED_AGGS: + elif agg in OPTIMIZED_AGGS: agg_dict[col] = [agg] else: raise ValueError(f"Unexpected aggregation: {agg}") diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index fbf94b9e4a8..e43fead0b63 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -12,7 +12,7 @@ from cudf.core._compat import PANDAS_GE_120 import dask_cudf -from dask_cudf.groupby import SUPPORTED_AGGS, _aggs_supported +from dask_cudf.groupby import OPTIMIZED_AGGS, _aggs_optimized def assert_cudf_groupby_layers(ddf): @@ -47,7 +47,7 @@ def pdf(request): return pdf -@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) +@pytest.mark.parametrize("aggregation", OPTIMIZED_AGGS) @pytest.mark.parametrize("series", [False, True]) def test_groupby_basic(series, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) @@ -106,7 +106,7 @@ def test_groupby_cumulative(aggregation, pdf, series): dd.assert_eq(a, b) -@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) +@pytest.mark.parametrize("aggregation", OPTIMIZED_AGGS) @pytest.mark.parametrize( "func", [ @@ -706,7 +706,7 @@ def test_groupby_agg_redirect(aggregations): ], ) def test_is_supported(arg, supported): - assert _aggs_supported(arg, SUPPORTED_AGGS) is supported + assert _aggs_optimized(arg, OPTIMIZED_AGGS) is supported def test_groupby_unique_lists(): From cc64ebcf9062920504bbb548b7e260d92155c7b7 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 3 Nov 2022 11:44:48 -0700 Subject: [PATCH 13/13] Wrap groupby decorator with functools.wraps --- python/dask_cudf/dask_cudf/groupby.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 7ae7651e689..54f8958c9eb 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -1,5 +1,6 @@ # Copyright (c) 2020-2022, NVIDIA CORPORATION. +from functools import wraps from typing import Set import numpy as np @@ -38,6 +39,7 @@ def _check_groupby_optimized(func): reverting to the upstream Dask method """ + @wraps(func) def wrapper(*args, **kwargs): gb = args[0] if _groupby_optimized(gb):