diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index bd3bf0b9c07..0b6ea1fb646 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -44,6 +44,15 @@ def _quantile_75(x): return x.quantile(0.75) +def _is_row_of(chunk, obj): + return ( + isinstance(chunk, cudf.Series) + and isinstance(obj, cudf.DataFrame) + and len(chunk.index) == len(obj._column_names) + and (chunk.index.to_pandas() == pd.Index(obj._column_names)).all() + ) + + groupby_doc_template = textwrap.dedent( """Group using a mapper or by a Series of columns. @@ -1186,19 +1195,44 @@ def _iterative_groupby_apply( grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:]) ] chunk_results = [function(chk, *args) for chk in chunks] + return self._post_process_chunk_results( + chunk_results, group_names, group_keys, grouped_values + ) + + def _post_process_chunk_results( + self, chunk_results, group_names, group_keys, grouped_values + ): if not len(chunk_results): return self.obj.head(0) - if cudf.api.types.is_scalar(chunk_results[0]): result = cudf.Series._from_data( {None: chunk_results}, index=group_names ) result.index.names = self.grouping.names + return result elif isinstance(chunk_results[0], cudf.Series) and isinstance( self.obj, cudf.DataFrame ): - result = cudf.concat(chunk_results, axis=1).T - result.index.names = self.grouping.names + # When the UDF is like df.sum(), the result for each + # group is a row-like "Series" where the index labels + # are the same as the original calling DataFrame + if _is_row_of(chunk_results[0], self.obj): + result = cudf.concat(chunk_results, axis=1).T + result.index = group_names + result.index.names = self.grouping.names + # When the UDF is like df.x + df.y, the result for each + # group is the same length as the original group + elif len(self.obj) == sum(len(chk) for chk in chunk_results): + result = cudf.concat(chunk_results) + index_data = group_keys._data.copy(deep=True) + index_data[None] = grouped_values.index._column + result.index = cudf.MultiIndex._from_data(index_data) + else: + raise TypeError( + "Error handling Groupby apply output with input of " + f"type {type(self.obj)} and output of " + f"type {type(chunk_results[0])}" + ) else: result = cudf.concat(chunk_results) if self._group_keys: diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index 24db7008804..9a72b85dd13 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -571,6 +571,36 @@ def f(group): assert precompiled.currsize == 3 +@pytest.mark.parametrize("func", [lambda group: group.x + group.y]) +def test_groupby_apply_return_col_from_df(func): + # tests a UDF that consists of purely colwise + # ops, such as `lambda group: group.x + group.y` + # which returns a column + df = cudf.datasets.randomdata() + pdf = df.to_pandas() + + def func(df): + return df.x + df.y + + expect = pdf.groupby("id").apply(func) + got = df.groupby("id").apply(func) + + assert_groupby_results_equal(expect, got) + + +@pytest.mark.parametrize("func", [lambda group: group.sum()]) +def test_groupby_apply_return_df(func): + # tests a UDF that reduces over a dataframe + # and produces a series with the original column names + # as its index, such as lambda group: group.sum() + group.min() + df = cudf.DataFrame({"a": [1, 1, 2, 2], "b": [1, 2, 3, 4]}) + pdf = df.to_pandas() + + expect = pdf.groupby("a").apply(func) + got = df.groupby("a").apply(func) + assert_groupby_results_equal(expect, got) + + @pytest.mark.parametrize("nelem", [2, 3, 100, 500, 1000]) @pytest.mark.parametrize( "func",