diff --git a/python/cudf/cudf/_lib/groupby.pyx b/python/cudf/cudf/_lib/groupby.pyx index 49a924c9104..48f566b846d 100644 --- a/python/cudf/cudf/_lib/groupby.pyx +++ b/python/cudf/cudf/_lib/groupby.pyx @@ -26,7 +26,12 @@ import cudf from cudf._lib.column cimport Column from cudf._lib.scalar cimport DeviceScalar -from cudf._lib.utils cimport table_view_from_table +from cudf._lib.utils cimport ( + columns_from_unique_ptr, + data_from_unique_ptr, + table_view_from_columns, + table_view_from_table, +) from cudf._lib.scalar import as_device_scalar @@ -46,7 +51,6 @@ from cudf._lib.cpp.scalar.scalar cimport scalar from cudf._lib.cpp.table.table cimport table, table_view from cudf._lib.cpp.types cimport size_type from cudf._lib.cpp.utilities.host_span cimport host_span -from cudf._lib.utils cimport data_from_unique_ptr # The sets below define the possible aggregations that can be performed on # different dtypes. These strings must be elements of the AggregationKind enum. @@ -62,11 +66,39 @@ _DECIMAL_AGGS = {"COUNT", "SUM", "ARGMIN", "ARGMAX", "MIN", "MAX", "NUNIQUE", # workaround for https://github.com/cython/cython/issues/3885 ctypedef const scalar constscalar + +cdef _agg_result_from_columns( + vector[libcudf_groupby.aggregation_result]& c_result_columns, + set column_included, + int n_input_columns +): + """Construct the list of result columns from libcudf result. The result + contains the same number of lists as the number of input columns. Result + for an input column that has no applicable aggregations is an empty list. + """ + cdef: + int i + int j + int result_index = 0 + vector[unique_ptr[column]]* c_result + result_columns = [] + for i in range(n_input_columns): + if i in column_included: + c_result = &c_result_columns[result_index].results + result_columns.append([ + Column.from_unique_ptr(move(c_result[0][j])) + for j in range(c_result[0].size()) + ]) + result_index += 1 + else: + result_columns.append([]) + return result_columns + cdef class GroupBy: cdef unique_ptr[libcudf_groupby.groupby] c_obj cdef dict __dict__ - def __cinit__(self, keys, bool dropna=True, *args, **kwargs): + def __cinit__(self, list keys, bool dropna=True, *args, **kwargs): cdef libcudf_types.null_policy c_null_handling if dropna: @@ -74,7 +106,7 @@ cdef class GroupBy: else: c_null_handling = libcudf_types.null_policy.INCLUDE - cdef table_view keys_view = table_view_from_table(keys) + cdef table_view keys_view = table_view_from_columns(keys) with nogil: self.c_obj.reset( @@ -84,46 +116,42 @@ cdef class GroupBy: ) ) - def __init__(self, keys, bool dropna=True): + def __init__(self, list keys, bool dropna=True): self.keys = keys self.dropna = dropna - def groups(self, values): - - cdef table_view values_view = table_view_from_table(values) + def groups(self, list values): + cdef table_view values_view = table_view_from_columns(values) with nogil: c_groups = move(self.c_obj.get()[0].get_groups(values_view)) - c_grouped_keys = move(c_groups.keys) - c_grouped_values = move(c_groups.values) - c_group_offsets = c_groups.offsets - - grouped_keys = cudf.core.index._index_from_data( - *data_from_unique_ptr( - move(c_grouped_keys), - column_names=range(c_grouped_keys.get()[0].num_columns()) - ) - ) - grouped_values = data_from_unique_ptr( - move(c_grouped_values), - index_names=values._index_names, - column_names=values._column_names - ) - return grouped_keys, grouped_values, c_group_offsets + grouped_key_cols = columns_from_unique_ptr(move(c_groups.keys)) + grouped_value_cols = columns_from_unique_ptr(move(c_groups.values)) + return grouped_key_cols, grouped_value_cols, c_groups.offsets def aggregate_internal(self, values, aggregations): - from cudf.core.column_accessor import ColumnAccessor + """`values` is a list of columns and `aggregations` is a list of list + of aggregations. `aggregations[i]` is a list of aggregations for + `values[i]`. Returns a tuple containing 1) list of list of aggregation + results, 2) a list of grouped keys, and 3) a list of list of + aggregations performed. + """ cdef vector[libcudf_groupby.aggregation_request] c_agg_requests cdef libcudf_groupby.aggregation_request c_agg_request cdef Column col cdef GroupbyAggregation agg_obj - allow_empty = all(len(v) == 0 for v in aggregations.values()) + cdef pair[ + unique_ptr[table], + vector[libcudf_groupby.aggregation_result] + ] c_result - included_aggregations = defaultdict(list) - for i, (col_name, aggs) in enumerate(aggregations.items()): - col = values._data[col_name] + allow_empty = all(len(v) == 0 for v in aggregations) + + included_aggregations = [] + column_included = set() + for i, (col, aggs) in enumerate(zip(values, aggregations)): dtype = col.dtype valid_aggregations = ( @@ -135,36 +163,27 @@ cdef class GroupBy: else _DECIMAL_AGGS if is_decimal_dtype(dtype) else "ALL" ) - if (valid_aggregations is _DECIMAL_AGGS - and rmm._cuda.gpu.runtimeGetVersion() < 11000): - raise RuntimeError( - "Decimal aggregations are only supported on CUDA >= 11 " - "due to an nvcc compiler bug." - ) + included_aggregations_i = [] c_agg_request = move(libcudf_groupby.aggregation_request()) for agg in aggs: agg_obj = make_groupby_aggregation(agg) if (valid_aggregations == "ALL" or agg_obj.kind in valid_aggregations): - included_aggregations[col_name].append(agg) + included_aggregations_i.append(agg) c_agg_request.aggregations.push_back( move(agg_obj.c_obj) ) + included_aggregations.append(included_aggregations_i) if not c_agg_request.aggregations.empty(): c_agg_request.values = col.view() c_agg_requests.push_back( move(c_agg_request) ) - + column_included.add(i) if c_agg_requests.empty() and not allow_empty: raise DataError("All requested aggregations are unsupported.") - cdef pair[ - unique_ptr[table], - vector[libcudf_groupby.aggregation_result] - ] c_result - with nogil: c_result = move( self.c_obj.get()[0].aggregate( @@ -172,37 +191,38 @@ cdef class GroupBy: ) ) - grouped_keys, _ = data_from_unique_ptr( - move(c_result.first), - column_names=self.keys._column_names + grouped_keys = columns_from_unique_ptr( + move(c_result.first) ) - result_data = ColumnAccessor(multiindex=True) - # Note: This loop relies on the included_aggregations dict being - # insertion ordered to map results to requested aggregations by index. - for i, col_name in enumerate(included_aggregations): - for j, agg_name in enumerate(included_aggregations[col_name]): - if callable(agg_name): - agg_name = agg_name.__name__ - result_data[(col_name, agg_name)] = ( - Column.from_unique_ptr(move(c_result.second[i].results[j])) - ) + result_columns = _agg_result_from_columns( + c_result.second, column_included, len(values) + ) - return result_data, cudf.core.index._index_from_data( - grouped_keys) + return result_columns, grouped_keys, included_aggregations def scan_internal(self, values, aggregations): - from cudf.core.column_accessor import ColumnAccessor + """`values` is a list of columns and `aggregations` is a list of list + of aggregations. `aggregations[i]` is a list of aggregations for + `values[i]`. Returns a tuple containing 1) list of list of aggregation + results, 2) a list of grouped keys, and 3) a list of list of + aggregations performed. + """ cdef vector[libcudf_groupby.scan_request] c_agg_requests cdef libcudf_groupby.scan_request c_agg_request cdef Column col cdef GroupbyScanAggregation agg_obj - allow_empty = all(len(v) == 0 for v in aggregations.values()) + cdef pair[ + unique_ptr[table], + vector[libcudf_groupby.aggregation_result] + ] c_result + + allow_empty = all(len(v) == 0 for v in aggregations) - included_aggregations = defaultdict(list) - for i, (col_name, aggs) in enumerate(aggregations.items()): - col = values._data[col_name] + included_aggregations = [] + column_included = set() + for i, (col, aggs) in enumerate(zip(values, aggregations)): dtype = col.dtype valid_aggregations = ( @@ -214,36 +234,27 @@ cdef class GroupBy: else _DECIMAL_AGGS if is_decimal_dtype(dtype) else "ALL" ) - if (valid_aggregations is _DECIMAL_AGGS - and rmm._cuda.gpu.runtimeGetVersion() < 11000): - raise RuntimeError( - "Decimal aggregations are only supported on CUDA >= 11 " - "due to an nvcc compiler bug." - ) + included_aggregations_i = [] c_agg_request = move(libcudf_groupby.scan_request()) for agg in aggs: agg_obj = make_groupby_scan_aggregation(agg) if (valid_aggregations == "ALL" or agg_obj.kind in valid_aggregations): - included_aggregations[col_name].append(agg) + included_aggregations_i.append(agg) c_agg_request.aggregations.push_back( move(agg_obj.c_obj) ) + included_aggregations.append(included_aggregations_i) if not c_agg_request.aggregations.empty(): c_agg_request.values = col.view() c_agg_requests.push_back( move(c_agg_request) ) - + column_included.add(i) if c_agg_requests.empty() and not allow_empty: raise DataError("All requested aggregations are unsupported.") - cdef pair[ - unique_ptr[table], - vector[libcudf_groupby.aggregation_result] - ] c_result - with nogil: c_result = move( self.c_obj.get()[0].scan( @@ -251,24 +262,15 @@ cdef class GroupBy: ) ) - grouped_keys, _ = data_from_unique_ptr( - move(c_result.first), - column_names=self.keys._column_names + grouped_keys = columns_from_unique_ptr( + move(c_result.first) ) - result_data = ColumnAccessor(multiindex=True) - # Note: This loop relies on the included_aggregations dict being - # insertion ordered to map results to requested aggregations by index. - for i, col_name in enumerate(included_aggregations): - for j, agg_name in enumerate(included_aggregations[col_name]): - if callable(agg_name): - agg_name = agg_name.__name__ - result_data[(col_name, agg_name)] = ( - Column.from_unique_ptr(move(c_result.second[i].results[j])) - ) + result_columns = _agg_result_from_columns( + c_result.second, column_included, len(values) + ) - return result_data, cudf.core.index._index_from_data( - grouped_keys) + return result_columns, grouped_keys, included_aggregations def aggregate(self, values, aggregations): """ @@ -292,8 +294,8 @@ cdef class GroupBy: return self.aggregate_internal(values, aggregations) - def shift(self, values, int periods, list fill_values): - cdef table_view view = table_view_from_table(values) + def shift(self, list values, int periods, list fill_values): + cdef table_view view = table_view_from_columns(values) cdef size_type num_col = view.num_columns() cdef vector[size_type] offsets = vector[size_type](num_col, periods) @@ -301,7 +303,7 @@ cdef class GroupBy: cdef DeviceScalar d_slr d_slrs = [] c_fill_values.reserve(num_col) - for val, col in zip(fill_values, values._columns): + for val, col in zip(fill_values, values): d_slr = as_device_scalar(val, dtype=col.dtype) d_slrs.append(d_slr) c_fill_values.push_back( @@ -315,21 +317,13 @@ cdef class GroupBy: self.c_obj.get()[0].shift(view, offsets, c_fill_values) ) - grouped_keys = cudf.core.index._index_from_data( - *data_from_unique_ptr( - move(c_result.first), - column_names=self.keys._column_names - ) - ) - - shifted, _ = data_from_unique_ptr( - move(c_result.second), column_names=values._column_names - ) + grouped_keys = columns_from_unique_ptr(move(c_result.first)) + shifted = columns_from_unique_ptr(move(c_result.second)) return shifted, grouped_keys - def replace_nulls(self, values, object method): - cdef table_view val_view = table_view_from_table(values) + def replace_nulls(self, list values, object method): + cdef table_view val_view = table_view_from_columns(values) cdef pair[unique_ptr[table], unique_ptr[table]] c_result cdef replace_policy policy = ( replace_policy.PRECEDING @@ -344,15 +338,13 @@ cdef class GroupBy: self.c_obj.get()[0].replace_nulls(val_view, policies) ) - return data_from_unique_ptr( - move(c_result.second), column_names=values._column_names - )[0] + return columns_from_unique_ptr(move(c_result.second)) _GROUPBY_SCANS = {"cumcount", "cumsum", "cummin", "cummax"} -def _is_all_scan_aggregate(aggs): +def _is_all_scan_aggregate(all_aggs): """ Returns true if all are scan aggregations. Raises @@ -365,16 +357,12 @@ def _is_all_scan_aggregate(aggs): return agg.__name__ if callable(agg) else agg all_scan = all( - all( - get_name(agg_name) in _GROUPBY_SCANS for agg_name in aggs[col_name] - ) - for col_name in aggs + get_name(agg_name) in _GROUPBY_SCANS for aggs in all_aggs + for agg_name in aggs ) any_scan = any( - any( - get_name(agg_name) in _GROUPBY_SCANS for agg_name in aggs[col_name] - ) - for col_name in aggs + get_name(agg_name) in _GROUPBY_SCANS for aggs in all_aggs + for agg_name in aggs ) if not all_scan and any_scan: diff --git a/python/cudf/cudf/_typing.py b/python/cudf/cudf/_typing.py index 793a5d1d9e8..ca2024929f3 100644 --- a/python/cudf/cudf/_typing.py +++ b/python/cudf/cudf/_typing.py @@ -1,6 +1,6 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. -from typing import TYPE_CHECKING, Any, TypeVar, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, TypeVar, Union import numpy as np from pandas import Period, Timedelta, Timestamp @@ -32,3 +32,9 @@ SeriesOrSingleColumnIndex = Union[ "cudf.Series", "cudf.core.index.GenericIndex" ] + +# Groupby aggregation +AggType = Union[str, Callable] +MultiColumnAggType = Union[ + AggType, Iterable[AggType], Dict[Any, Iterable[AggType]] +] diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 84b3bc03fbf..af7053df113 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -229,14 +229,17 @@ def _from_columns( def _from_columns_like_self( self, columns: List[ColumnBase], - column_names: abc.Iterable[str], + column_names: Optional[abc.Iterable[str]] = None, index_names: Optional[List[str]] = None, ): """Construct a `Frame` from a list of columns with metadata from self. + If `column_names` is None, use column names from self. If `index_names` is set, the first `len(index_names)` columns are used to construct the index of the frame. """ + if column_names is None: + column_names = self._column_names frame = self.__class__._from_columns( columns, column_names, index_names ) diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index a1a4596ba45..0c274911f3d 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -5,16 +5,18 @@ import pickle import warnings from functools import cached_property +from typing import Any, Iterable, List, Tuple, Union import numpy as np import cudf from cudf._lib import groupby as libgroupby from cudf._lib.reshape import interleave_columns -from cudf._typing import DataFrameOrSeries +from cudf._typing import AggType, DataFrameOrSeries, MultiColumnAggType from cudf.api.types import is_list_like from cudf.core.abc import Serializable -from cudf.core.column.column import arange, as_column +from cudf.core.column.column import ColumnBase, arange, as_column +from cudf.core.column_accessor import ColumnAccessor from cudf.core.mixins import Reducible, Scannable from cudf.core.multiindex import MultiIndex from cudf.utils.utils import GetAttrGetItemMixin, _cudf_nvtx_annotate @@ -37,6 +39,8 @@ def _quantile_75(x): class GroupBy(Serializable, Reducible, Scannable): + obj: "cudf.core.indexed_frame.IndexedFrame" + _VALID_REDUCTIONS = { "sum", "prod", @@ -107,6 +111,7 @@ def __init__( self._dropna = dropna if isinstance(by, _Grouping): + by._obj = self.obj self.grouping = by else: self.grouping = _Grouping(obj, by, level) @@ -204,7 +209,9 @@ def cumcount(self): @cached_property def _groupby(self): - return libgroupby.GroupBy(self.grouping.keys, dropna=self._dropna) + return libgroupby.GroupBy( + [*self.grouping.keys._columns], dropna=self._dropna + ) @_cudf_nvtx_annotate def agg(self, func): @@ -274,55 +281,48 @@ def agg(self, func): 1 1.5 1.75 2.0 2.0 2 3.0 3.00 1.0 1.0 """ - normalized_aggs = self._normalize_aggs(func) + column_names, columns, normalized_aggs = self._normalize_aggs(func) # Note: When there are no key columns, the below produces # a Float64Index, while Pandas returns an Int64Index # (GH: 6945) - result = cudf.DataFrame._from_data( - *self._groupby.aggregate(self.obj, normalized_aggs) + ( + result_columns, + grouped_key_cols, + included_aggregations, + ) = self._groupby.aggregate(columns, normalized_aggs) + + result_index = self.grouping.keys._from_columns_like_self( + grouped_key_cols, ) + multilevel = _is_multi_agg(func) + data = {} + for col_name, aggs, cols in zip( + column_names, included_aggregations, result_columns + ): + for agg, col in zip(aggs, cols): + if multilevel: + agg_name = agg.__name__ if callable(agg) else agg + key = (col_name, agg_name) + else: + key = col_name + data[key] = col + data = ColumnAccessor(data, multiindex=multilevel) + if not multilevel: + data = data.rename_levels({np.nan: None}, level=0) + result = cudf.DataFrame._from_data(data, index=result_index) + if self._sort: result = result.sort_index() - if not _is_multi_agg(func): - if result._data.nlevels <= 1: # 0 or 1 levels - # make sure it's a flat index: - result._data.multiindex = False - - if result._data.nlevels > 1: - result._data.droplevel(-1) - - # if, after dropping the last level, the only - # remaining key is `NaN`, we need to convert to `None` - # for Pandas compat: - if result._data.names == (np.nan,): - result._data = result._data.rename_levels( - {np.nan: None}, level=0 - ) + if not self._as_index: + result = result.reset_index() if libgroupby._is_all_scan_aggregate(normalized_aggs): # Scan aggregations return rows in original index order return self._mimic_pandas_order(result) - # set index names to be group key names - if len(result): - result.index.names = self.grouping.names - - # copy categorical information from keys to the result index: - result.index._copy_type_metadata(self.grouping.keys) - result._index = cudf.Index(result._index) - - if not self._as_index: - for col_name in reversed(self.grouping._named_columns): - result._insert( - 0, - col_name, - result.index.get_level_values(col_name)._values, - ) - result.index = cudf.core.index.RangeIndex(len(result)) - return result def _reduce( @@ -417,43 +417,50 @@ def deserialize(cls, header, frames): return cls(obj, grouping, **kwargs) def _grouped(self): - grouped_keys, grouped_values, offsets = self._groupby.groups(self.obj) - grouped_values = self.obj.__class__._from_data(*grouped_values) - grouped_values._copy_type_metadata(self.obj) + grouped_key_cols, grouped_value_cols, offsets = self._groupby.groups( + [*self.obj._index._columns, *self.obj._columns] + ) + grouped_keys = cudf.core.index._index_from_columns(grouped_key_cols) + grouped_values = self.obj._from_columns_like_self( + grouped_value_cols, + column_names=self.obj._column_names, + index_names=self.obj._index_names, + ) group_names = grouped_keys.unique() return (group_names, offsets, grouped_keys, grouped_values) - def _normalize_aggs(self, aggs): + def _normalize_aggs( + self, aggs: MultiColumnAggType + ) -> Tuple[Iterable[Any], Tuple[ColumnBase, ...], List[List[AggType]]]: """ - Normalize aggs to a dict mapping column names - to a list of aggregations. + Normalize aggs to a list of list of aggregations, where `out[i]` + is a list of aggregations for column `self.obj[i]`. We support three + different form of `aggs` input here: + - A single agg, such as "sum". This agg is applied to all value + columns. + - A list of aggs, such as ["sum", "mean"]. All aggs are applied to all + value columns. + - A mapping of column name to aggs, such as + {"a": ["sum"], "b": ["mean"]}, the aggs are applied to specified + column. + Each agg can be string or lambda functions. """ - if not isinstance(aggs, collections.abc.Mapping): - # Make col_name->aggs mapping from aggs. - # Do not include named key columns - - # Can't do set arithmetic here as sets are - # not ordered - if isinstance(self, SeriesGroupBy): - columns = [self.obj.name] - else: - columns = [ - col_name - for col_name in self.obj._data - if col_name not in self.grouping._named_columns - ] - out = dict.fromkeys(columns, aggs) - else: - out = aggs.copy() - # Convert all values to list-like: - for col, agg in out.items(): - if not is_list_like(agg): - out[col] = [agg] - else: - out[col] = list(agg) - - return out + aggs_per_column: Iterable[Union[AggType, Iterable[AggType]]] + if isinstance(aggs, dict): + column_names, aggs_per_column = aggs.keys(), aggs.values() + columns = tuple(self.obj._data[col] for col in column_names) + else: + values = self.grouping.values + column_names = values._column_names + columns = values._columns + aggs_per_column = (aggs,) * len(columns) + + normalized_aggs = [ + list(agg) if is_list_like(agg) else [agg] + for agg in aggs_per_column + ] + return column_names, columns, normalized_aggs def pipe(self, func, *args, **kwargs): """ @@ -1201,29 +1208,20 @@ def diff(self, periods=1, axis=0): if not axis == 0: raise NotImplementedError("Only axis=0 is supported.") - # grouped values - value_columns = self.grouping.values - _, (data, index), _ = self._groupby.groups( - cudf.core.frame.Frame(value_columns._data) - ) - grouped = self.obj.__class__._from_data(data, index) - grouped = self._mimic_pandas_order(grouped)._copy_type_metadata( - value_columns + values = self.obj.__class__._from_data( + self.grouping.values._data, self.obj.index ) - - result = grouped - self.shift(periods=periods) - return result._copy_type_metadata(value_columns) + return values - self.shift(periods=periods) def _scan_fill(self, method: str, limit: int) -> DataFrameOrSeries: """Internal implementation for `ffill` and `bfill`""" - value_columns = self.grouping.values - result = self.obj.__class__._from_data( - self._groupby.replace_nulls( - cudf.core.frame.Frame(value_columns._data), method - ) + values = self.grouping.values + result = self.obj._from_columns( + self._groupby.replace_nulls([*values._columns], method), + values._column_names, ) result = self._mimic_pandas_order(result) - return result._copy_type_metadata(value_columns) + return result._copy_type_metadata(values) def pad(self, limit=None): """Forward fill NA values. @@ -1334,17 +1332,12 @@ def fillna( ) return getattr(self, method, limit)() - value_columns = self.grouping.values - _, (data, index), _ = self._groupby.groups( - cudf.core.frame.Frame(value_columns._data) + values = self.obj.__class__._from_data( + self.grouping.values._data, self.obj.index ) - - grouped = self.obj.__class__._from_data(data, index) - result = grouped.fillna( + return values.fillna( value=value, inplace=inplace, axis=axis, limit=limit ) - result = self._mimic_pandas_order(result) - return result._copy_type_metadata(value_columns) def shift(self, periods=1, freq=None, axis=0, fill_value=None): """ @@ -1385,22 +1378,21 @@ def shift(self, periods=1, freq=None, axis=0, fill_value=None): if not axis == 0: raise NotImplementedError("Only axis=0 is supported.") - value_columns = self.grouping.values + values = self.grouping.values if is_list_like(fill_value): - if not len(fill_value) == len(value_columns._data): + if len(fill_value) != len(values._data): raise ValueError( "Mismatched number of columns and values to fill." ) else: - fill_value = [fill_value] * len(value_columns._data) + fill_value = [fill_value] * len(values._data) - result = self.obj.__class__._from_data( - *self._groupby.shift( - cudf.core.frame.Frame(value_columns._data), periods, fill_value - ) + result = self.obj.__class__._from_columns( + self._groupby.shift([*values._columns], periods, fill_value)[0], + values._column_names, ) result = self._mimic_pandas_order(result) - return result._copy_type_metadata(value_columns) + return result._copy_type_metadata(values) def _mimic_pandas_order( self, result: DataFrameOrSeries @@ -1408,11 +1400,12 @@ def _mimic_pandas_order( """Given a groupby result from libcudf, reconstruct the row orders matching that of pandas. This also adds appropriate indices. """ - sorted_order_column = arange(0, result._data.nrows) - _, (order, _), _ = self._groupby.groups( - cudf.core.frame.Frame({"sorted_order_column": sorted_order_column}) + # TODO: copy metadata after this method is a common pattern, should + # merge in this method. + _, order_cols, _ = self._groupby.groups( + [arange(0, result._data.nrows)] ) - gather_map = order["sorted_order_column"].argsort() + gather_map = order_cols[0].argsort() result = result.take(gather_map) result.index = self.obj.index return result @@ -1502,6 +1495,8 @@ class DataFrameGroupBy(GroupBy, GetAttrGetItemMixin): Captive 210.0 """ + obj: "cudf.core.dataframe.DataFrame" + _PROTECTED_KEYS = frozenset(("obj",)) def __getitem__(self, key): @@ -1570,6 +1565,8 @@ class SeriesGroupBy(GroupBy): Name: Max Speed, dtype: float64 """ + obj: "cudf.core.series.Series" + def agg(self, func): result = super().agg(func) @@ -1667,7 +1664,7 @@ def keys(self): ) @property - def values(self): + def values(self) -> cudf.core.frame.Frame: """Return value columns as a frame. Note that in aggregation, value columns can be arbitrarily