Fix bugs in iterative groupby apply algorithm (#13078)
Closes #13037
Closes #12987

Authors:
  - https://github.com/brandon-b-miller

Approvers:
  - Bradley Dice (https://github.com/bdice)

URL: #13078
brandon-b-miller authored Apr 26, 2023
1 parent 588643f commit df9eefc
Showing 2 changed files with 67 additions and 3 deletions.
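
For orientation, these are the two GroupBy.apply UDF shapes this change targets. The snippet below is a minimal sketch with made-up data and column names ("key", "x", "y"), mirroring the new tests further down rather than the closed issues verbatim.

import cudf

df = cudf.DataFrame(
    {"key": [1, 1, 2, 2], "x": [1, 2, 3, 4], "y": [10, 20, 30, 40]}
)

# UDF that reduces each group to a row-like Series indexed by the
# original column names, e.g. group.sum()
rows = df.groupby("key").apply(lambda group: group.sum())

# UDF that returns a column the same length as each group,
# e.g. group.x + group.y
cols = df.groupby("key").apply(lambda group: group.x + group.y)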
40 changes: 37 additions & 3 deletions python/cudf/cudf/core/groupby/groupby.py
@@ -44,6 +44,15 @@ def _quantile_75(x):
    return x.quantile(0.75)


def _is_row_of(chunk, obj):
    return (
        isinstance(chunk, cudf.Series)
        and isinstance(obj, cudf.DataFrame)
        and len(chunk.index) == len(obj._column_names)
        and (chunk.index.to_pandas() == pd.Index(obj._column_names)).all()
    )


groupby_doc_template = textwrap.dedent(
    """Group using a mapper or by a Series of columns.
@@ -1186,19 +1195,44 @@ def _iterative_groupby_apply(
            grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])
        ]
        chunk_results = [function(chk, *args) for chk in chunks]
        return self._post_process_chunk_results(
            chunk_results, group_names, group_keys, grouped_values
        )

    def _post_process_chunk_results(
        self, chunk_results, group_names, group_keys, grouped_values
    ):
        if not len(chunk_results):
            return self.obj.head(0)

        if cudf.api.types.is_scalar(chunk_results[0]):
            result = cudf.Series._from_data(
                {None: chunk_results}, index=group_names
            )
            result.index.names = self.grouping.names
            return result
        elif isinstance(chunk_results[0], cudf.Series) and isinstance(
            self.obj, cudf.DataFrame
        ):
            result = cudf.concat(chunk_results, axis=1).T
            result.index.names = self.grouping.names
            # When the UDF is like df.sum(), the result for each
            # group is a row-like "Series" where the index labels
            # are the same as the original calling DataFrame
            if _is_row_of(chunk_results[0], self.obj):
                result = cudf.concat(chunk_results, axis=1).T
                result.index = group_names
                result.index.names = self.grouping.names
            # When the UDF is like df.x + df.y, the result for each
            # group is the same length as the original group
            elif len(self.obj) == sum(len(chk) for chk in chunk_results):
                result = cudf.concat(chunk_results)
                index_data = group_keys._data.copy(deep=True)
                index_data[None] = grouped_values.index._column
                result.index = cudf.MultiIndex._from_data(index_data)
            else:
                raise TypeError(
                    "Error handling Groupby apply output with input of "
                    f"type {type(self.obj)} and output of "
                    f"type {type(chunk_results[0])}"
                )
        else:
            result = cudf.concat(chunk_results)
            if self._group_keys:
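
To make the two new branches concrete, here is a hedged, simplified restatement of what they compute on plain cuDF objects; the chunks and group labels are hand-built stand-ins for the groupby internals (`group_keys` / `grouped_values`), not the library code itself.

import cudf

df = cudf.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
chunks = [df.iloc[:2], df.iloc[2:]]        # two hand-made "groups"
group_names = cudf.Index([1, 2], name="key")

# Branch 1: each chunk result is a row-like Series (e.g. chunk.sum()),
# so stacking along axis=1 and transposing yields one row per group,
# re-labelled by the group names.
row_results = [chunk.sum() for chunk in chunks]
as_rows = cudf.concat(row_results, axis=1).T
as_rows.index = group_names

# Branch 2: each chunk result has the same length as its group
# (e.g. chunk.x + chunk.y), so a plain concat lines back up with the
# original rows; the real code additionally rebuilds a MultiIndex from
# the group keys plus the original row index.
col_results = [chunk.x + chunk.y for chunk in chunks]
as_column = cudf.concat(col_results)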
30 changes: 30 additions & 0 deletions python/cudf/cudf/tests/test_groupby.py
@@ -571,6 +571,36 @@ def f(group):
    assert precompiled.currsize == 3


@pytest.mark.parametrize("func", [lambda group: group.x + group.y])
def test_groupby_apply_return_col_from_df(func):
    # tests a UDF that consists of purely colwise
    # ops, such as `lambda group: group.x + group.y`
    # which returns a column
    df = cudf.datasets.randomdata()
    pdf = df.to_pandas()

    def func(df):
        return df.x + df.y

    expect = pdf.groupby("id").apply(func)
    got = df.groupby("id").apply(func)

    assert_groupby_results_equal(expect, got)


@pytest.mark.parametrize("func", [lambda group: group.sum()])
def test_groupby_apply_return_df(func):
    # tests a UDF that reduces over a dataframe
    # and produces a series with the original column names
    # as its index, such as lambda group: group.sum() + group.min()
    df = cudf.DataFrame({"a": [1, 1, 2, 2], "b": [1, 2, 3, 4]})
    pdf = df.to_pandas()

    expect = pdf.groupby("a").apply(func)
    got = df.groupby("a").apply(func)
    assert_groupby_results_equal(expect, got)


@pytest.mark.parametrize("nelem", [2, 3, 100, 500, 1000])
@pytest.mark.parametrize(
    "func",
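
For reference, the equivalent pandas computation on the second test's data gives the result the cuDF path is checked against (values worked out by hand from the frame in the test):

import pandas as pd

pdf = pd.DataFrame({"a": [1, 1, 2, 2], "b": [1, 2, 3, 4]})
expect = pdf.groupby("a").apply(lambda group: group.sum())
# Group a=1 sums to (a=2, b=3); group a=2 sums to (a=4, b=7), so
# `expect` is a DataFrame indexed by "a" with one row per group.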
