Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add groupby.apply(include_groups=) to match pandas 2.2 deprecation #15006

Merged
Merged
55 changes: 40 additions & 15 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,20 +1178,25 @@ def deserialize(cls, header, frames):
)
return cls(obj, grouping, **kwargs)

def _grouped(self):
def _grouped(self, *, include_groups: bool = True):
offsets, grouped_key_cols, grouped_value_cols = self._groupby.groups(
[*self.obj._index._columns, *self.obj._columns]
)
grouped_keys = cudf.core.index._index_from_columns(grouped_key_cols)
if isinstance(self.grouping.keys, cudf.MultiIndex):
grouped_keys.names = self.grouping.keys.names
to_drop = self.grouping.keys.names
else:
grouped_keys.name = self.grouping.keys.name
to_drop = (self.grouping.keys.name,)
grouped_values = self.obj._from_columns_like_self(
grouped_value_cols,
column_names=self.obj._column_names,
index_names=self.obj._index_names,
)
if not include_groups:
for col_name in to_drop:
del grouped_values[col_name]
group_names = grouped_keys.unique().sort_values()
return (group_names, offsets, grouped_keys, grouped_values)

Expand Down Expand Up @@ -1348,13 +1353,25 @@ def _post_process_chunk_results(
result.index.names = self.grouping.names
# When the UDF is like df.x + df.y, the result for each
# group is the same length as the original group
elif len(self.obj) == sum(len(chk) for chk in chunk_results):
elif (total_rows := sum(len(chk) for chk in chunk_results)) in {
len(self.obj),
len(group_names),
}:
with warnings.catch_warnings():
warnings.simplefilter("ignore", FutureWarning)
result = cudf.concat(chunk_results)
index_data = group_keys._data.copy(deep=True)
index_data[None] = grouped_values.index._column
result.index = cudf.MultiIndex._from_data(index_data)
if total_rows == len(group_names):
result.index = group_names
# TODO: Is there a better way to determine what
# the column name should be, especially if we applied
# a nameless UDF.
result = result.to_frame(
name=grouped_values._data.names[0]
)
else:
index_data = group_keys._data.copy(deep=True)
index_data[None] = grouped_values.index._column
result.index = cudf.MultiIndex._from_data(index_data)
else:
raise TypeError(
"Error handling Groupby apply output with input of "
Expand All @@ -1372,7 +1389,9 @@ def _post_process_chunk_results(
return result

@_cudf_nvtx_annotate
def apply(self, function, *args, engine="auto"):
def apply(
self, function, *args, engine="auto", include_groups: bool = True
):
"""Apply a python transformation function over the grouped chunk.

Parameters
Expand All @@ -1396,6 +1415,10 @@ def apply(self, function, *args, engine="auto"):
The default value `auto` will attempt to use the numba JIT pipeline
where possible and will fall back to the iterative algorithm if
necessary.
include_groups : bool, default True
When True, will attempt to apply ``func`` to the groupings in
the case that they are columns of the DataFrame. In the future,
this will default to ``False``.

Examples
--------
Expand Down Expand Up @@ -1444,15 +1467,15 @@ def mult(df):
... 'c': [1, 2, 3, 4],
... })
>>> gdf = cudf.from_pandas(df)
>>> df.groupby('a').apply(lambda x: x.iloc[[0]])
a b c
>>> df.groupby('a')[["b", "c"]].apply(lambda x: x.iloc[[0]])
b c
a
1 0 1 1 1
2 2 2 1 3
>>> gdf.groupby('a').apply(lambda x: x.iloc[[0]])
a b c
0 1 1 1
2 2 1 3
1 0 1 1
2 2 1 3
>>> gdf.groupby('a')[["b", "c"]].apply(lambda x: x.iloc[[0]])
b c
0 1 1
2 1 3

``engine='jit'`` may be used to accelerate certain functions,
initially those that contain reductions and arithmetic operations
Expand Down Expand Up @@ -1487,7 +1510,9 @@ def mult(df):

if not callable(function):
raise TypeError(f"type {type(function)} is not callable")
group_names, offsets, group_keys, grouped_values = self._grouped()
group_names, offsets, group_keys, grouped_values = self._grouped(
include_groups=include_groups
)

if engine == "auto":
if _can_be_jitted(grouped_values, function, args):
Expand Down
137 changes: 91 additions & 46 deletions python/cudf/cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ def test_groupby_as_index_apply(pdf, gdf, as_index, engine):
gdf = gdf.groupby("y", as_index=as_index).apply(
lambda df: df["x"].mean(), engine=engine
)
pdf = pdf.groupby("y", as_index=as_index).apply(lambda df: df["x"].mean())
kwargs = {"func": lambda df: df["x"].mean()}
if PANDAS_GE_220:
kwargs["include_groups"] = False
pdf = pdf.groupby("y", as_index=as_index).apply(**kwargs)
assert_groupby_results_equal(pdf, gdf)


Expand Down Expand Up @@ -311,8 +314,12 @@ def foo(df):
df["out"] = df["val1"] + df["val2"]
return df

expect = expect_grpby.apply(foo)
got = got_grpby.apply(foo)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = expect_grpby.apply(foo, **kwargs)
got = got_grpby.apply(foo, **kwargs)
assert_groupby_results_equal(expect, got)


Expand Down Expand Up @@ -346,24 +353,24 @@ def test_groupby_apply_args(func, args):
["key1", "key2"], as_index=False, group_keys=False
)
got_grpby = df.groupby(["key1", "key2"])

expect = expect_grpby.apply(func, *args)
got = got_grpby.apply(func, *args)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = expect_grpby.apply(func, *args, **kwargs)
got = got_grpby.apply(func, *args, **kwargs)
assert_groupby_results_equal(expect, got)


def test_groupby_apply_grouped():
np.random.seed(0)
df = DataFrame()
nelem = 20
df["key1"] = np.random.randint(0, 3, nelem)
df["key2"] = np.random.randint(0, 2, nelem)
df["val1"] = np.random.random(nelem)
df["val2"] = np.random.random(nelem)
df["key1"] = range(nelem)
df["key2"] = range(nelem)
df["val1"] = range(nelem)
df["val2"] = range(nelem)

expect_grpby = df.to_pandas().groupby(
["key1", "key2"], as_index=False, group_keys=False
)
got_grpby = df.groupby(["key1", "key2"])

def foo(key1, val1, com1, com2):
Expand All @@ -380,14 +387,11 @@ def foo(key1, val1, com1, com2):

got = got.to_pandas()

# Get expected result by emulating the operation in pandas
def emulate(df):
df["com1"] = df.key1 * 10000 + df.val1
df["com2"] = np.arange(len(df), dtype=np.int32)
return df

expect = expect_grpby.apply(emulate)
expect = expect.sort_values(["key1", "key2"])
expect = df.copy()
expect["com1"] = (expect["key1"] * 10000 + expect["key1"]).astype(
np.float64
)
expect["com2"] = np.zeros(nelem, dtype=np.int32)

assert_groupby_results_equal(expect, got)

Expand Down Expand Up @@ -462,8 +466,14 @@ def run_groupby_apply_jit_test(data, func, keys, *args):
got_groupby_obj = data.groupby(keys)

# compare cuDF jit to pandas
cudf_jit_result = got_groupby_obj.apply(func, *args, engine="jit")
pandas_result = expect_groupby_obj.apply(func, *args)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
cudf_jit_result = got_groupby_obj.apply(
func, *args, engine="jit", **kwargs
)
pandas_result = expect_groupby_obj.apply(func, *args, **kwargs)
assert_groupby_results_equal(cudf_jit_result, pandas_result)


Expand Down Expand Up @@ -776,7 +786,7 @@ def test_groupby_apply_jit_block_divergence():
)

def diverging_block(grp_df):
if grp_df["a"].mean() > 0:
if grp_df["b"].mean() > 1:
return grp_df["b"].mean()
return 0

Expand Down Expand Up @@ -831,27 +841,41 @@ def f(group):
return group.sum()

part = partial(f)

expect = pdf.groupby("a").apply(part)
got = gdf.groupby("a").apply(part, engine="auto")

if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = pdf.groupby("a").apply(part, **kwargs)
got = gdf.groupby("a").apply(part, engine="auto", **kwargs)
assert_groupby_results_equal(expect, got)


@pytest.mark.parametrize("func", [lambda group: group.x + group.y])
def test_groupby_apply_return_col_from_df(func):
def test_groupby_apply_return_col_from_df():
# tests a UDF that consists of purely colwise
# ops, such as `lambda group: group.x + group.y`
# which returns a column
df = cudf.datasets.randomdata()
func = lambda group: group.x + group.y # noqa:E731
df = cudf.DataFrame(
{
"id": range(10),
"x": range(10),
"y": range(10),
}
)
pdf = df.to_pandas()

def func(df):
return df.x + df.y

expect = pdf.groupby("id").apply(func)
got = df.groupby("id").apply(func)

if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
got = df.groupby("id").apply(func, **kwargs)
expect = pdf.groupby("id").apply(func, **kwargs)
# pandas seems to erroneously add an extra MI level of ids
# TODO: Figure out how pandas groupby.apply determines the columns
expect = pd.DataFrame(expect.droplevel(1), columns=got.columns)
assert_groupby_results_equal(expect, got)


Expand All @@ -863,8 +887,12 @@ def test_groupby_apply_return_df(func):
df = cudf.DataFrame({"a": [1, 1, 2, 2], "b": [1, 2, 3, 4]})
pdf = df.to_pandas()

expect = pdf.groupby("a").apply(func)
got = df.groupby("a").apply(func)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = pdf.groupby("a").apply(func, **kwargs)
got = df.groupby("a").apply(func, **kwargs)
assert_groupby_results_equal(expect, got)


Expand Down Expand Up @@ -1433,7 +1461,7 @@ def test_groupby_nulls_basic(agg):

# TODO: fillna() used here since we don't follow
# Pandas' null semantics. Should we change it?
with expect_warning_if(agg in {"idxmax", "idxmin"}):
with expect_warning_if(agg in {"idxmax", "idxmin"} and not PANDAS_GE_220):
assert_groupby_results_equal(
getattr(pdf.groupby("a"), agg)().fillna(0),
getattr(gdf.groupby("a"), agg)().fillna(0 if agg != "prod" else 1),
Expand Down Expand Up @@ -1910,14 +1938,21 @@ def test_groupby_apply_noempty_group():
{"a": [1, 1, 2, 2], "b": [1, 2, 1, 2], "c": [1, 2, 3, 4]}
)
gdf = cudf.from_pandas(pdf)
assert_groupby_results_equal(
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = (
pdf.groupby("a", group_keys=False)
.apply(lambda x: x.iloc[[0, 1]])
.reset_index(drop=True),
.apply(lambda x: x.iloc[[0, 1]], **kwargs)
.reset_index(drop=True)
)
got = (
gdf.groupby("a")
.apply(lambda x: x.iloc[[0, 1]])
.reset_index(drop=True),
.apply(lambda x: x.iloc[[0, 1]], **kwargs)
.reset_index(drop=True)
)
assert_groupby_results_equal(expect, got)


def test_reset_index_after_empty_groupby():
Expand Down Expand Up @@ -2198,8 +2233,12 @@ def test_groupby_apply_return_scalars(func, args):
)
gdf = cudf.from_pandas(pdf)

expected = pdf.groupby("A").apply(func, *args)
actual = gdf.groupby("A").apply(func, *args)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expected = pdf.groupby("A").apply(func, *args, **kwargs)
actual = gdf.groupby("A").apply(func, *args, **kwargs)

assert_groupby_results_equal(expected, actual)

Expand Down Expand Up @@ -2242,8 +2281,14 @@ def test_groupby_apply_return_series_dataframe(func, args):
)
gdf = cudf.from_pandas(pdf)

expected = pdf.groupby(["key"], group_keys=False).apply(func, *args)
actual = gdf.groupby(["key"]).apply(func, *args)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expected = pdf.groupby(["key"], group_keys=False).apply(
func, *args, **kwargs
)
actual = gdf.groupby(["key"]).apply(func, *args, **kwargs)

assert_groupby_results_equal(expected, actual)

Expand Down
12 changes: 10 additions & 2 deletions python/cudf/cudf_pandas_tests/test_cudf_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import pytest
from numba import NumbaDeprecationWarning

from cudf.core._compat import PANDAS_GE_220
from cudf.pandas import LOADED, Profiler
from cudf.pandas.fast_slow_proxy import _Unusable

Expand Down Expand Up @@ -506,10 +507,17 @@ def test_array_ufunc(series):
tm.assert_equal(expect, got)


@pytest.mark.xfail(strict=False, reason="Fails in CI, passes locally.")
def test_groupby_apply_func_returns_series(dataframe):
pdf, df = dataframe
expect = pdf.groupby("a").apply(lambda group: pd.Series({"x": 1}))
got = df.groupby("a").apply(lambda group: xpd.Series({"x": 1}))
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = pdf.groupby("a").apply(
lambda group: pd.Series({"x": 1}), **kwargs
)
got = df.groupby("a").apply(lambda group: xpd.Series({"x": 1}), **kwargs)
tm.assert_equal(expect, got)


Expand Down
Loading