Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs in iterative groupby apply algorithm #13078

Merged
merged 6 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ def _quantile_75(x):
return x.quantile(0.75)


def _is_row_of(chunk, obj):
return (
isinstance(chunk, cudf.Series)
and isinstance(obj, cudf.DataFrame)
and len(chunk.index) == len(obj._column_names)
and (chunk.index.to_pandas() == pd.Index(obj._column_names)).all()
)


groupby_doc_template = textwrap.dedent(
"""Group using a mapper or by a Series of columns.

Expand Down Expand Up @@ -1177,19 +1186,44 @@ def _iterative_groupby_apply(
grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])
]
chunk_results = [function(chk, *args) for chk in chunks]
return self._post_process_chunk_results(
chunk_results, group_names, group_keys, grouped_values
)

def _post_process_chunk_results(
self, chunk_results, group_names, group_keys, grouped_values
):
if not len(chunk_results):
return self.obj.head(0)

if cudf.api.types.is_scalar(chunk_results[0]):
result = cudf.Series._from_data(
{None: chunk_results}, index=group_names
)
result.index.names = self.grouping.names
return result
elif isinstance(chunk_results[0], cudf.Series) and isinstance(
self.obj, cudf.DataFrame
):
result = cudf.concat(chunk_results, axis=1).T
result.index.names = self.grouping.names
# When the UDF is like df.sum(), the result for each
# group is a row-like "Series" where the index labels
# are the same as the original calling DataFrame
if _is_row_of(chunk_results[0], self.obj):
result = cudf.concat(chunk_results, axis=1).T
result.index = group_names
result.index.names = self.grouping.names
# When the UDF is like df.x + df.y, the result for each
# group is the same length as the original group
elif len(self.obj) == sum(len(chk) for chk in chunk_results):
bdice marked this conversation as resolved.
Show resolved Hide resolved
result = cudf.concat(chunk_results)
index_data = group_keys._data.copy(deep=True)
index_data[None] = grouped_values.index._column
result.index = cudf.MultiIndex._from_data(index_data)
else:
raise TypeError(
"Error handling Groupby apply output with input of "
f"type {type(self.obj)} and output of "
f"type {type(chunk_results[0])}"
)
else:
result = cudf.concat(chunk_results)
if self._group_keys:
Expand Down
30 changes: 30 additions & 0 deletions python/cudf/cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,36 @@ def f(group):
assert precompiled.currsize == 3


@pytest.mark.parametrize("func", [lambda group: group.x + group.y])
def test_groupby_apply_return_col_from_df(func):
# tests a UDF that consists of purely colwise
# ops, such as `lambda group: group.x + group.y`
# which returns a column
df = cudf.datasets.randomdata()
pdf = df.to_pandas()

def func(df):
return df.x + df.y

expect = pdf.groupby("id").apply(func)
got = df.groupby("id").apply(func)

assert_groupby_results_equal(expect, got)


@pytest.mark.parametrize("func", [lambda group: group.sum()])
def test_groupby_apply_return_df(func):
# tests a UDF that reduces over a dataframe
# and produces a series with the original column names
# as its index, such as lambda group: group.sum() + group.min()
df = cudf.DataFrame({"a": [1, 1, 2, 2], "b": [1, 2, 3, 4]})
pdf = df.to_pandas()

expect = pdf.groupby("a").apply(func)
got = df.groupby("a").apply(func)
assert_groupby_results_equal(expect, got)


@pytest.mark.parametrize("nelem", [2, 3, 100, 500, 1000])
@pytest.mark.parametrize(
"func",
Expand Down