diff --git a/python/cudf/cudf/_lib/aggregation.pyx b/python/cudf/cudf/_lib/aggregation.pyx
index 4c94452c73d..120691add5e 100644
--- a/python/cudf/cudf/_lib/aggregation.pyx
+++ b/python/cudf/cudf/_lib/aggregation.pyx
@@ -153,6 +153,32 @@ cdef class Aggregation:
                 size))
         return agg
 
+    @classmethod
+    def first(cls):
+        cdef Aggregation agg = cls()
+        agg.c_obj = move(
+            libcudf_aggregation.make_nth_element_aggregation[aggregation](
+                0,
+                (
+                    NullHandling.EXCLUDE
+                )
+            )
+        )
+        return agg
+
+    @classmethod
+    def last(cls):
+        cdef Aggregation agg = cls()
+        agg.c_obj = move(
+            libcudf_aggregation.make_nth_element_aggregation[aggregation](
+                -1,
+                (
+                    NullHandling.EXCLUDE
+                )
+            )
+        )
+        return agg
+
     @classmethod
     def any(cls):
         cdef Aggregation agg = cls()
diff --git a/python/cudf/cudf/_lib/cpp/aggregation.pxd b/python/cudf/cudf/_lib/cpp/aggregation.pxd
index b13815c925d..6daee5077ed 100644
--- a/python/cudf/cudf/_lib/cpp/aggregation.pxd
+++ b/python/cudf/cudf/_lib/cpp/aggregation.pxd
@@ -87,6 +87,11 @@ cdef extern from "cudf/aggregation.hpp" namespace "cudf" nogil:
         size_type n
     ) except +
 
+    cdef unique_ptr[T] make_nth_element_aggregation[T](
+        size_type n,
+        null_policy null_handling
+    ) except +
+
     cdef unique_ptr[T] make_collect_list_aggregation[T]() except +
 
     cdef unique_ptr[T] make_collect_set_aggregation[T]() except +
diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py
index 81690c9797c..0cb3afde824 100644
--- a/python/cudf/cudf/core/groupby/groupby.py
+++ b/python/cudf/cudf/core/groupby/groupby.py
@@ -817,6 +817,14 @@ def cummax(self):
         """Get the column-wise cumulative maximum value in each group."""
         return self.agg("cummax")
 
+    def first(self):
+        """Get the first non-null value in each group."""
+        return self.agg("first")
+
+    def last(self):
+        """Get the last non-null value in each group."""
+        return self.agg("last")
+
     def _scan_fill(self, method: str, limit: int) -> DataFrameOrSeries:
         """Internal implementation for `ffill` and `bfill`
         """
diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py
index 6f1ee99ca9d..09c1b3d153f 100644
--- a/python/cudf/cudf/tests/test_groupby.py
+++ b/python/cudf/cudf/tests/test_groupby.py
@@ -2088,6 +2088,24 @@ def test_groupby_describe(data, group):
     assert_groupby_results_equal(expect, got, check_dtype=False)
 
 
+@pytest.mark.parametrize(
+    "data",
+    [
+        {"a": [], "b": []},
+        {"a": [2, 1, 2, 1, 1, 3], "b": [None, 1, 2, None, 2, None]},
+        {"a": [None], "b": [None]},
+        {"a": [2, 1, 1], "b": [None, 1, 0], "c": [None, 0, 1]},
+    ],
+)
+@pytest.mark.parametrize("agg", ["first", "last", ["first", "last"]])
+def test_groupby_first(data, agg):
+    pdf = pd.DataFrame(data)
+    gdf = cudf.from_pandas(pdf)
+    expect = pdf.groupby("a").agg(agg)
+    got = gdf.groupby("a").agg(agg)
+    assert_groupby_results_equal(expect, got, check_dtype=False)
+
+
 def test_groupby_apply_series_name():
     def foo(x):
         return x.sum()
diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py
index 2ec457018d9..11184eb425e 100644
--- a/python/dask_cudf/dask_cudf/groupby.py
+++ b/python/dask_cudf/dask_cudf/groupby.py
@@ -71,6 +71,8 @@ def aggregate(self, arg, split_every=None, split_out=1):
             "min",
             "max",
             "collect",
+            "first",
+            "last",
         }
         if (
             isinstance(self.obj, DaskDataFrame)
@@ -127,7 +129,10 @@ def aggregate(self, arg, split_every=None, split_out=1):
             "min",
             "max",
             "collect",
+            "first",
+            "last",
         }
+
         if (
             isinstance(self.obj, DaskDataFrame)
             and isinstance(self.index, (str, list))
@@ -165,7 +170,16 @@ def groupby_agg(
 
     This aggregation algorithm only supports the following options:
 
-    {"count", "mean", "std", "var", "sum", "min", "max", "collect"}
+    - "count"
+    - "mean"
+    - "std"
+    - "var"
+    - "sum"
+    - "min"
+    - "max"
+    - "collect"
+    - "first"
+    - "last"
 
     This "optimized" approach is more performant than the algorithm in
     `dask.dataframe`, because it allows the cudf backend to
@@ -208,6 +222,8 @@ def groupby_agg(
         "min",
         "max",
         "collect",
+        "first",
+        "last",
     }
     if not _is_supported(aggs, _supported):
         raise ValueError(
diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py
index 84de32952e5..0c6f7686275 100644
--- a/python/dask_cudf/dask_cudf/tests/test_groupby.py
+++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py
@@ -594,3 +594,40 @@ def test_groupby_unique_lists():
     dd.assert_eq(
         gdf.groupby("a").b.unique(), gddf.groupby("a").b.unique().compute(),
     )
+
+
+@pytest.mark.parametrize(
+    "data",
+    [
+        {"a": [], "b": []},
+        {"a": [2, 1, 2, 1, 1, 3], "b": [None, 1, 2, None, 2, None]},
+        {"a": [None], "b": [None]},
+        {"a": [2, 1, 1], "b": [None, 1, 0], "c": [None, 0, 1]},
+    ],
+)
+@pytest.mark.parametrize("agg", ["first", "last"])
+def test_groupby_first_last(data, agg):
+    pdf = pd.DataFrame(data)
+    gdf = cudf.DataFrame.from_pandas(pdf)
+
+    ddf = dd.from_pandas(pdf, npartitions=2)
+    gddf = dask_cudf.from_cudf(gdf, npartitions=2)
+
+    dd.assert_eq(
+        ddf.groupby("a").agg(agg).compute(),
+        gddf.groupby("a").agg(agg).compute(),
+    )
+
+    dd.assert_eq(
+        getattr(ddf.groupby("a"), agg)().compute(),
+        getattr(gddf.groupby("a"), agg)().compute(),
+    )
+
+    dd.assert_eq(
+        gdf.groupby("a").agg(agg), gddf.groupby("a").agg(agg).compute()
+    )
+
+    dd.assert_eq(
+        getattr(gdf.groupby("a"), agg)(),
+        getattr(gddf.groupby("a"), agg)().compute(),
+    )
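
For reference, a minimal usage sketch (not part of the patch) of the groupby first/last aggregations introduced above. The data mirrors one of the parametrized test cases; it assumes a working cudf installation and shows only the user-facing API exercised by the tests.

# Hypothetical usage sketch of the new first/last groupby aggregations.
import cudf

df = cudf.DataFrame(
    {"a": [2, 1, 2, 1, 1, 3], "b": [None, 1, 2, None, 2, None]}
)

# first()/last() return the first/last non-null "b" value in each "a" group,
# backed by libcudf's nth-element aggregation with nulls excluded.
first_per_group = df.groupby("a").first()
last_per_group = df.groupby("a").last()

# Both names are also accepted by agg(), including as a list.
both = df.groupby("a").agg(["first", "last"])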