Skip to content

Commit

Permalink
Support args in groupby apply (#10682)
Browse files Browse the repository at this point in the history
  • Loading branch information
brandon-b-miller authored Apr 21, 2022
1 parent 8a4d1b2 commit 0a7c141
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 21 deletions.
16 changes: 9 additions & 7 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ def pipe(self, func, *args, **kwargs):
"""
return cudf.core.common.pipe(self, func, *args, **kwargs)

def apply(self, function):
def apply(self, function, *args):
"""Apply a python transformation function over the grouped chunk.
Parameters
Expand Down Expand Up @@ -595,17 +595,19 @@ def mult(df):
chunks = [
grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])
]
chunk_results = [function(chk) for chk in chunks]

chunk_results = [function(chk, *args) for chk in chunks]
if not len(chunk_results):
return self.obj.head(0)

if cudf.api.types.is_scalar(chunk_results[0]):
result = cudf.Series(chunk_results, index=group_names)
result.index.names = self.grouping.names
elif isinstance(chunk_results[0], cudf.Series):
result = cudf.concat(chunk_results, axis=1).T
result.index.names = self.grouping.names
if isinstance(self.obj, cudf.DataFrame):
result = cudf.concat(chunk_results, axis=1).T
result.index.names = self.grouping.names
else:
result = cudf.concat(chunk_results)
else:
result = cudf.concat(chunk_results)

Expand Down Expand Up @@ -1577,8 +1579,8 @@ def agg(self, func):

return result

def apply(self, func):
result = super().apply(func)
def apply(self, func, *args):
result = super().apply(func, *args)

# apply Series name to result
result.name = self.obj.name
Expand Down
131 changes: 117 additions & 14 deletions python/cudf/cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,40 @@ def foo(df):
assert_groupby_results_equal(expect, got)


def create_test_groupby_apply_args_params():
    """Build the ``(func, args)`` cases used to parametrize the apply-args test.

    Every callable takes a chunk with ``val1`` and ``val2`` columns followed
    by one, two or three extra positional arguments. It stores its result in
    an ``out`` column on the chunk it was given and returns that same chunk.

    Returns
    -------
    list of tuple
        ``(callable, extra_positional_args)`` pairs.
    """

    def f1(df, k):
        summed = df["val1"] + df["val2"]
        df["out"] = summed + k
        return df

    def f2(df, k, L):
        difference = df["val1"] - df["val2"]
        df["out"] = difference + (k / L)
        return df

    def f3(df, k, L, m):
        weighted = (k * df["val1"]) + (L * df["val2"])
        df["out"] = weighted / m
        return df

    funcs = (f1, f2, f3)
    extra_args = ((42,), (42, 119), (42, 119, 212.1))
    return list(zip(funcs, extra_args))


@pytest.mark.parametrize("func,args", create_test_groupby_apply_args_params())
def test_groupby_apply_args(func, args):
    """Extra positional arguments given to ``groupby.apply`` reach ``func``.

    A 20-row frame with two key columns and two value columns is built from
    a fixed seed. The frame and its ``to_pandas()`` conversion are grouped
    on both keys, ``func`` is applied with ``*args`` on each side, and the
    two results are compared.
    """
    np.random.seed(0)
    n_rows = 20
    key_columns = ("key1", "key2")

    # Draw order matters: it fixes which values each column receives.
    key1 = np.random.randint(0, 3, n_rows)
    key2 = np.random.randint(0, 2, n_rows)
    val1 = np.random.random(n_rows)
    val2 = np.random.random(n_rows)

    frame = DataFrame()
    frame["key1"] = key1
    frame["key2"] = key2
    frame["val1"] = val1
    frame["val2"] = val2

    pandas_groups = frame.to_pandas().groupby(
        list(key_columns), as_index=False
    )
    frame_groups = frame.groupby(list(key_columns))

    expect = pandas_groups.apply(func, *args)
    got = frame_groups.apply(func, *args)
    assert_groupby_results_equal(expect, got)


def test_groupby_apply_grouped():
np.random.seed(0)
df = DataFrame()
Expand Down Expand Up @@ -1595,7 +1629,38 @@ def test_groupby_pipe():
assert_groupby_results_equal(expected, actual)


def test_groupby_apply_return_scalars():
def create_test_groupby_apply_return_scalars_params():
    """Build the ``(func, args)`` cases for the scalar-returning apply test.

    Each callable drops the rows of its input whose ``B`` entry is null,
    counts the rows that remain, and returns a single number derived from
    that count and from zero to three extra positional arguments.

    Returns
    -------
    list of tuple
        ``(callable, extra_positional_args)`` pairs.
    """

    def _valid_row_count(x):
        # Rows that survive once null ``B`` entries are filtered out.
        kept = x[~x["B"].isna()]
        return kept.shape[0]

    def f0(x):
        return _valid_row_count(x) / 10

    def f1(x, k):
        return _valid_row_count(x) / k

    def f2(x, k, L):
        return L * (_valid_row_count(x) / k)

    def f3(x, k, L, m):
        # ``*`` and ``%`` share precedence, so this is (L * ratio) % m.
        return L * (_valid_row_count(x) / k) % m

    funcs = (f0, f1, f2, f3)
    extra_args = ((), (42,), (42, 119), (42, 119, 212.1))
    return list(zip(funcs, extra_args))


@pytest.mark.parametrize(
"func,args", create_test_groupby_apply_return_scalars_params()
)
def test_groupby_apply_return_scalars(func, args):
pdf = pd.DataFrame(
{
"A": [1, 1, 2, 2, 3, 3, 4, 4, 5, 5],
Expand All @@ -1615,30 +1680,52 @@ def test_groupby_apply_return_scalars():
)
gdf = cudf.from_pandas(pdf)

def custom_map_func(x):
x = x[~x["B"].isna()]
ticker = x.shape[0]
full = ticker / 10
return full

expected = pdf.groupby("A").apply(lambda x: custom_map_func(x))
actual = gdf.groupby("A").apply(lambda x: custom_map_func(x))
expected = pdf.groupby("A").apply(func, *args)
actual = gdf.groupby("A").apply(func, *args)

assert_groupby_results_equal(expected, actual)


def create_test_groupby_apply_return_series_dataframe_params():
    """Build the ``(func, args)`` cases for the non-scalar apply test.

    ``f0`` and ``f3`` subtract the input's ``max()`` from the input itself,
    so their result keeps the input's shape. The other callables combine
    only ``min()`` / ``max()`` reductions of the input. ``f3``-``f5``
    additionally take one, two or three extra positional arguments.

    Returns
    -------
    list of tuple
        ``(callable, extra_positional_args)`` pairs.
    """

    def f0(x):
        peak = x.max()
        return x - peak

    def f1(x):
        low = x.min()
        high = x.max()
        return low - high

    def f2(x):
        return x.min()

    def f3(x, k):
        centered = x - x.max()
        return centered + k

    def f4(x, k, L):
        spread = x.min() - x.max()
        return spread + (k / L)

    def f5(x, k, L, m):
        scaled_low = m * x.min()
        return scaled_low + (k / L)

    cases = []
    cases.append((f0, ()))
    cases.append((f1, ()))
    cases.append((f2, ()))
    cases.append((f3, (42,)))
    cases.append((f4, (42, 119)))
    cases.append((f5, (41, 119, 212.1)))
    return cases


@pytest.mark.parametrize(
"cust_func",
[lambda x: x - x.max(), lambda x: x.min() - x.max(), lambda x: x.min()],
"func,args", create_test_groupby_apply_return_series_dataframe_params()
)
def test_groupby_apply_return_series_dataframe(cust_func):
def test_groupby_apply_return_series_dataframe(func, args):
pdf = pd.DataFrame(
{"key": [0, 0, 1, 1, 2, 2, 2], "val": [0, 1, 2, 3, 4, 5, 6]}
)
gdf = cudf.from_pandas(pdf)

expected = pdf.groupby(["key"]).apply(cust_func)
actual = gdf.groupby(["key"]).apply(cust_func)
expected = pdf.groupby(["key"]).apply(func, *args)
actual = gdf.groupby(["key"]).apply(func, *args)

assert_groupby_results_equal(expected, actual)

Expand Down Expand Up @@ -2213,6 +2300,22 @@ def foo(x):
assert_groupby_results_equal(expect, got)


@pytest.mark.parametrize(
    "func,args",
    [
        (lambda x, k: x + k, (42,)),
        (lambda x, k, L: x + k - L, (42, 191)),
        (lambda x, k, L, m: (x + k) / (L * m), (42, 191, 99.9)),
    ],
)
def test_groupby_apply_series_args(func, args):
    """Extra positional arguments reach ``func`` when applying on one column.

    The ``y`` column of a 100-row frame grouped by ``x`` is passed through
    ``apply(func, *args)``, once for a frame built with ``DataFrame`` and
    once for one built with ``pd.DataFrame``; the two results are compared.
    """
    # Build and apply on the ``DataFrame`` side first, then the pandas side,
    # so ``make_frame`` is called in the same order as before.
    frame_column_groups = make_frame(DataFrame, 100).groupby("x").y
    got = frame_column_groups.apply(func, *args)

    pandas_column_groups = make_frame(pd.DataFrame, 100).groupby("x").y
    expect = pandas_column_groups.apply(func, *args)

    assert_groupby_results_equal(expect, got)


@pytest.mark.parametrize("label", [None, "left", "right"])
@pytest.mark.parametrize("closed", [None, "left", "right"])
def test_groupby_freq_week(label, closed):
Expand Down

0 comments on commit 0a7c141

Please sign in to comment.