From ee27168f43a41ea6ce432030e062e118d2410b52 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Tue, 28 Mar 2023 14:08:20 -0700 Subject: [PATCH 1/4] initial --- python/cudf/cudf/core/groupby/groupby.py | 3 +-- python/cudf/cudf/tests/test_groupby.py | 12 ++++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index cb4c0f6b48b..fe4d71736f7 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -1179,7 +1179,6 @@ def _iterative_groupby_apply( chunk_results = [function(chk, *args) for chk in chunks] if not len(chunk_results): return self.obj.head(0) - if cudf.api.types.is_scalar(chunk_results[0]): result = cudf.Series._from_data( {None: chunk_results}, index=group_names @@ -1187,7 +1186,7 @@ def _iterative_groupby_apply( result.index.names = self.grouping.names elif isinstance(chunk_results[0], cudf.Series) and isinstance( self.obj, cudf.DataFrame - ): + ) and (chunk_results[0].to_pandas().index.values == self.obj.columns).all(): result = cudf.concat(chunk_results, axis=1).T result.index.names = self.grouping.names else: diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index e58d70f49c7..fc49c14ff7b 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -570,6 +570,18 @@ def f(group): assert precompiled.currsize == 3 +def test_groupby_apply_return_column(): + df = cudf.datasets.randomdata() + pdf = df.to_pandas() + + def func(df): + return df.x + df.y + + expect = pdf.groupby("id").apply(func) + got = df.groupby("id").apply(func) + + assert_groupby_results_equal(expect, got) + @pytest.mark.parametrize("nelem", [2, 3, 100, 500, 1000]) @pytest.mark.parametrize( From 957dd9b6f9c8ece21075134f0599d58b9ddfbea2 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Thu, 6 Apr 2023 08:35:17 -0700 Subject: [PATCH 2/4] add tests, refactor logic --- python/cudf/cudf/core/groupby/groupby.py | 33 +++++++++++++++++++++--- python/cudf/cudf/tests/test_groupby.py | 20 +++++++++++++- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index fe4d71736f7..2aa4289c1e5 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -44,6 +44,14 @@ def _quantile_75(x): return x.quantile(0.75) +def _is_row_of(chunk, obj): + if isinstance(chunk, cudf.Series) and isinstance(obj, cudf.DataFrame): + if len(chunk.index) == len(obj.columns): + if (chunk.index.to_pandas() == obj.columns).all(): + return True + return False + + groupby_doc_template = textwrap.dedent( """Group using a mapper or by a Series of columns. @@ -1177,6 +1185,13 @@ def _iterative_groupby_apply( grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:]) ] chunk_results = [function(chk, *args) for chk in chunks] + return self._post_process_chunk_results( + chunk_results, group_names, group_keys, grouped_values + ) + + def _post_process_chunk_results( + self, chunk_results, group_names, group_keys, grouped_values + ): if not len(chunk_results): return self.obj.head(0) if cudf.api.types.is_scalar(chunk_results[0]): @@ -1186,9 +1201,21 @@ def _iterative_groupby_apply( result.index.names = self.grouping.names elif isinstance(chunk_results[0], cudf.Series) and isinstance( self.obj, cudf.DataFrame - ) and (chunk_results[0].to_pandas().index.values == self.obj.columns).all(): - result = cudf.concat(chunk_results, axis=1).T - result.index.names = self.grouping.names + ): + # When the UDF is like df.sum(), the result for each + # group is a row-like "Series" where the index labels + # are the same as the original calling DataFrame + if _is_row_of(chunk_results[0], self.obj): + result = cudf.concat(chunk_results, axis=1).T + result.index = group_names + result.index.names = self.grouping.names + # When the UDF is like df.x + df.y, the result for each + # group is the same length as the original group + elif len(self.obj) == sum(len(chk) for chk in chunk_results): + result = cudf.concat(chunk_results) + index_data = group_keys._data.copy(deep=True) + index_data[None] = grouped_values.index._column + result.index = cudf.MultiIndex._from_data(index_data) else: result = cudf.concat(chunk_results) if self._group_keys: diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index fc49c14ff7b..2a94c947cc2 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -570,7 +570,12 @@ def f(group): assert precompiled.currsize == 3 -def test_groupby_apply_return_column(): + +@pytest.mark.parametrize("func", [lambda group: group.x + group.y]) +def test_groupby_apply_return_col_from_df(func): + # tests a UDF that consists of purely colwise + # ops, such as `lambda group: group.x + group.y` + # which returns a column df = cudf.datasets.randomdata() pdf = df.to_pandas() @@ -583,6 +588,19 @@ def func(df): assert_groupby_results_equal(expect, got) +@pytest.mark.parametrize("func", [lambda group: group.sum()]) +def test_groupby_apply_return_df(func): + # tests a UDF that reduces over a dataframe + # and produces a series with the original column names + # as its index, such as lambda group: group.sum() + group.min() + df = cudf.DataFrame({"a": [1, 1, 2, 2], "b": [1, 2, 3, 4]}) + pdf = df.to_pandas() + + expect = pdf.groupby("a").apply(func) + got = df.groupby("a").apply(func) + assert_groupby_results_equal(expect, got) + + @pytest.mark.parametrize("nelem", [2, 3, 100, 500, 1000]) @pytest.mark.parametrize( "func", From 3b528be943ccfd1f65c7fa0f0b5f8319026d8f18 Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Fri, 21 Apr 2023 10:04:30 -0700 Subject: [PATCH 3/4] dont use external apis --- python/cudf/cudf/core/groupby/groupby.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 484bb67342e..0e02f5b8c30 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -46,8 +46,8 @@ def _quantile_75(x): def _is_row_of(chunk, obj): if isinstance(chunk, cudf.Series) and isinstance(obj, cudf.DataFrame): - if len(chunk.index) == len(obj.columns): - if (chunk.index.to_pandas() == obj.columns).all(): + if len(chunk.index) == len(obj._column_names): + if (chunk.index.to_pandas() == pd.Index(obj._column_names)).all(): return True return False From 8d2370dc77bae3888d80a2a71e9c5b9459d7455a Mon Sep 17 00:00:00 2001 From: brandon-b-miller Date: Mon, 24 Apr 2023 08:29:17 -0700 Subject: [PATCH 4/4] raise if groupby output isn't handled --- python/cudf/cudf/core/groupby/groupby.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 0e02f5b8c30..11ba5e1ceb1 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -45,11 +45,12 @@ def _quantile_75(x): def _is_row_of(chunk, obj): - if isinstance(chunk, cudf.Series) and isinstance(obj, cudf.DataFrame): - if len(chunk.index) == len(obj._column_names): - if (chunk.index.to_pandas() == pd.Index(obj._column_names)).all(): - return True - return False + return ( + isinstance(chunk, cudf.Series) + and isinstance(obj, cudf.DataFrame) + and len(chunk.index) == len(obj._column_names) + and (chunk.index.to_pandas() == pd.Index(obj._column_names)).all() + ) groupby_doc_template = textwrap.dedent( @@ -1199,6 +1200,7 @@ def _post_process_chunk_results( {None: chunk_results}, index=group_names ) result.index.names = self.grouping.names + return result elif isinstance(chunk_results[0], cudf.Series) and isinstance( self.obj, cudf.DataFrame ): @@ -1216,6 +1218,12 @@ def _post_process_chunk_results( index_data = group_keys._data.copy(deep=True) index_data[None] = grouped_values.index._column result.index = cudf.MultiIndex._from_data(index_data) + else: + raise TypeError( + "Error handling Groupby apply output with input of " + f"type {type(self.obj)} and output of " + f"type {type(chunk_results[0])}" + ) else: result = cudf.concat(chunk_results) if self._group_keys: