From d9977ed6eff40ae5feaa85fe69ea8322a68a2252 Mon Sep 17 00:00:00 2001 From: Ashwin Srinath Date: Wed, 5 Oct 2022 14:18:45 -0700 Subject: [PATCH 1/7] Add ngroup() impl --- python/cudf/cudf/_lib/groupby.pyx | 25 +++++++++++++- python/cudf/cudf/core/groupby/groupby.py | 43 ++++++++++++++++++++---- python/cudf/cudf/tests/test_groupby.py | 32 ++++++++++++++++++ 3 files changed, 93 insertions(+), 7 deletions(-) diff --git a/python/cudf/cudf/_lib/groupby.pyx b/python/cudf/cudf/_lib/groupby.pyx index be5bb2741b4..08a1d74f80f 100644 --- a/python/cudf/cudf/_lib/groupby.pyx +++ b/python/cudf/cudf/_lib/groupby.pyx @@ -121,13 +121,36 @@ cdef class GroupBy: self.dropna = dropna def groups(self, list values): + """ + Perform a sort groupby, using ``self.keys`` as the key columns + and ``values`` as the value columns. + + Parameters + ---------- + values: list of Columns + The value columns + + Returns + ------- + grouped_keys: list of Columns + The grouped key columns + grouped_values: list of Columns + The grouped value columns + offsets: list of integers + Integer offsets such that offsets[i+1] - offsets[i] + represents the size of group `i`. 
+ """ cdef table_view values_view = table_view_from_columns(values) with nogil: c_groups = move(self.c_obj.get()[0].get_groups(values_view)) grouped_key_cols = columns_from_unique_ptr(move(c_groups.keys)) - grouped_value_cols = columns_from_unique_ptr(move(c_groups.values)) + + if values: + grouped_value_cols = columns_from_unique_ptr(move(c_groups.values)) + else: + grouped_value_cols = [] return grouped_key_cols, grouped_value_cols, c_groups.offsets def aggregate_internal(self, values, aggregations): diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index c96407a7ff9..a51d597fe61 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -8,6 +8,7 @@ from functools import cached_property from typing import Any, Iterable, List, Tuple, Union +import cupy as cp import numpy as np import pandas as pd @@ -544,6 +545,18 @@ def nth(self, n): return result[sizes > n] + def ngroup(self): + # Step 1: Using a sort-groupby, compute the group group offsets: + grouped_keys, _, offsets = self._groupby.groups([]) + + # the group IDs go from 0...len(group_offsets) - 1 + num_groups = len(offsets) - 1 + index = cudf.core.index._index_from_columns(grouped_keys) + group_ids = cudf.Series(cp.arange(num_groups), index=index.unique()) + + result = self._broadcast(group_ids) + return result.fillna(-1) + def serialize(self): header = {} frames = [] @@ -925,6 +938,29 @@ def rolling_avg(val, avg): kwargs.update({"chunks": offsets}) return grouped_values.apply_chunks(function, **kwargs) + def _broadcast(self, values): + """ + Broadcast the values of an aggregation to the group + + Parameters + ---------- + values: Series + A Series representing the values of an aggregation. The + index of the Series must be the (unique) values + representing the group keys. + + Returns + ------- + A Series of the same size and with the same index as + ``self.obj``. 
+ """ + if not values.index.equals(self.grouping.keys): + values = values._align_to_index( + self.grouping.keys, how="right", allow_non_unique=True + ) + values.index = self.obj.index + return values + def transform(self, function): """Apply an aggregation, then broadcast the result to the group size. @@ -966,12 +1002,7 @@ def transform(self, function): "Currently, `transform()` supports only aggregations." ) from e - if not result.index.equals(self.grouping.keys): - result = result._align_to_index( - self.grouping.keys, how="right", allow_non_unique=True - ) - result.index = self.obj.index - return result + return self._broadcast(result) def rolling(self, *args, **kwargs): """ diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index c4c8e81dda2..8352f625572 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -2718,3 +2718,35 @@ def test_groupby_group_keys(group_keys, by): actual = g_group[["B", "C"]].apply(lambda x: x / x.sum()) expected = p_group[["B", "C"]].apply(lambda x: x / x.sum()) assert_eq(actual, expected) + + +@pytest.fixture +def df_ngroup(): + df = cudf.DataFrame( + { + "a": [2, 2, 1, 1, 2, 3], + "b": [1, 2, 1, 2, 1, 2], + "c": ["a", "a", "b", "c", "d", "c"], + }, + index=[1, 3, 5, 7, 4, 2], + ) + df.index.name = "foo" + return df + + +@pytest.mark.parametrize( + "by", + [ + lambda: "a", + lambda: "b", + lambda: ["a", "b"], + lambda: "c", + lambda: pd.Series([1, 2, 1, 2, 1, 2]), + lambda: pd.Series(["x", "y", "y", "x", "z", "x"]), + ], +) +def test_groupby_ngroup(by, df_ngroup): + by = by() + expected = df_ngroup.to_pandas().groupby(by).ngroup() + actual = df_ngroup.groupby(by).ngroup() + assert_eq(expected, actual, check_dtype=False) From 71e4cd4873cf629f44afc86cbb5bad5acf843062 Mon Sep 17 00:00:00 2001 From: Ashwin Srinath Date: Thu, 6 Oct 2022 08:28:39 -0700 Subject: [PATCH 2/7] Improve impl --- python/cudf/cudf/_lib/cpp/null_mask.pxd | 22 ++++-- 
python/cudf/cudf/_lib/null_mask.pyx | 29 ++++++++ python/cudf/cudf/core/groupby/groupby.py | 86 +++++++++++++++++++++--- python/cudf/cudf/tests/test_groupby.py | 7 +- 4 files changed, 126 insertions(+), 18 deletions(-) diff --git a/python/cudf/cudf/_lib/cpp/null_mask.pxd b/python/cudf/cudf/_lib/cpp/null_mask.pxd index c225a16297b..3050a9f3459 100644 --- a/python/cudf/cudf/_lib/cpp/null_mask.pxd +++ b/python/cudf/cudf/_lib/cpp/null_mask.pxd @@ -1,11 +1,13 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. from libc.stdint cimport int32_t +from libcpp.pair cimport pair from rmm._lib.device_buffer cimport device_buffer -cimport cudf._lib.cpp.types as libcudf_types from cudf._lib.cpp.column.column_view cimport column_view +from cudf._lib.cpp.table.table_view cimport table_view +from cudf._lib.cpp.types cimport mask_state, size_type ctypedef int32_t underlying_type_t_mask_state @@ -16,15 +18,23 @@ cdef extern from "cudf/null_mask.hpp" namespace "cudf" nogil: ) except + cdef size_t bitmask_allocation_size_bytes ( - libcudf_types.size_type number_of_bits, + size_type number_of_bits, size_t padding_boundary ) except + cdef size_t bitmask_allocation_size_bytes ( - libcudf_types.size_type number_of_bits + size_type number_of_bits ) except + cdef device_buffer create_null_mask ( - libcudf_types.size_type size, - libcudf_types.mask_state state + size_type size, + mask_state state ) except + + + cdef pair[device_buffer, size_type] bitmask_and( + table_view view + ) + + cdef pair[device_buffer, size_type] bitmask_or( + table_view view + ) diff --git a/python/cudf/cudf/_lib/null_mask.pyx b/python/cudf/cudf/_lib/null_mask.pyx index b0ee28baf29..976fe0e78fc 100644 --- a/python/cudf/cudf/_lib/null_mask.pyx +++ b/python/cudf/cudf/_lib/null_mask.pyx @@ -3,6 +3,7 @@ from enum import Enum from libcpp.memory cimport make_unique, unique_ptr +from libcpp.pair cimport pair from libcpp.utility cimport move from rmm._lib.device_buffer cimport 
DeviceBuffer, device_buffer @@ -11,11 +12,15 @@ from cudf._lib.column cimport Column from cudf._lib.cpp.column.column_view cimport column_view from cudf._lib.cpp.null_mask cimport ( bitmask_allocation_size_bytes as cpp_bitmask_allocation_size_bytes, + bitmask_and as cpp_bitmask_and, + bitmask_or as cpp_bitmask_or, copy_bitmask as cpp_copy_bitmask, create_null_mask as cpp_create_null_mask, underlying_type_t_mask_state, ) +from cudf._lib.cpp.table.table_view cimport table_view from cudf._lib.cpp.types cimport mask_state, size_type +from cudf._lib.utils cimport table_view_from_columns from cudf.core.buffer import as_device_buffer_like @@ -95,3 +100,27 @@ def create_null_mask(size_type size, state=MaskState.UNINITIALIZED): rmm_db = DeviceBuffer.c_from_unique_ptr(move(up_db)) buf = as_device_buffer_like(rmm_db) return buf + + +def bitmask_and(columns: list): + cdef table_view c_view = table_view_from_columns(columns) + cdef pair[device_buffer, size_type] c_result + cdef unique_ptr[device_buffer] up_db + with nogil: + c_result = move(cpp_bitmask_and(c_view)) + up_db = make_unique[device_buffer](move(c_result.first)) + dbuf = DeviceBuffer.c_from_unique_ptr(move(up_db)) + buf = as_device_buffer_like(dbuf) + return buf, c_result.second + + +def bitmask_or(columns: list): + cdef table_view c_view = table_view_from_columns(columns) + cdef pair[device_buffer, size_type] c_result + cdef unique_ptr[device_buffer] up_db + with nogil: + c_result = move(cpp_bitmask_or(c_view)) + up_db = make_unique[device_buffer](move(c_result.first)) + dbuf = DeviceBuffer.c_from_unique_ptr(move(up_db)) + buf = as_device_buffer_like(dbuf) + return buf, c_result.second diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index a51d597fe61..0cfc8ee6f28 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -14,6 +14,7 @@ import cudf from cudf._lib import groupby as libgroupby +from cudf._lib.null_mask import 
bitmask_or from cudf._lib.reshape import interleave_columns from cudf._typing import AggType, DataFrameOrSeries, MultiColumnAggType from cudf.api.types import is_list_like @@ -545,17 +546,80 @@ def nth(self, n): return result[sizes > n] - def ngroup(self): - # Step 1: Using a sort-groupby, compute the group group offsets: - grouped_keys, _, offsets = self._groupby.groups([]) + def ngroup(self, ascending=True): + """ + Number each group from 0 to the number of groups - 1. + + This is the enumerative complement of cumcount. Note that the + numbers given to the groups match the order in which the groups + would be seen when iterating over the groupby object, not the + order they are first observed. + + Parameters + ---------- + ascending : bool, default True + If False, number in reverse, from number of groups - 1 to 0. + + Returns + ------- + Series + Unique numbers for each group. + + See Also + -------- + .cumcount : Number the rows in each group. - # the group IDs go from 0...len(group_offsets) - 1 - num_groups = len(offsets) - 1 - index = cudf.core.index._index_from_columns(grouped_keys) - group_ids = cudf.Series(cp.arange(num_groups), index=index.unique()) + Examples + -------- + >>> df = cudf.DataFrame({"A": list("aaabba")}) + >>> df + A + 0 a + 1 a + 2 a + 3 b + 4 b + 5 a + >>> df.groupby('A').ngroup() + 0 0 + 1 0 + 2 0 + 3 1 + 4 1 + 5 0 + dtype: int64 + >>> df.groupby('A').ngroup(ascending=False) + 0 1 + 1 1 + 2 1 + 3 0 + 4 0 + 5 1 + dtype: int64 + >>> df.groupby(["A", [1,1,2,3,2,1]]).ngroup() + 0 0 + 1 0 + 2 1 + 3 3 + 4 2 + 5 0 + dtype: int64 + """ + num_groups = len(index := self.grouping.keys.unique()) + + if ascending: + _, has_null_group = bitmask_or([*index._columns]) + if has_null_group: + # when there's a null group, + # the first group ID is -1 + group_ids = cp.arange(-1, num_groups - 1) + else: + group_ids = cp.arange(num_groups) + else: + # in the descending case, there's no difference: + group_ids = cp.arange(num_groups - 1, -1, -1) - result =
self._broadcast(group_ids) - return result.fillna(-1) + return self._broadcast(cudf.Series(group_ids, index=index)) def serialize(self): header = {} @@ -1864,6 +1928,10 @@ def deserialize(cls, header, frames): out._key_columns = key_columns return out + # @property + # def has_null_group(self): + # # return True if any of the groups is null + def _is_multi_agg(aggs): """ diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index 8352f625572..b00e31115c9 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -2745,8 +2745,9 @@ def df_ngroup(): lambda: pd.Series(["x", "y", "y", "x", "z", "x"]), ], ) -def test_groupby_ngroup(by, df_ngroup): +@pytest.mark.parametrize("ascending", [True, False]) +def test_groupby_ngroup(by, ascending, df_ngroup): by = by() - expected = df_ngroup.to_pandas().groupby(by).ngroup() - actual = df_ngroup.groupby(by).ngroup() + expected = df_ngroup.to_pandas().groupby(by).ngroup(ascending=ascending) + actual = df_ngroup.groupby(by).ngroup(ascending=ascending) assert_eq(expected, actual, check_dtype=False) From e9ee35e0db3b3042281ff29da322bb9bbb49bec1 Mon Sep 17 00:00:00 2001 From: Ashwin Srinath Date: Thu, 6 Oct 2022 08:33:03 -0700 Subject: [PATCH 3/7] Stale code --- python/cudf/cudf/core/groupby/groupby.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 0cfc8ee6f28..a446fdb0599 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -1928,10 +1928,6 @@ def deserialize(cls, header, frames): out._key_columns = key_columns return out - # @property - # def has_null_group(self): - # # return True if any of the groups is null - def _is_multi_agg(aggs): """ From 34cf5e031e6d69ba4a9bfe9087a1f0d6ab21e574 Mon Sep 17 00:00:00 2001 From: Ashwin Srinath <3190405+shwina@users.noreply.github.com> Date: Thu, 6 Oct 2022 16:54:23 -0400 
Subject: [PATCH 4/7] Apply suggestions from code review --- python/cudf/cudf/core/groupby/groupby.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index a446fdb0599..c491bb975b6 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -1004,7 +1004,7 @@ def rolling_avg(val, avg): def _broadcast(self, values): """ - Broadcast the values of an aggregation to the group + Broadcast the results of an aggregation to the group Parameters ---------- From 59782b14b8db10e2214491983af280daf1bd0003 Mon Sep 17 00:00:00 2001 From: Ashwin Srinath <3190405+shwina@users.noreply.github.com> Date: Thu, 6 Oct 2022 16:54:54 -0400 Subject: [PATCH 5/7] Apply suggestions from code review --- python/cudf/cudf/core/groupby/groupby.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index c491bb975b6..4099732ce6a 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -1009,7 +1009,7 @@ def _broadcast(self, values): Parameters ---------- values: Series - A Series representing the values of an aggregation. The + A Series representing the results of an aggregation. The index of the Series must be the (unique) values representing the group keys. 
From 1da500824c00d5f559c22baac2c96561bf5ce72f Mon Sep 17 00:00:00 2001 From: Ashwin Srinath Date: Mon, 10 Oct 2022 08:02:51 -0700 Subject: [PATCH 6/7] Fix for Pandas 1.5 --- python/cudf/cudf/core/groupby/groupby.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index a446fdb0599..84b0da157a8 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -606,20 +606,27 @@ def ngroup(self, ascending=True): dtype: int64 """ num_groups = len(index := self.grouping.keys.unique()) + _, has_null_group = bitmask_or([*index._columns]) if ascending: - _, has_null_group = bitmask_or([*index._columns]) if has_null_group: - # when there's a null group, - # the first group ID is -1 - group_ids = cp.arange(-1, num_groups - 1) + group_ids = cudf.Series._from_data( + {None: cp.arange(-1, num_groups - 1)} + ) else: - group_ids = cp.arange(num_groups) + group_ids = cudf.Series._from_data( + {None: cp.arange(num_groups)} + ) else: - # in the descending case, there's no difference: - group_ids = cp.arange(num_groups - 1, -1, -1) + group_ids = cudf.Series._from_data( + {None: cp.arange(num_groups - 1, -1, -1)} + ) + + if has_null_group: + group_ids.iloc[0] = cudf.NA - return self._broadcast(cudf.Series(group_ids, index=index)) + group_ids._index = index + return self._broadcast(group_ids) def serialize(self): header = {} From dbcf905a320d645adbf0a2122e6219c37c0f33aa Mon Sep 17 00:00:00 2001 From: Ashwin Srinath Date: Tue, 11 Oct 2022 06:57:54 -0700 Subject: [PATCH 7/7] Add to API docs --- docs/cudf/source/api_docs/groupby.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/cudf/source/api_docs/groupby.rst b/docs/cudf/source/api_docs/groupby.rst index 141e5adba93..f36951749fb 100644 --- a/docs/cudf/source/api_docs/groupby.rst +++ b/docs/cudf/source/api_docs/groupby.rst @@ -53,6 +53,7 @@ Computations / descriptive 
stats GroupBy.mean GroupBy.median GroupBy.min + GroupBy.ngroup GroupBy.nth GroupBy.pad GroupBy.prod