diff --git a/docs/cudf/source/api_docs/groupby.rst b/docs/cudf/source/api_docs/groupby.rst
index 141e5adba93..f36951749fb 100644
--- a/docs/cudf/source/api_docs/groupby.rst
+++ b/docs/cudf/source/api_docs/groupby.rst
@@ -53,6 +53,7 @@ Computations / descriptive stats
     GroupBy.mean
     GroupBy.median
     GroupBy.min
+    GroupBy.ngroup
     GroupBy.nth
     GroupBy.pad
     GroupBy.prod
diff --git a/python/cudf/cudf/_lib/cpp/null_mask.pxd b/python/cudf/cudf/_lib/cpp/null_mask.pxd
index c225a16297b..3050a9f3459 100644
--- a/python/cudf/cudf/_lib/cpp/null_mask.pxd
+++ b/python/cudf/cudf/_lib/cpp/null_mask.pxd
@@ -1,11 +1,13 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.

 from libc.stdint cimport int32_t
+from libcpp.pair cimport pair

 from rmm._lib.device_buffer cimport device_buffer

-cimport cudf._lib.cpp.types as libcudf_types
 from cudf._lib.cpp.column.column_view cimport column_view
+from cudf._lib.cpp.table.table_view cimport table_view
+from cudf._lib.cpp.types cimport mask_state, size_type

 ctypedef int32_t underlying_type_t_mask_state

@@ -16,15 +18,23 @@ cdef extern from "cudf/null_mask.hpp" namespace "cudf" nogil:
     ) except +

     cdef size_t bitmask_allocation_size_bytes (
-        libcudf_types.size_type number_of_bits,
+        size_type number_of_bits,
         size_t padding_boundary
     ) except +

     cdef size_t bitmask_allocation_size_bytes (
-        libcudf_types.size_type number_of_bits
+        size_type number_of_bits
     ) except +

     cdef device_buffer create_null_mask (
-        libcudf_types.size_type size,
-        libcudf_types.mask_state state
+        size_type size,
+        mask_state state
     ) except +
+
+    cdef pair[device_buffer, size_type] bitmask_and(
+        table_view view
+    )
+
+    cdef pair[device_buffer, size_type] bitmask_or(
+        table_view view
+    )
diff --git a/python/cudf/cudf/_lib/groupby.pyx b/python/cudf/cudf/_lib/groupby.pyx
index be5bb2741b4..08a1d74f80f 100644
--- a/python/cudf/cudf/_lib/groupby.pyx
+++ b/python/cudf/cudf/_lib/groupby.pyx
@@ -121,13 +121,36 @@ cdef class GroupBy:
         self.dropna = dropna

     def groups(self, list values):
+        """
+        Perform a sort-based groupby, using ``self.keys`` as the key columns
+        and ``values`` as the value columns.
+
+        Parameters
+        ----------
+        values: list of Columns
+            The value columns
+
+        Returns
+        -------
+        grouped_keys: list of Columns
+            The grouped key columns
+        grouped_values: list of Columns
+            The grouped value columns
+        offsets: list of integers
+            Integer offsets such that offsets[i+1] - offsets[i]
+            represents the size of group `i`.
+        """
         cdef table_view values_view = table_view_from_columns(values)

         with nogil:
             c_groups = move(self.c_obj.get()[0].get_groups(values_view))

         grouped_key_cols = columns_from_unique_ptr(move(c_groups.keys))
-        grouped_value_cols = columns_from_unique_ptr(move(c_groups.values))
+
+        if values:
+            grouped_value_cols = columns_from_unique_ptr(move(c_groups.values))
+        else:
+            grouped_value_cols = []
         return grouped_key_cols, grouped_value_cols, c_groups.offsets

     def aggregate_internal(self, values, aggregations):
diff --git a/python/cudf/cudf/_lib/null_mask.pyx b/python/cudf/cudf/_lib/null_mask.pyx
index b0ee28baf29..976fe0e78fc 100644
--- a/python/cudf/cudf/_lib/null_mask.pyx
+++ b/python/cudf/cudf/_lib/null_mask.pyx
@@ -3,6 +3,7 @@
 from enum import Enum

 from libcpp.memory cimport make_unique, unique_ptr
+from libcpp.pair cimport pair
 from libcpp.utility cimport move

 from rmm._lib.device_buffer cimport DeviceBuffer, device_buffer
@@ -11,11 +12,15 @@ from cudf._lib.column cimport Column
 from cudf._lib.cpp.column.column_view cimport column_view
 from cudf._lib.cpp.null_mask cimport (
     bitmask_allocation_size_bytes as cpp_bitmask_allocation_size_bytes,
+    bitmask_and as cpp_bitmask_and,
+    bitmask_or as cpp_bitmask_or,
     copy_bitmask as cpp_copy_bitmask,
     create_null_mask as cpp_create_null_mask,
     underlying_type_t_mask_state,
 )
+from cudf._lib.cpp.table.table_view cimport table_view
 from cudf._lib.cpp.types cimport mask_state, size_type
+from cudf._lib.utils cimport table_view_from_columns

 from cudf.core.buffer import as_device_buffer_like

@@ -95,3 +100,27 @@ def create_null_mask(size_type size, state=MaskState.UNINITIALIZED):
     rmm_db = DeviceBuffer.c_from_unique_ptr(move(up_db))
     buf = as_device_buffer_like(rmm_db)
     return buf
+
+
+def bitmask_and(columns: list):
+    cdef table_view c_view = table_view_from_columns(columns)
+    cdef pair[device_buffer, size_type] c_result
+    cdef unique_ptr[device_buffer] up_db
+    with nogil:
+        c_result = move(cpp_bitmask_and(c_view))
+        up_db = make_unique[device_buffer](move(c_result.first))
+    dbuf = DeviceBuffer.c_from_unique_ptr(move(up_db))
+    buf = as_device_buffer_like(dbuf)
+    return buf, c_result.second
+
+
+def bitmask_or(columns: list):
+    cdef table_view c_view = table_view_from_columns(columns)
+    cdef pair[device_buffer, size_type] c_result
+    cdef unique_ptr[device_buffer] up_db
+    with nogil:
+        c_result = move(cpp_bitmask_or(c_view))
+        up_db = make_unique[device_buffer](move(c_result.first))
+    dbuf = DeviceBuffer.c_from_unique_ptr(move(up_db))
+    buf = as_device_buffer_like(dbuf)
+    return buf, c_result.second
diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py
index c96407a7ff9..0ab64bd985a 100644
--- a/python/cudf/cudf/core/groupby/groupby.py
+++ b/python/cudf/cudf/core/groupby/groupby.py
@@ -8,11 +8,13 @@
 from functools import cached_property
 from typing import Any, Iterable, List, Tuple, Union

+import cupy as cp
 import numpy as np
 import pandas as pd

 import cudf
 from cudf._lib import groupby as libgroupby
+from cudf._lib.null_mask import bitmask_or
 from cudf._lib.reshape import interleave_columns
 from cudf._typing import AggType, DataFrameOrSeries, MultiColumnAggType
 from cudf.api.types import is_list_like
@@ -544,6 +546,88 @@ def nth(self, n):

         return result[sizes > n]

+    def ngroup(self, ascending=True):
+        """
+        Number each group from 0 to the number of groups - 1.
+
+        This is the enumerative complement of cumcount.
+        Note that the numbers given to the groups match the order in
+        which the groups would be seen when iterating over the groupby
+        object, not the order they are first observed.
+
+        Parameters
+        ----------
+        ascending : bool, default True
+            If False, number in reverse, from number of groups - 1 to 0.
+
+        Returns
+        -------
+        Series
+            Unique numbers for each group.
+
+        See Also
+        --------
+        .cumcount : Number the rows in each group.
+
+        Examples
+        --------
+        >>> df = cudf.DataFrame({"A": list("aaabba")})
+        >>> df
+           A
+        0  a
+        1  a
+        2  a
+        3  b
+        4  b
+        5  a
+        >>> df.groupby('A').ngroup()
+        0    0
+        1    0
+        2    0
+        3    1
+        4    1
+        5    0
+        dtype: int64
+        >>> df.groupby('A').ngroup(ascending=False)
+        0    1
+        1    1
+        2    1
+        3    0
+        4    0
+        5    1
+        dtype: int64
+        >>> df.groupby(["A", [1,1,2,3,2,1]]).ngroup()
+        0    0
+        1    0
+        2    1
+        3    3
+        4    2
+        5    0
+        dtype: int64
+        """
+        num_groups = len(index := self.grouping.keys.unique())
+        _, has_null_group = bitmask_or([*index._columns])
+
+        if ascending:
+            if has_null_group:
+                group_ids = cudf.Series._from_data(
+                    {None: cp.arange(-1, num_groups - 1)}
+                )
+            else:
+                group_ids = cudf.Series._from_data(
+                    {None: cp.arange(num_groups)}
+                )
+        else:
+            group_ids = cudf.Series._from_data(
+                {None: cp.arange(num_groups - 1, -1, -1)}
+            )
+
+        if has_null_group:
+            group_ids.iloc[0] = cudf.NA
+
+        group_ids._index = index
+        return self._broadcast(group_ids)
+
     def serialize(self):
         header = {}
         frames = []
@@ -925,6 +1009,29 @@ def rolling_avg(val, avg):
         kwargs.update({"chunks": offsets})
         return grouped_values.apply_chunks(function, **kwargs)

+    def _broadcast(self, values):
+        """
+        Broadcast the results of an aggregation to the group
+
+        Parameters
+        ----------
+        values: Series
+            A Series representing the results of an aggregation. The
+            index of the Series must be the (unique) values
+            representing the group keys.
+
+        Returns
+        -------
+        A Series of the same size and with the same index as
+        ``self.obj``.
+        """
+        if not values.index.equals(self.grouping.keys):
+            values = values._align_to_index(
+                self.grouping.keys, how="right", allow_non_unique=True
+            )
+            values.index = self.obj.index
+        return values
+
     def transform(self, function):
         """Apply an aggregation, then broadcast the result to the group
         size.
@@ -966,12 +1073,7 @@ def transform(self, function):
                 "Currently, `transform()` supports only aggregations."
             ) from e

-        if not result.index.equals(self.grouping.keys):
-            result = result._align_to_index(
-                self.grouping.keys, how="right", allow_non_unique=True
-            )
-        result.index = self.obj.index
-        return result
+        return self._broadcast(result)

     def rolling(self, *args, **kwargs):
         """
diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py
index c4c8e81dda2..b00e31115c9 100644
--- a/python/cudf/cudf/tests/test_groupby.py
+++ b/python/cudf/cudf/tests/test_groupby.py
@@ -2718,3 +2718,36 @@ def test_groupby_group_keys(group_keys, by):
     actual = g_group[["B", "C"]].apply(lambda x: x / x.sum())
     expected = p_group[["B", "C"]].apply(lambda x: x / x.sum())
     assert_eq(actual, expected)
+
+
+@pytest.fixture
+def df_ngroup():
+    df = cudf.DataFrame(
+        {
+            "a": [2, 2, 1, 1, 2, 3],
+            "b": [1, 2, 1, 2, 1, 2],
+            "c": ["a", "a", "b", "c", "d", "c"],
+        },
+        index=[1, 3, 5, 7, 4, 2],
+    )
+    df.index.name = "foo"
+    return df
+
+
+@pytest.mark.parametrize(
+    "by",
+    [
+        lambda: "a",
+        lambda: "b",
+        lambda: ["a", "b"],
+        lambda: "c",
+        lambda: pd.Series([1, 2, 1, 2, 1, 2]),
+        lambda: pd.Series(["x", "y", "y", "x", "z", "x"]),
+    ],
+)
+@pytest.mark.parametrize("ascending", [True, False])
+def test_groupby_ngroup(by, ascending, df_ngroup):
+    by = by()
+    expected = df_ngroup.to_pandas().groupby(by).ngroup(ascending=ascending)
+    actual = df_ngroup.groupby(by).ngroup(ascending=ascending)
+    assert_eq(expected, actual, check_dtype=False)
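
Reviewer note: the following is a minimal pandas-only sketch of the numbering-and-broadcast scheme that ngroup builds on here, namely group ids assigned over the sorted unique keys and then aligned back onto the original rows, which is what _broadcast does in this PR. The helper name ngroup_sketch is hypothetical, the sketch handles only a single key column, and it omits the null-group handling that the cudf implementation gets from bitmask_or.

import pandas as pd


def ngroup_sketch(df, by, ascending=True):
    # Unique keys in sorted order, mirroring cudf's sort-based groupby.
    keys = df[by].sort_values().unique()
    num_groups = len(keys)

    # Number the groups 0..n-1, or n-1..0 when ascending=False.
    ids = range(num_groups) if ascending else range(num_groups - 1, -1, -1)
    group_ids = pd.Series(list(ids), index=keys)

    # "Broadcast": map each row's key to its group id, keeping the row index.
    return df[by].map(group_ids)


df = pd.DataFrame({"A": list("aaabba")})
print(ngroup_sketch(df, "A").tolist())         # [0, 0, 0, 1, 1, 0]
print(ngroup_sketch(df, "A", False).tolist())  # [1, 1, 1, 0, 0, 1]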
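
Reviewer note: a small usage sketch of the new bitmask_and / bitmask_or bindings. It assumes, consistent with how ngroup consumes the result, that the second element of the returned pair is the null count of the combined mask; the counts in the comments follow from that assumption and are not verified output.

import cudf
from cudf._lib.null_mask import bitmask_and, bitmask_or

left = cudf.Series([1, None, 3])
right = cudf.Series([None, None, 6])

# bitmask_or: a row remains null only where every input column is null.
_, or_null_count = bitmask_or([left._column, right._column])

# bitmask_and: a row becomes null where any input column is null.
_, and_null_count = bitmask_and([left._column, right._column])

print(or_null_count, and_null_count)  # expected 1 and 2, under the assumption above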