Skip to content

Commit

Permalink
Add groupby.apply(include_groups=) to match pandas 2.2 deprecation (#…
Browse files Browse the repository at this point in the history
…15006)

Matching pandas-dev/pandas#54950

Authors:
  - Matthew Roeschke (https://github.com/mroeschke)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #15006
  • Loading branch information
mroeschke authored Feb 20, 2024
1 parent 2d6be38 commit c0e370b
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 62 deletions.
55 changes: 40 additions & 15 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -1178,20 +1178,25 @@ def deserialize(cls, header, frames):
)
return cls(obj, grouping, **kwargs)

def _grouped(self):
def _grouped(self, *, include_groups: bool = True):
offsets, grouped_key_cols, grouped_value_cols = self._groupby.groups(
[*self.obj._index._columns, *self.obj._columns]
)
grouped_keys = cudf.core.index._index_from_columns(grouped_key_cols)
if isinstance(self.grouping.keys, cudf.MultiIndex):
grouped_keys.names = self.grouping.keys.names
to_drop = self.grouping.keys.names
else:
grouped_keys.name = self.grouping.keys.name
to_drop = (self.grouping.keys.name,)
grouped_values = self.obj._from_columns_like_self(
grouped_value_cols,
column_names=self.obj._column_names,
index_names=self.obj._index_names,
)
if not include_groups:
for col_name in to_drop:
del grouped_values[col_name]
group_names = grouped_keys.unique().sort_values()
return (group_names, offsets, grouped_keys, grouped_values)

Expand Down Expand Up @@ -1348,13 +1353,25 @@ def _post_process_chunk_results(
result.index.names = self.grouping.names
# When the UDF is like df.x + df.y, the result for each
# group is the same length as the original group
elif len(self.obj) == sum(len(chk) for chk in chunk_results):
elif (total_rows := sum(len(chk) for chk in chunk_results)) in {
len(self.obj),
len(group_names),
}:
with warnings.catch_warnings():
warnings.simplefilter("ignore", FutureWarning)
result = cudf.concat(chunk_results)
index_data = group_keys._data.copy(deep=True)
index_data[None] = grouped_values.index._column
result.index = cudf.MultiIndex._from_data(index_data)
if total_rows == len(group_names):
result.index = group_names
# TODO: Is there a better way to determine what
# the column name should be, especially if we applied
# a nameless UDF.
result = result.to_frame(
name=grouped_values._data.names[0]
)
else:
index_data = group_keys._data.copy(deep=True)
index_data[None] = grouped_values.index._column
result.index = cudf.MultiIndex._from_data(index_data)
else:
raise TypeError(
"Error handling Groupby apply output with input of "
Expand All @@ -1372,7 +1389,9 @@ def _post_process_chunk_results(
return result

@_cudf_nvtx_annotate
def apply(self, function, *args, engine="auto"):
def apply(
self, function, *args, engine="auto", include_groups: bool = True
):
"""Apply a python transformation function over the grouped chunk.
Parameters
Expand All @@ -1396,6 +1415,10 @@ def apply(self, function, *args, engine="auto"):
The default value `auto` will attempt to use the numba JIT pipeline
where possible and will fall back to the iterative algorithm if
necessary.
include_groups : bool, default True
When True, will attempt to apply ``func`` to the groupings in
the case that they are columns of the DataFrame. In the future,
this will default to ``False``.
Examples
--------
Expand Down Expand Up @@ -1444,15 +1467,15 @@ def mult(df):
... 'c': [1, 2, 3, 4],
... })
>>> gdf = cudf.from_pandas(df)
>>> df.groupby('a').apply(lambda x: x.iloc[[0]])
a b c
>>> df.groupby('a')[["b", "c"]].apply(lambda x: x.iloc[[0]])
b c
a
1 0 1 1 1
2 2 2 1 3
>>> gdf.groupby('a').apply(lambda x: x.iloc[[0]])
a b c
0 1 1 1
2 2 1 3
1 0 1 1
2 2 1 3
>>> gdf.groupby('a')[["b", "c"]].apply(lambda x: x.iloc[[0]])
b c
0 1 1
2 1 3
``engine='jit'`` may be used to accelerate certain functions,
initially those that contain reductions and arithmetic operations
Expand Down Expand Up @@ -1487,7 +1510,9 @@ def mult(df):

if not callable(function):
raise TypeError(f"type {type(function)} is not callable")
group_names, offsets, group_keys, grouped_values = self._grouped()
group_names, offsets, group_keys, grouped_values = self._grouped(
include_groups=include_groups
)

if engine == "auto":
if _can_be_jitted(grouped_values, function, args):
Expand Down
135 changes: 90 additions & 45 deletions python/cudf/cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ def test_groupby_as_index_apply(pdf, gdf, as_index, engine):
gdf = gdf.groupby("y", as_index=as_index).apply(
lambda df: df["x"].mean(), engine=engine
)
pdf = pdf.groupby("y", as_index=as_index).apply(lambda df: df["x"].mean())
kwargs = {"func": lambda df: df["x"].mean()}
if PANDAS_GE_220:
kwargs["include_groups"] = False
pdf = pdf.groupby("y", as_index=as_index).apply(**kwargs)
assert_groupby_results_equal(pdf, gdf)


Expand Down Expand Up @@ -311,8 +314,12 @@ def foo(df):
df["out"] = df["val1"] + df["val2"]
return df

expect = expect_grpby.apply(foo)
got = got_grpby.apply(foo)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = expect_grpby.apply(foo, **kwargs)
got = got_grpby.apply(foo, **kwargs)
assert_groupby_results_equal(expect, got)


Expand Down Expand Up @@ -346,24 +353,24 @@ def test_groupby_apply_args(func, args):
["key1", "key2"], as_index=False, group_keys=False
)
got_grpby = df.groupby(["key1", "key2"])

expect = expect_grpby.apply(func, *args)
got = got_grpby.apply(func, *args)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = expect_grpby.apply(func, *args, **kwargs)
got = got_grpby.apply(func, *args, **kwargs)
assert_groupby_results_equal(expect, got)


def test_groupby_apply_grouped():
np.random.seed(0)
df = DataFrame()
nelem = 20
df["key1"] = np.random.randint(0, 3, nelem)
df["key2"] = np.random.randint(0, 2, nelem)
df["val1"] = np.random.random(nelem)
df["val2"] = np.random.random(nelem)
df["key1"] = range(nelem)
df["key2"] = range(nelem)
df["val1"] = range(nelem)
df["val2"] = range(nelem)

expect_grpby = df.to_pandas().groupby(
["key1", "key2"], as_index=False, group_keys=False
)
got_grpby = df.groupby(["key1", "key2"])

def foo(key1, val1, com1, com2):
Expand All @@ -380,14 +387,11 @@ def foo(key1, val1, com1, com2):

got = got.to_pandas()

# Get expected result by emulating the operation in pandas
def emulate(df):
df["com1"] = df.key1 * 10000 + df.val1
df["com2"] = np.arange(len(df), dtype=np.int32)
return df

expect = expect_grpby.apply(emulate)
expect = expect.sort_values(["key1", "key2"])
expect = df.copy()
expect["com1"] = (expect["key1"] * 10000 + expect["key1"]).astype(
np.float64
)
expect["com2"] = np.zeros(nelem, dtype=np.int32)

assert_groupby_results_equal(expect, got)

Expand Down Expand Up @@ -462,8 +466,14 @@ def run_groupby_apply_jit_test(data, func, keys, *args):
got_groupby_obj = data.groupby(keys)

# compare cuDF jit to pandas
cudf_jit_result = got_groupby_obj.apply(func, *args, engine="jit")
pandas_result = expect_groupby_obj.apply(func, *args)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
cudf_jit_result = got_groupby_obj.apply(
func, *args, engine="jit", **kwargs
)
pandas_result = expect_groupby_obj.apply(func, *args, **kwargs)
assert_groupby_results_equal(cudf_jit_result, pandas_result)


Expand Down Expand Up @@ -776,7 +786,7 @@ def test_groupby_apply_jit_block_divergence():
)

def diverging_block(grp_df):
if grp_df["a"].mean() > 0:
if grp_df["b"].mean() > 1:
return grp_df["b"].mean()
return 0

Expand Down Expand Up @@ -831,27 +841,41 @@ def f(group):
return group.sum()

part = partial(f)

expect = pdf.groupby("a").apply(part)
got = gdf.groupby("a").apply(part, engine="auto")

if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = pdf.groupby("a").apply(part, **kwargs)
got = gdf.groupby("a").apply(part, engine="auto", **kwargs)
assert_groupby_results_equal(expect, got)


@pytest.mark.parametrize("func", [lambda group: group.x + group.y])
def test_groupby_apply_return_col_from_df(func):
def test_groupby_apply_return_col_from_df():
# tests a UDF that consists of purely colwise
# ops, such as `lambda group: group.x + group.y`
# which returns a column
df = cudf.datasets.randomdata()
func = lambda group: group.x + group.y # noqa:E731
df = cudf.DataFrame(
{
"id": range(10),
"x": range(10),
"y": range(10),
}
)
pdf = df.to_pandas()

def func(df):
return df.x + df.y

expect = pdf.groupby("id").apply(func)
got = df.groupby("id").apply(func)

if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
got = df.groupby("id").apply(func, **kwargs)
expect = pdf.groupby("id").apply(func, **kwargs)
# pandas seems to erroneously add an extra MI level of ids
# TODO: Figure out how pandas groupby.apply determines the columns
expect = pd.DataFrame(expect.droplevel(1), columns=got.columns)
assert_groupby_results_equal(expect, got)


Expand All @@ -863,8 +887,12 @@ def test_groupby_apply_return_df(func):
df = cudf.DataFrame({"a": [1, 1, 2, 2], "b": [1, 2, 3, 4]})
pdf = df.to_pandas()

expect = pdf.groupby("a").apply(func)
got = df.groupby("a").apply(func)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = pdf.groupby("a").apply(func, **kwargs)
got = df.groupby("a").apply(func, **kwargs)
assert_groupby_results_equal(expect, got)


Expand Down Expand Up @@ -1910,14 +1938,21 @@ def test_groupby_apply_noempty_group():
{"a": [1, 1, 2, 2], "b": [1, 2, 1, 2], "c": [1, 2, 3, 4]}
)
gdf = cudf.from_pandas(pdf)
assert_groupby_results_equal(
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = (
pdf.groupby("a", group_keys=False)
.apply(lambda x: x.iloc[[0, 1]])
.reset_index(drop=True),
.apply(lambda x: x.iloc[[0, 1]], **kwargs)
.reset_index(drop=True)
)
got = (
gdf.groupby("a")
.apply(lambda x: x.iloc[[0, 1]])
.reset_index(drop=True),
.apply(lambda x: x.iloc[[0, 1]], **kwargs)
.reset_index(drop=True)
)
assert_groupby_results_equal(expect, got)


def test_reset_index_after_empty_groupby():
Expand Down Expand Up @@ -2198,8 +2233,12 @@ def test_groupby_apply_return_scalars(func, args):
)
gdf = cudf.from_pandas(pdf)

expected = pdf.groupby("A").apply(func, *args)
actual = gdf.groupby("A").apply(func, *args)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expected = pdf.groupby("A").apply(func, *args, **kwargs)
actual = gdf.groupby("A").apply(func, *args, **kwargs)

assert_groupby_results_equal(expected, actual)

Expand Down Expand Up @@ -2242,8 +2281,14 @@ def test_groupby_apply_return_series_dataframe(func, args):
)
gdf = cudf.from_pandas(pdf)

expected = pdf.groupby(["key"], group_keys=False).apply(func, *args)
actual = gdf.groupby(["key"]).apply(func, *args)
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expected = pdf.groupby(["key"], group_keys=False).apply(
func, *args, **kwargs
)
actual = gdf.groupby(["key"]).apply(func, *args, **kwargs)

assert_groupby_results_equal(expected, actual)

Expand Down
12 changes: 10 additions & 2 deletions python/cudf/cudf_pandas_tests/test_cudf_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import pytest
from numba import NumbaDeprecationWarning

from cudf.core._compat import PANDAS_GE_220
from cudf.pandas import LOADED, Profiler
from cudf.pandas.fast_slow_proxy import _Unusable

Expand Down Expand Up @@ -506,10 +507,17 @@ def test_array_ufunc(series):
tm.assert_equal(expect, got)


@pytest.mark.xfail(strict=False, reason="Fails in CI, passes locally.")
def test_groupby_apply_func_returns_series(dataframe):
pdf, df = dataframe
expect = pdf.groupby("a").apply(lambda group: pd.Series({"x": 1}))
got = df.groupby("a").apply(lambda group: xpd.Series({"x": 1}))
if PANDAS_GE_220:
kwargs = {"include_groups": False}
else:
kwargs = {}
expect = pdf.groupby("a").apply(
lambda group: pd.Series({"x": 1}), **kwargs
)
got = df.groupby("a").apply(lambda group: xpd.Series({"x": 1}), **kwargs)
tm.assert_equal(expect, got)


Expand Down

0 comments on commit c0e370b

Please sign in to comment.