diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 249cb7f4343..1af84920057 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -516,7 +516,7 @@ def pipe(self, func, *args, **kwargs): """ return cudf.core.common.pipe(self, func, *args, **kwargs) - def apply(self, function): + def apply(self, function, *args): """Apply a python transformation function over the grouped chunk. Parameters @@ -595,8 +595,7 @@ def mult(df): chunks = [ grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:]) ] - chunk_results = [function(chk) for chk in chunks] - + chunk_results = [function(chk, *args) for chk in chunks] if not len(chunk_results): return self.obj.head(0) @@ -604,8 +603,11 @@ def mult(df): result = cudf.Series(chunk_results, index=group_names) result.index.names = self.grouping.names elif isinstance(chunk_results[0], cudf.Series): - result = cudf.concat(chunk_results, axis=1).T - result.index.names = self.grouping.names + if isinstance(self.obj, cudf.DataFrame): + result = cudf.concat(chunk_results, axis=1).T + result.index.names = self.grouping.names + else: + result = cudf.concat(chunk_results) else: result = cudf.concat(chunk_results) @@ -1577,8 +1579,8 @@ def agg(self, func): return result - def apply(self, func): - result = super().apply(func) + def apply(self, func, *args): + result = super().apply(func, *args) # apply Series name to result result.name = self.obj.name diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index eba37c1f5af..1411d7ba64c 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -292,6 +292,40 @@ def foo(df): assert_groupby_results_equal(expect, got) +def create_test_groupby_apply_args_params(): + def f1(df, k): + df["out"] = df["val1"] + df["val2"] + k + return df + + def f2(df, k, L): + df["out"] = df["val1"] - df["val2"] + (k / L) + return df + + def f3(df, k, L, m): + df["out"] = ((k * df["val1"]) + (L * df["val2"])) / m + return df + + return [(f1, (42,)), (f2, (42, 119)), (f3, (42, 119, 212.1))] + + +@pytest.mark.parametrize("func,args", create_test_groupby_apply_args_params()) +def test_groupby_apply_args(func, args): + np.random.seed(0) + df = DataFrame() + nelem = 20 + df["key1"] = np.random.randint(0, 3, nelem) + df["key2"] = np.random.randint(0, 2, nelem) + df["val1"] = np.random.random(nelem) + df["val2"] = np.random.random(nelem) + + expect_grpby = df.to_pandas().groupby(["key1", "key2"], as_index=False) + got_grpby = df.groupby(["key1", "key2"]) + + expect = expect_grpby.apply(func, *args) + got = got_grpby.apply(func, *args) + assert_groupby_results_equal(expect, got) + + def test_groupby_apply_grouped(): np.random.seed(0) df = DataFrame() @@ -1595,7 +1629,38 @@ def test_groupby_pipe(): assert_groupby_results_equal(expected, actual) -def test_groupby_apply_return_scalars(): +def create_test_groupby_apply_return_scalars_params(): + def f0(x): + x = x[~x["B"].isna()] + ticker = x.shape[0] + full = ticker / 10 + return full + + def f1(x, k): + x = x[~x["B"].isna()] + ticker = x.shape[0] + full = ticker / k + return full + + def f2(x, k, L): + x = x[~x["B"].isna()] + ticker = x.shape[0] + full = L * (ticker / k) + return full + + def f3(x, k, L, m): + x = x[~x["B"].isna()] + ticker = x.shape[0] + full = L * (ticker / k) % m + return full + + return [(f0, ()), (f1, (42,)), (f2, (42, 119)), (f3, (42, 119, 212.1))] + + +@pytest.mark.parametrize( + "func,args", create_test_groupby_apply_return_scalars_params() +) +def test_groupby_apply_return_scalars(func, args): pdf = pd.DataFrame( { "A": [1, 1, 2, 2, 3, 3, 4, 4, 5, 5], @@ -1615,30 +1680,52 @@ def test_groupby_apply_return_scalars(): ) gdf = cudf.from_pandas(pdf) - def custom_map_func(x): - x = x[~x["B"].isna()] - ticker = x.shape[0] - full = ticker / 10 - return full - - expected = pdf.groupby("A").apply(lambda x: custom_map_func(x)) - actual = gdf.groupby("A").apply(lambda x: custom_map_func(x)) + expected = pdf.groupby("A").apply(func, *args) + actual = gdf.groupby("A").apply(func, *args) assert_groupby_results_equal(expected, actual) +def create_test_groupby_apply_return_series_dataframe_params(): + def f0(x): + return x - x.max() + + def f1(x): + return x.min() - x.max() + + def f2(x): + return x.min() + + def f3(x, k): + return x - x.max() + k + + def f4(x, k, L): + return x.min() - x.max() + (k / L) + + def f5(x, k, L, m): + return m * x.min() + (k / L) + + return [ + (f0, ()), + (f1, ()), + (f2, ()), + (f3, (42,)), + (f4, (42, 119)), + (f5, (41, 119, 212.1)), + ] + + @pytest.mark.parametrize( - "cust_func", - [lambda x: x - x.max(), lambda x: x.min() - x.max(), lambda x: x.min()], + "func,args", create_test_groupby_apply_return_series_dataframe_params() ) -def test_groupby_apply_return_series_dataframe(cust_func): +def test_groupby_apply_return_series_dataframe(func, args): pdf = pd.DataFrame( {"key": [0, 0, 1, 1, 2, 2, 2], "val": [0, 1, 2, 3, 4, 5, 6]} ) gdf = cudf.from_pandas(pdf) - expected = pdf.groupby(["key"]).apply(cust_func) - actual = gdf.groupby(["key"]).apply(cust_func) + expected = pdf.groupby(["key"]).apply(func, *args) + actual = gdf.groupby(["key"]).apply(func, *args) assert_groupby_results_equal(expected, actual) @@ -2213,6 +2300,22 @@ def foo(x): assert_groupby_results_equal(expect, got) +@pytest.mark.parametrize( + "func,args", + [ + (lambda x, k: x + k, (42,)), + (lambda x, k, L: x + k - L, (42, 191)), + (lambda x, k, L, m: (x + k) / (L * m), (42, 191, 99.9)), + ], +) +def test_groupby_apply_series_args(func, args): + + got = make_frame(DataFrame, 100).groupby("x").y.apply(func, *args) + expect = make_frame(pd.DataFrame, 100).groupby("x").y.apply(func, *args) + + assert_groupby_results_equal(expect, got) + + @pytest.mark.parametrize("label", [None, "left", "right"]) @pytest.mark.parametrize("closed", [None, "left", "right"]) def test_groupby_freq_week(label, closed):