From ee8cd59e297ff3d6e462247b35c46e39f761db68 Mon Sep 17 00:00:00 2001
From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
Date: Tue, 10 May 2022 13:13:46 -0400
Subject: [PATCH] Improve coverage of dask-cudf's groupby aggregation, add tests for `dropna` support (#10449)

This PR does the following:

- Makes sure that every aggregation in dask-cudf's `SUPPORTED_AGGS` has a method overriding the corresponding upstream Dask series / dataframe groupby method
- Adds tests comparing dask-cudf's `dropna` support against upstream Dask's, since at the moment we only compare against cuDF
- Fixes the failures surfaced by these changes, by properly forwarding `self.dropna` in dask-cudf's groupby code (illustrated below)

As a side note, a larger rethinking of dask-cudf's groupby would likely pay off: we currently seem to have some "duplicate" tests, and we aren't really able to discern whether `groupby_agg` was called for a supported aggregation.
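To make the fallback behavior concrete, here is a minimal, self-contained sketch of the dispatch pattern that `_check_groupby_supported` implements. The class and attribute names below are toy stand-ins, not the actual dask-cudf/Dask classes:

```python
class CPUGroupBy:  # stand-in for upstream Dask's groupby class
    def count(self):
        return "upstream count"


def check_supported(func):
    """Run the decorated (GPU) method when the object is supported;
    otherwise fall back to the parent class's method of the same name."""

    def wrapper(*args, **kwargs):
        gb = args[0]
        if gb.supported:
            return func(*args, **kwargs)
        # super(type(gb), gb) binds the parent class to this instance,
        # so the upstream method handles the unsupported case
        return getattr(super(type(gb), gb), func.__name__)(*args[1:], **kwargs)

    return wrapper


class GPUGroupBy(CPUGroupBy):  # stand-in for dask-cudf's groupby class
    def __init__(self, supported):
        self.supported = supported

    @check_supported
    def count(self):
        return "dask-cudf count"


assert GPUGroupBy(supported=True).count() == "dask-cudf count"
assert GPUGroupBy(supported=False).count() == "upstream count"
```

Because the parent class is looked up through `super(type(gb), gb)` rather than hard-coded, one decorator can serve both the dataframe and series groupby classes.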
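The `dropna` fix itself relies on upstream Dask storing the groupby's `dropna` setting as a dict that is empty when the user never passed one, rather than as a scalar default. Passing that dict along as `dropna=self.dropna` handed `groupby_agg` a dict where it expected a boolean; unpacking it with `**self.dropna` forwards the user's choice only when one was made, and otherwise lets `groupby_agg` fall back to its own default. A rough pandas-only illustration, with a hypothetical `agg` standing in for `groupby_agg`:

```python
import pandas as pd


def agg(df, by, dropna=True):  # hypothetical stand-in; groupby_agg also defaults to dropna=True
    return df.groupby(by, dropna=dropna).sum()


df = pd.DataFrame({"a": [1.0, None, 1.0], "b": [1, 2, 3]})

# Upstream Dask stores the groupby's dropna setting as a (possibly empty) dict:
no_choice = {}                 # user never passed dropna -> agg uses its own default
explicit = {"dropna": False}   # user passed dropna=False explicitly

print(agg(df, "a", **no_choice))  # NaN key dropped (default behavior)
print(agg(df, "a", **explicit))   # NaN key kept as its own group
```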
Authors:
  - Charles Blackmon-Luca (https://github.com/charlesbluca)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Richard (Rick) Zamora (https://github.com/rjzamora)

URL: https://github.com/rapidsai/cudf/pull/10449
---
 python/dask_cudf/dask_cudf/groupby.py         | 281 ++++++++++++++++--
 .../dask_cudf/dask_cudf/tests/test_groupby.py | 101 ++++++-
 2 files changed, 352 insertions(+), 30 deletions(-)

diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py
index 684b1f71099..d137fac5fe3 100644
--- a/python/dask_cudf/dask_cudf/groupby.py
+++ b/python/dask_cudf/dask_cudf/groupby.py
@@ -35,6 +35,25 @@
 )
 
 
+def _check_groupby_supported(func):
+    """
+    Decorator for dask-cudf groupby methods that dispatches to the
+    dask-cudf implementation when the groupby object is supported,
+    falling back to the upstream Dask method otherwise.
+    """
+
+    def wrapper(*args, **kwargs):
+        gb = args[0]
+        if _groupby_supported(gb):
+            return func(*args, **kwargs)
+        # note that we use upstream Dask's default kwargs for this call if
+        # none are specified; this shouldn't be an issue as those defaults are
+        # consistent with dask-cudf
+        return getattr(super(type(gb), gb), func.__name__)(*args[1:], **kwargs)
+
+    return wrapper
+
+
 class CudfDataFrameGroupBy(DataFrameGroupBy):
     @_dask_cudf_nvtx_annotate
     def __init__(self, *args, **kwargs):
@@ -65,6 +84,22 @@ def __getitem__(self, key):
         return g
 
     @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def count(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {c: "count" for c in self.obj.columns if c not in self.by},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
     def mean(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -72,13 +107,89 @@ def mean(self, split_every=None, split_out=1):
             {c: "mean" for c in self.obj.columns if c not in self.by},
             split_every=split_every,
             split_out=split_out,
-            dropna=self.dropna,
             sep=self.sep,
             sort=self.sort,
             as_index=self.as_index,
+            **self.dropna,
+        )
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def std(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {c: "std" for c in self.obj.columns if c not in self.by},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def var(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {c: "var" for c in self.obj.columns if c not in self.by},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def sum(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {c: "sum" for c in self.obj.columns if c not in self.by},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def min(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {c: "min" for c in self.obj.columns if c not in self.by},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
         )
 
     @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def max(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {c: "max" for c in self.obj.columns if c not in self.by},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
     def collect(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -86,10 +197,40 @@ def collect(self, split_every=None, split_out=1):
             {c: "collect" for c in self.obj.columns if c not in self.by},
             split_every=split_every,
             split_out=split_out,
-            dropna=self.dropna,
             sep=self.sep,
             sort=self.sort,
             as_index=self.as_index,
+            **self.dropna,
+        )
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def first(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {c: "first" for c in self.obj.columns if c not in self.by},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def last(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {c: "last" for c in self.obj.columns if c not in self.by},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
         )
 
     @_dask_cudf_nvtx_annotate
@@ -98,17 +239,7 @@ def aggregate(self, arg, split_every=None, split_out=1):
             return self.size()
 
         arg = _redirect_aggs(arg)
-        if (
-            isinstance(self.obj, DaskDataFrame)
-            and (
-                isinstance(self.by, str)
-                or (
-                    isinstance(self.by, list)
-                    and all(isinstance(x, str) for x in self.by)
-                )
-            )
-            and _is_supported(arg, SUPPORTED_AGGS)
-        ):
+        if _groupby_supported(self) and _aggs_supported(arg, SUPPORTED_AGGS):
             if isinstance(self._meta.grouping.keys, cudf.MultiIndex):
                 keys = self._meta.grouping.keys.names
             else:
@@ -120,10 +251,10 @@ def aggregate(self, arg, split_every=None, split_out=1):
                 arg,
                 split_every=split_every,
                 split_out=split_out,
-                dropna=self.dropna,
                 sep=self.sep,
                 sort=self.sort,
                 as_index=self.as_index,
+                **self.dropna,
             )
 
         return super().aggregate(
@@ -139,6 +270,22 @@ def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
 
     @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def count(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {self._slice: "count"},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )[self._slice]
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
     def mean(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -146,13 +293,14 @@ def mean(self, split_every=None, split_out=1):
             {self._slice: "mean"},
             split_every=split_every,
             split_out=split_out,
-            dropna=self.dropna,
             sep=self.sep,
             sort=self.sort,
             as_index=self.as_index,
+            **self.dropna,
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
     def std(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -160,13 +308,14 @@ def std(self, split_every=None, split_out=1):
             {self._slice: "std"},
             split_every=split_every,
             split_out=split_out,
-            dropna=self.dropna,
             sep=self.sep,
             sort=self.sort,
             as_index=self.as_index,
+            **self.dropna,
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
     def var(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -174,13 +323,59 @@ def var(self, split_every=None, split_out=1):
             {self._slice: "var"},
             split_every=split_every,
             split_out=split_out,
-            dropna=self.dropna,
             sep=self.sep,
             sort=self.sort,
             as_index=self.as_index,
+            **self.dropna,
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def sum(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {self._slice: "sum"},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )[self._slice]
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def min(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {self._slice: "min"},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )[self._slice]
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def max(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {self._slice: "max"},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )[self._slice]
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
     def collect(self, split_every=None, split_out=1):
         return groupby_agg(
             self.obj,
@@ -188,10 +383,40 @@ def collect(self, split_every=None, split_out=1):
             {self._slice: "collect"},
             split_every=split_every,
             split_out=split_out,
-            dropna=self.dropna,
             sep=self.sep,
             sort=self.sort,
             as_index=self.as_index,
+            **self.dropna,
+        )[self._slice]
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def first(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {self._slice: "first"},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
+        )[self._slice]
+
+    @_dask_cudf_nvtx_annotate
+    @_check_groupby_supported
+    def last(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.by,
+            {self._slice: "last"},
+            split_every=split_every,
+            split_out=split_out,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+            **self.dropna,
         )[self._slice]
 
     @_dask_cudf_nvtx_annotate
@@ -203,21 +428,17 @@ def aggregate(self, arg, split_every=None, split_out=1):
         if not isinstance(arg, dict):
             arg = {self._slice: arg}
 
-        if (
-            isinstance(self.obj, DaskDataFrame)
-            and isinstance(self.by, (str, list))
-            and _is_supported(arg, SUPPORTED_AGGS)
-        ):
+        if _groupby_supported(self) and _aggs_supported(arg, SUPPORTED_AGGS):
             return groupby_agg(
                 self.obj,
                 self.by,
                 arg,
                 split_every=split_every,
                 split_out=split_out,
-                dropna=self.dropna,
                 sep=self.sep,
                 sort=self.sort,
                 as_index=self.as_index,
+                **self.dropna,
             )[self._slice]
 
         return super().aggregate(
@@ -258,7 +479,7 @@ def groupby_agg(
     """
     # Assert that aggregations are supported
    aggs = _redirect_aggs(aggs_in)
-    if not _is_supported(aggs, SUPPORTED_AGGS):
+    if not _aggs_supported(aggs, SUPPORTED_AGGS):
         raise ValueError(
             f"Supported aggs include {SUPPORTED_AGGS} for groupby_agg API. "
             f"Aggregations must be specified with dict or list syntax."
@@ -420,7 +641,7 @@ def _redirect_aggs(arg):
 
 
 @_dask_cudf_nvtx_annotate
-def _is_supported(arg, supported: set):
+def _aggs_supported(arg, supported: set):
     """Check that aggregations in `arg` are a subset of `supported`"""
     if isinstance(arg, (list, dict)):
         if isinstance(arg, dict):
@@ -439,6 +660,14 @@
     return False
 
 
+def _groupby_supported(gb):
+    """Check that groupby input is supported by dask-cudf"""
+    return isinstance(gb.obj, DaskDataFrame) and (
+        isinstance(gb.by, str)
+        or (isinstance(gb.by, list) and all(isinstance(x, str) for x in gb.by))
+    )
+
+
 def _make_name(*args, sep="_"):
     """Combine elements of `args` into a new string"""
     _args = (arg for arg in args if arg != "")
diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py
index e3545149c24..d2c9ecd0293 100644
--- a/python/dask_cudf/dask_cudf/tests/test_groupby.py
+++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py
@@ -11,7 +11,7 @@
 from cudf.core._compat import PANDAS_GE_120
 
 import dask_cudf
-from dask_cudf.groupby import SUPPORTED_AGGS, _is_supported
+from dask_cudf.groupby import SUPPORTED_AGGS, _aggs_supported
 
 
 @pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
@@ -235,8 +235,7 @@ def test_groupby_split_out(split_out, column):
 @pytest.mark.parametrize(
     "by", ["a", "b", "c", "d", ["a", "b"], ["a", "c"], ["a", "d"]]
 )
-def test_groupby_dropna(dropna, by):
-
+def test_groupby_dropna_cudf(dropna, by):
     # NOTE: This test is borrowed from upstream dask
     # (dask/dask/dataframe/tests/test_groupby.py)
     df = cudf.DataFrame(
@@ -265,6 +264,100 @@
     dd.assert_eq(dask_result, cudf_result)
 
 
+@pytest.mark.parametrize(
+    "dropna,by",
+    [
+        (False, "a"),
+        (False, "b"),
+        (False, "c"),
+        pytest.param(
+            False,
+            "d",
+            marks=pytest.mark.xfail(
+                reason="dropna=False is broken in Dask CPU for groupbys on "
+                "categorical columns"
+            ),
+        ),
+        pytest.param(
+            False,
+            ["a", "b"],
+            marks=pytest.mark.xfail(
+                reason="https://github.com/dask/dask/issues/8817"
+            ),
+        ),
+        pytest.param(
+            False,
+            ["a", "c"],
+            marks=pytest.mark.xfail(
+                reason="https://github.com/dask/dask/issues/8817"
+            ),
+        ),
+        pytest.param(
+            False,
+            ["a", "d"],
+            marks=pytest.mark.xfail(
+                reason="multi-col groupbys on categorical columns are broken "
+                "in Dask CPU"
+            ),
+        ),
+        (True, "a"),
+        (True, "b"),
+        (True, "c"),
+        (True, "d"),
+        (True, ["a", "b"]),
+        (True, ["a", "c"]),
+        pytest.param(
+            True,
+            ["a", "d"],
+            marks=pytest.mark.xfail(
+                reason="multi-col groupbys on categorical columns are broken "
+                "in Dask CPU"
+            ),
+        ),
+        (None, "a"),
+        (None, "b"),
+        (None, "c"),
+        (None, "d"),
+        (None, ["a", "b"]),
+        (None, ["a", "c"]),
+        pytest.param(
+            None,
+            ["a", "d"],
+            marks=pytest.mark.xfail(
+                reason="multi-col groupbys on categorical columns are broken "
+                "in Dask CPU"
+            ),
+        ),
+    ],
+)
+def test_groupby_dropna_dask(dropna, by):
+    # NOTE: This test is borrowed from upstream dask
+    # (dask/dask/dataframe/tests/test_groupby.py)
+    df = pd.DataFrame(
+        {
+            "a": [1, 2, 3, 4, None, None, 7, 8],
+            "b": [1, None, 1, 3, None, 3, 1, 3],
+            "c": ["a", "b", None, None, "e", "f", "g", "h"],
+            "e": [4, 5, 6, 3, 2, 1, 0, 0],
+        }
+    )
+    df["b"] = df["b"].astype("datetime64[ns]")
+    df["d"] = df["c"].astype("category")
+
+    gdf = cudf.from_pandas(df)
+    ddf = dd.from_pandas(df, npartitions=3)
+    gddf = dask_cudf.from_cudf(gdf, npartitions=3)
+
+    if dropna is None:
+        dask_cudf_result = gddf.groupby(by).e.sum()
+        dask_result = ddf.groupby(by).e.sum()
+    else:
+        dask_cudf_result = gddf.groupby(by, dropna=dropna).e.sum()
+        dask_result = ddf.groupby(by, dropna=dropna).e.sum()
+
+    dd.assert_eq(dask_cudf_result, dask_result)
+
+
 @pytest.mark.parametrize("myindex", [[1, 2] * 4, ["s1", "s2"] * 4])
 def test_groupby_string_index_name(myindex):
     # GH-Issue #3420
@@ -570,7 +663,7 @@ def test_groupby_agg_redirect(aggregations):
     [["not_supported"], {"a": "not_supported"}, {"a": ["not_supported"]}],
 )
 def test_is_supported(arg):
-    assert _is_supported(arg, {"supported"}) is False
+    assert _aggs_supported(arg, {"supported"}) is False
 
 
 def test_groupby_unique_lists():