Skip to content

Commit

Permalink
Add groupby first and last aggregations (#9004)
Browse files Browse the repository at this point in the history
Authors:
  - Ashwin Srinath (https://github.com/shwina)

Approvers:
  - Sheilah Kirui (https://github.com/skirui-source)
  - Christopher Harris (https://github.com/cwharris)
  - Richard (Rick) Zamora (https://github.com/rjzamora)

URL: #9004
  • Loading branch information
shwina authored Aug 10, 2021
1 parent d746689 commit 7d892d1
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 1 deletion.
26 changes: 26 additions & 0 deletions python/cudf/cudf/_lib/aggregation.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,32 @@ cdef class Aggregation:
size))
return agg

@classmethod
def first(cls):
    """Build an aggregation selecting the first non-null element.

    Expressed as an nth-element aggregation with index ``0`` and a
    null policy of ``NullHandling.EXCLUDE``.
    """
    cdef Aggregation first_agg = cls()
    # Index 0 addresses the leading element; EXCLUDE is forwarded as the
    # libcudf null policy via its underlying integer type.
    first_agg.c_obj = move(
        libcudf_aggregation.make_nth_element_aggregation[aggregation](
            0,
            <libcudf_types.null_policy><underlying_type_t_null_policy>(
                NullHandling.EXCLUDE
            )
        )
    )
    return first_agg

@classmethod
def last(cls):
    """Build an aggregation selecting the last non-null element.

    Expressed as an nth-element aggregation with index ``-1`` and a
    null policy of ``NullHandling.EXCLUDE``.
    """
    cdef Aggregation last_agg = cls()
    # Index -1 addresses the trailing element; EXCLUDE is forwarded as the
    # libcudf null policy via its underlying integer type.
    last_agg.c_obj = move(
        libcudf_aggregation.make_nth_element_aggregation[aggregation](
            -1,
            <libcudf_types.null_policy><underlying_type_t_null_policy>(
                NullHandling.EXCLUDE
            )
        )
    )
    return last_agg

@classmethod
def any(cls):
cdef Aggregation agg = cls()
Expand Down
5 changes: 5 additions & 0 deletions python/cudf/cudf/_lib/cpp/aggregation.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ cdef extern from "cudf/aggregation.hpp" namespace "cudf" nogil:
size_type n
) except +

# Declaration of cudf's `make_nth_element_aggregation` factory that takes
# the element index `n` together with a `null_policy` and returns a
# `unique_ptr[T]`. `except +` asks Cython to translate C++ exceptions
# raised by the call into Python exceptions.
cdef unique_ptr[T] make_nth_element_aggregation[T](
size_type n,
null_policy null_handling
) except +

cdef unique_ptr[T] make_collect_list_aggregation[T]() except +

cdef unique_ptr[T] make_collect_set_aggregation[T]() except +
Expand Down
8 changes: 8 additions & 0 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,14 @@ def cummax(self):
"""Get the column-wise cumulative maximum value in each group."""
return self.agg("cummax")

def first(self):
    """Return the first non-null value found in each group.

    Convenience shorthand for ``self.agg("first")``.
    """
    return self.agg("first")

def last(self):
    """Return the last non-null value found in each group.

    Convenience shorthand for ``self.agg("last")``.
    """
    return self.agg("last")

def _scan_fill(self, method: str, limit: int) -> DataFrameOrSeries:
"""Internal implementation for `ffill` and `bfill`
"""
Expand Down
18 changes: 18 additions & 0 deletions python/cudf/cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -2088,6 +2088,24 @@ def test_groupby_describe(data, group):
assert_groupby_results_equal(expect, got, check_dtype=False)


@pytest.mark.parametrize(
    "data",
    [
        {"a": [], "b": []},
        {"a": [2, 1, 2, 1, 1, 3], "b": [None, 1, 2, None, 2, None]},
        {"a": [None], "b": [None]},
        {"a": [2, 1, 1], "b": [None, 1, 0], "c": [None, 0, 1]},
    ],
)
@pytest.mark.parametrize("agg", ["first", "last", ["first", "last"]])
def test_groupby_first(data, agg):
    """Check cudf's ``first``/``last`` groupby aggregations against pandas.

    ``agg`` is either a single aggregation name or a list of both names.
    """
    pandas_frame = pd.DataFrame(data)
    cudf_frame = cudf.from_pandas(pandas_frame)

    # Group on column "a" in both libraries and apply the same aggregation.
    pandas_result = pandas_frame.groupby("a").agg(agg)
    cudf_result = cudf_frame.groupby("a").agg(agg)

    # The comparison is made with dtype checking switched off.
    assert_groupby_results_equal(
        pandas_result, cudf_result, check_dtype=False
    )


def test_groupby_apply_series_name():
def foo(x):
return x.sum()
Expand Down
18 changes: 17 additions & 1 deletion python/dask_cudf/dask_cudf/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def aggregate(self, arg, split_every=None, split_out=1):
"min",
"max",
"collect",
"first",
"last",
}
if (
isinstance(self.obj, DaskDataFrame)
Expand Down Expand Up @@ -127,7 +129,10 @@ def aggregate(self, arg, split_every=None, split_out=1):
"min",
"max",
"collect",
"first",
"last",
}

if (
isinstance(self.obj, DaskDataFrame)
and isinstance(self.index, (str, list))
Expand Down Expand Up @@ -165,7 +170,16 @@ def groupby_agg(
This aggregation algorithm only supports the following options:
- "count"
- "mean"
- "std"
- "var"
- "sum"
- "min"
- "max"
- "collect"
- "first"
- "last"
This "optimized" approach is more performant than the algorithm
in `dask.dataframe`, because it allows the cudf backend to
Expand Down Expand Up @@ -208,6 +222,8 @@ def groupby_agg(
"min",
"max",
"collect",
"first",
"last",
}
if not _is_supported(aggs, _supported):
raise ValueError(
Expand Down
37 changes: 37 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,3 +594,40 @@ def test_groupby_unique_lists():
dd.assert_eq(
gdf.groupby("a").b.unique(), gddf.groupby("a").b.unique().compute(),
)


@pytest.mark.parametrize(
    "data",
    [
        {"a": [], "b": []},
        {"a": [2, 1, 2, 1, 1, 3], "b": [None, 1, 2, None, 2, None]},
        {"a": [None], "b": [None]},
        {"a": [2, 1, 1], "b": [None, 1, 0], "c": [None, 0, 1]},
    ],
)
@pytest.mark.parametrize("agg", ["first", "last"])
def test_groupby_first_last(data, agg):
    """Check dask_cudf ``first``/``last`` groupby results.

    The dask_cudf result is compared with dask.dataframe and with plain
    cudf, both through ``.agg(name)`` and through the named groupby method.
    """
    pandas_frame = pd.DataFrame(data)
    cudf_frame = cudf.DataFrame.from_pandas(pandas_frame)

    dask_frame = dd.from_pandas(pandas_frame, npartitions=2)
    dask_cudf_frame = dask_cudf.from_cudf(cudf_frame, npartitions=2)

    # dask.dataframe vs dask_cudf, via .agg(...)
    dask_via_agg = dask_frame.groupby("a").agg(agg).compute()
    dask_cudf_via_agg = dask_cudf_frame.groupby("a").agg(agg).compute()
    dd.assert_eq(dask_via_agg, dask_cudf_via_agg)

    # dask.dataframe vs dask_cudf, via the named groupby method
    dask_via_method = getattr(dask_frame.groupby("a"), agg)().compute()
    dask_cudf_via_method = getattr(
        dask_cudf_frame.groupby("a"), agg
    )().compute()
    dd.assert_eq(dask_via_method, dask_cudf_via_method)

    # cudf vs dask_cudf, via .agg(...)
    cudf_via_agg = cudf_frame.groupby("a").agg(agg)
    dask_cudf_via_agg = dask_cudf_frame.groupby("a").agg(agg).compute()
    dd.assert_eq(cudf_via_agg, dask_cudf_via_agg)

    # cudf vs dask_cudf, via the named groupby method
    cudf_via_method = getattr(cudf_frame.groupby("a"), agg)()
    dask_cudf_via_method = getattr(
        dask_cudf_frame.groupby("a"), agg
    )().compute()
    dd.assert_eq(cudf_via_method, dask_cudf_via_method)

0 comments on commit 7d892d1

Please sign in to comment.