Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use list of columns for methods in Groupby.pyx #10419

Merged
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a512745
Remove stale cuda version check
isVoid Mar 8, 2022
ca884d5
groupby.groups refactor
isVoid Mar 9, 2022
6666d1b
Refactor shift
isVoid Mar 9, 2022
0a5545d
replace_nulls refactor
isVoid Mar 9, 2022
286fb88
Storing only lists of keys in cython object
isVoid Mar 9, 2022
9adb436
Minimal aggregate API, some TODO notes
isVoid Mar 9, 2022
75c39c3
Rewrites normalized aggs
isVoid Mar 10, 2022
d94d4d3
list of columns passed as `values` argument
isVoid Mar 11, 2022
792c653
Add aggregate docstring
isVoid Mar 11, 2022
8ee3eb9
auto reuse column names in _from_columns_like_self, optimize post pro…
isVoid Mar 11, 2022
1ff18b5
Merge branch 'branch-22.04' of github.com:rapidsai/cudf into improvem…
isVoid Mar 11, 2022
33cb056
Only docstring changes from review comments
isVoid Mar 14, 2022
75b218e
Use canonical for-loop structure
isVoid Mar 14, 2022
f6291c5
Reverting some usage of `_from_columns_like_self`.
isVoid Mar 14, 2022
6e35453
Eliminate value->grouped_value->value no-op procedure in diff and fil…
isVoid Mar 14, 2022
a98acae
Add parenthesis
isVoid Mar 14, 2022
7288c17
Applying the simpler aggs unpacking pattern
isVoid Mar 14, 2022
a202cd5
Canonical unequal
isVoid Mar 14, 2022
2903711
Applied a better comprehension technique at constructing aggregation …
isVoid Mar 15, 2022
fa9cae5
Apply suggestions from code review
isVoid Mar 15, 2022
c5ff0d8
Update python/cudf/cudf/_lib/groupby.pyx
isVoid Mar 15, 2022
f7f8739
Cleanups after applying code review suggestions; Use pointer to vecto…
isVoid Mar 15, 2022
7f82f8a
Move comments to correct location
isVoid Mar 15, 2022
6b9e7f2
Update python/cudf/cudf/_lib/groupby.pyx
isVoid Mar 15, 2022
a78355c
Remove nested `any`, `all` calls
isVoid Mar 15, 2022
612b165
Update python/cudf/cudf/core/groupby/groupby.py
isVoid Mar 15, 2022
4d83f4e
Fixing nested comprehension order
isVoid Mar 15, 2022
0335aab
Remove list materialization
isVoid Mar 16, 2022
5fe87bb
Precise type hints for agg types.
isVoid Mar 16, 2022
f605f81
Swapping dict branches with non-dict branches, requires some typing f…
isVoid Mar 16, 2022
978491c
Use forward declaration of type to avoid circular import
isVoid Mar 16, 2022
0175610
Accepting iterable of aggs input
isVoid Mar 16, 2022
9dedfb7
style fixes
isVoid Mar 16, 2022
1aa5e40
Update python/cudf/cudf/_lib/groupby.pyx
isVoid Mar 16, 2022
9b6a814
Rename `column_order`
isVoid Mar 16, 2022
70cd23e
Merge branch 'improvement/ListOfColumnRefactor/groupby' of github.com…
isVoid Mar 16, 2022
0bdc462
Rename `c_result_i`
isVoid Mar 16, 2022
64c7815
type annotation for series/dataframe groupby obj
isVoid Mar 16, 2022
61576e4
Update python/cudf/cudf/core/groupby/groupby.py
isVoid Mar 17, 2022
a805123
Use cdef blocks to improve readability; remove unused variable
isVoid Mar 18, 2022
494934c
Merge branch 'improvement/ListOfColumnRefactor/groupby' of github.com…
isVoid Mar 18, 2022
373ed05
Merge remote-tracking branch 'upstream/branch-22.04' into improvement…
bdice Mar 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 102 additions & 117 deletions python/cudf/cudf/_lib/groupby.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ import cudf

from cudf._lib.column cimport Column
from cudf._lib.scalar cimport DeviceScalar
from cudf._lib.utils cimport table_view_from_table
from cudf._lib.utils cimport (
columns_from_unique_ptr,
data_from_unique_ptr,
table_view_from_columns,
table_view_from_table,
)

from cudf._lib.scalar import as_device_scalar

Expand All @@ -46,7 +51,6 @@ from cudf._lib.cpp.scalar.scalar cimport scalar
from cudf._lib.cpp.table.table cimport table, table_view
from cudf._lib.cpp.types cimport size_type
from cudf._lib.cpp.utilities.host_span cimport host_span
from cudf._lib.utils cimport data_from_unique_ptr

# The sets below define the possible aggregations that can be performed on
# different dtypes. These strings must be elements of the AggregationKind enum.
Expand All @@ -62,19 +66,44 @@ _DECIMAL_AGGS = {"COUNT", "SUM", "ARGMIN", "ARGMAX", "MIN", "MAX", "NUNIQUE",
# workaround for https://github.com/cython/cython/issues/3885
ctypedef const scalar constscalar


cdef _agg_result_from_columns(
    vector[libcudf_groupby.aggregation_result]& c_result_columns,
    set column_included,
    int n_input_columns
):
    """Construct the list of result columns from a libcudf result.

    Parameters
    ----------
    c_result_columns
        libcudf aggregation results, one entry per *included* input
        column, in input order.
    column_included
        Indices of the input columns that had at least one applicable
        aggregation (i.e. that have an entry in ``c_result_columns``).
    n_input_columns
        Total number of input columns that were requested.

    Returns
    -------
    list of list of Column
        One inner list per input column.  The result for an input column
        that had no applicable aggregations is an empty list.
    """
    cdef int i, j, result_index = 0
    # Pointer into the current aggregation_result's column vector; lets
    # us index the C++ vector directly from the comprehension below.
    cdef vector[unique_ptr[column]]* c_result
    result_columns = []
    for i in range(n_input_columns):
        if i in column_included:
            c_result = &c_result_columns[result_index].results
            result_columns.append([
                Column.from_unique_ptr(move(c_result[0][j]))
                for j in range(c_result[0].size())
            ])
            result_index += 1
        else:
            # No aggregations were applicable to this input column.
            result_columns.append([])
    return result_columns

cdef class GroupBy:
cdef unique_ptr[libcudf_groupby.groupby] c_obj
cdef dict __dict__

def __cinit__(self, keys, bool dropna=True, *args, **kwargs):
def __cinit__(self, list keys, bool dropna=True, *args, **kwargs):
cdef libcudf_types.null_policy c_null_handling

if dropna:
c_null_handling = libcudf_types.null_policy.EXCLUDE
else:
c_null_handling = libcudf_types.null_policy.INCLUDE

cdef table_view keys_view = table_view_from_table(keys)
cdef table_view keys_view = table_view_from_columns(keys)

with nogil:
self.c_obj.reset(
Expand All @@ -84,46 +113,42 @@ cdef class GroupBy:
)
)

def __init__(self, keys, bool dropna=True):
def __init__(self, list keys, bool dropna=True):
self.keys = keys
self.dropna = dropna

def groups(self, values):

cdef table_view values_view = table_view_from_table(values)
def groups(self, list values):
cdef table_view values_view = table_view_from_columns(values)

with nogil:
c_groups = move(self.c_obj.get()[0].get_groups(values_view))

c_grouped_keys = move(c_groups.keys)
c_grouped_values = move(c_groups.values)
c_group_offsets = c_groups.offsets

grouped_keys = cudf.core.index._index_from_data(
*data_from_unique_ptr(
move(c_grouped_keys),
column_names=range(c_grouped_keys.get()[0].num_columns())
)
)
grouped_values = data_from_unique_ptr(
move(c_grouped_values),
index_names=values._index_names,
column_names=values._column_names
)
return grouped_keys, grouped_values, c_group_offsets
grouped_key_cols = columns_from_unique_ptr(move(c_groups.keys))
grouped_value_cols = columns_from_unique_ptr(move(c_groups.values))
return grouped_key_cols, grouped_value_cols, c_groups.offsets

def aggregate_internal(self, values, aggregations):
from cudf.core.column_accessor import ColumnAccessor
"""`values` is a list of columns and `aggregations` is a list of list
of aggregations. `aggregations[i]` is a list of aggregations for
`values[i]`. Returns a tuple containing 1) list of list of aggregation
results, 2) a list of grouped keys, and 3) a list of list of
aggregations performed.
"""
cdef vector[libcudf_groupby.aggregation_request] c_agg_requests
cdef libcudf_groupby.aggregation_request c_agg_request
cdef Column col
cdef GroupbyAggregation agg_obj

allow_empty = all(len(v) == 0 for v in aggregations.values())
cdef pair[
unique_ptr[table],
vector[libcudf_groupby.aggregation_result]
] c_result

included_aggregations = defaultdict(list)
for i, (col_name, aggs) in enumerate(aggregations.items()):
col = values._data[col_name]
allow_empty = all(len(v) == 0 for v in aggregations)

included_aggregations = []
column_included = set()
for i, (col, aggs) in enumerate(zip(values, aggregations)):
dtype = col.dtype

valid_aggregations = (
Expand All @@ -135,74 +160,66 @@ cdef class GroupBy:
else _DECIMAL_AGGS if is_decimal_dtype(dtype)
else "ALL"
)
if (valid_aggregations is _DECIMAL_AGGS
and rmm._cuda.gpu.runtimeGetVersion() < 11000):
raise RuntimeError(
"Decimal aggregations are only supported on CUDA >= 11 "
"due to an nvcc compiler bug."
)
included_aggregations_i = []

c_agg_request = move(libcudf_groupby.aggregation_request())
for agg in aggs:
agg_obj = make_groupby_aggregation(agg)
if (valid_aggregations == "ALL"
or agg_obj.kind in valid_aggregations):
included_aggregations[col_name].append(agg)
included_aggregations_i.append(agg)
c_agg_request.aggregations.push_back(
move(agg_obj.c_obj)
)
included_aggregations.append(included_aggregations_i)
if not c_agg_request.aggregations.empty():
c_agg_request.values = col.view()
c_agg_requests.push_back(
move(c_agg_request)
)

column_included.add(i)
if c_agg_requests.empty() and not allow_empty:
raise DataError("All requested aggregations are unsupported.")

cdef pair[
unique_ptr[table],
vector[libcudf_groupby.aggregation_result]
] c_result

with nogil:
c_result = move(
self.c_obj.get()[0].aggregate(
c_agg_requests
)
)

grouped_keys, _ = data_from_unique_ptr(
move(c_result.first),
column_names=self.keys._column_names
grouped_keys = columns_from_unique_ptr(
move(c_result.first)
)

result_data = ColumnAccessor(multiindex=True)
# Note: This loop relies on the included_aggregations dict being
# insertion ordered to map results to requested aggregations by index.
for i, col_name in enumerate(included_aggregations):
for j, agg_name in enumerate(included_aggregations[col_name]):
if callable(agg_name):
agg_name = agg_name.__name__
result_data[(col_name, agg_name)] = (
Column.from_unique_ptr(move(c_result.second[i].results[j]))
)
result_columns = _agg_result_from_columns(
c_result.second, column_included, len(values)
)

return result_data, cudf.core.index._index_from_data(
grouped_keys)
return result_columns, grouped_keys, included_aggregations

def scan_internal(self, values, aggregations):
from cudf.core.column_accessor import ColumnAccessor
"""`values` is a list of columns and `aggregations` is a list of list
of aggregations. `aggregations[i]` is a list of aggregations for
`values[i]`. Returns a tuple containing 1) list of list of aggregation
results, 2) a list of grouped keys, and 3) a list of list of
aggregations performed.
"""
cdef vector[libcudf_groupby.scan_request] c_agg_requests
cdef libcudf_groupby.scan_request c_agg_request
cdef Column col
cdef GroupbyScanAggregation agg_obj

allow_empty = all(len(v) == 0 for v in aggregations.values())
cdef pair[
unique_ptr[table],
vector[libcudf_groupby.aggregation_result]
] c_result

allow_empty = all(len(v) == 0 for v in aggregations)

included_aggregations = defaultdict(list)
for i, (col_name, aggs) in enumerate(aggregations.items()):
col = values._data[col_name]
included_aggregations = []
column_included = set()
for i, (col, aggs) in enumerate(zip(values, aggregations)):
dtype = col.dtype

valid_aggregations = (
Expand All @@ -214,61 +231,43 @@ cdef class GroupBy:
else _DECIMAL_AGGS if is_decimal_dtype(dtype)
else "ALL"
)
if (valid_aggregations is _DECIMAL_AGGS
and rmm._cuda.gpu.runtimeGetVersion() < 11000):
raise RuntimeError(
"Decimal aggregations are only supported on CUDA >= 11 "
"due to an nvcc compiler bug."
)
included_aggregations_i = []

c_agg_request = move(libcudf_groupby.scan_request())
for agg in aggs:
agg_obj = make_groupby_scan_aggregation(agg)
if (valid_aggregations == "ALL"
or agg_obj.kind in valid_aggregations):
included_aggregations[col_name].append(agg)
included_aggregations_i.append(agg)
c_agg_request.aggregations.push_back(
move(agg_obj.c_obj)
)
included_aggregations.append(included_aggregations_i)
if not c_agg_request.aggregations.empty():
c_agg_request.values = col.view()
c_agg_requests.push_back(
move(c_agg_request)
)

column_included.add(i)
if c_agg_requests.empty() and not allow_empty:
raise DataError("All requested aggregations are unsupported.")

cdef pair[
unique_ptr[table],
vector[libcudf_groupby.aggregation_result]
] c_result

with nogil:
c_result = move(
self.c_obj.get()[0].scan(
c_agg_requests
)
)

grouped_keys, _ = data_from_unique_ptr(
move(c_result.first),
column_names=self.keys._column_names
grouped_keys = columns_from_unique_ptr(
move(c_result.first)
)

result_data = ColumnAccessor(multiindex=True)
# Note: This loop relies on the included_aggregations dict being
# insertion ordered to map results to requested aggregations by index.
for i, col_name in enumerate(included_aggregations):
for j, agg_name in enumerate(included_aggregations[col_name]):
if callable(agg_name):
agg_name = agg_name.__name__
result_data[(col_name, agg_name)] = (
Column.from_unique_ptr(move(c_result.second[i].results[j]))
)
result_columns = _agg_result_from_columns(
c_result.second, column_included, len(values)
)

return result_data, cudf.core.index._index_from_data(
grouped_keys)
return result_columns, grouped_keys, included_aggregations

def aggregate(self, values, aggregations):
"""
Expand All @@ -292,16 +291,16 @@ cdef class GroupBy:

return self.aggregate_internal(values, aggregations)

def shift(self, values, int periods, list fill_values):
cdef table_view view = table_view_from_table(values)
def shift(self, list values, int periods, list fill_values):
cdef table_view view = table_view_from_columns(values)
cdef size_type num_col = view.num_columns()
cdef vector[size_type] offsets = vector[size_type](num_col, periods)

cdef vector[reference_wrapper[constscalar]] c_fill_values
cdef DeviceScalar d_slr
d_slrs = []
c_fill_values.reserve(num_col)
for val, col in zip(fill_values, values._columns):
for val, col in zip(fill_values, values):
d_slr = as_device_scalar(val, dtype=col.dtype)
d_slrs.append(d_slr)
c_fill_values.push_back(
Expand All @@ -315,21 +314,13 @@ cdef class GroupBy:
self.c_obj.get()[0].shift(view, offsets, c_fill_values)
)

grouped_keys = cudf.core.index._index_from_data(
*data_from_unique_ptr(
move(c_result.first),
column_names=self.keys._column_names
)
)

shifted, _ = data_from_unique_ptr(
move(c_result.second), column_names=values._column_names
)
grouped_keys = columns_from_unique_ptr(move(c_result.first))
shifted = columns_from_unique_ptr(move(c_result.second))

return shifted, grouped_keys

def replace_nulls(self, values, object method):
cdef table_view val_view = table_view_from_table(values)
def replace_nulls(self, list values, object method):
cdef table_view val_view = table_view_from_columns(values)
cdef pair[unique_ptr[table], unique_ptr[table]] c_result
cdef replace_policy policy = (
replace_policy.PRECEDING
Expand All @@ -344,15 +335,13 @@ cdef class GroupBy:
self.c_obj.get()[0].replace_nulls(val_view, policies)
)

return data_from_unique_ptr(
move(c_result.second), column_names=values._column_names
)[0]
return columns_from_unique_ptr(move(c_result.second))


_GROUPBY_SCANS = {"cumcount", "cumsum", "cummin", "cummax"}


def _is_all_scan_aggregate(aggs):
def _is_all_scan_aggregate(all_aggs):
"""
Returns true if all are scan aggregations.
Raises
Expand All @@ -365,16 +354,12 @@ def _is_all_scan_aggregate(aggs):
return agg.__name__ if callable(agg) else agg

all_scan = all(
isVoid marked this conversation as resolved.
Show resolved Hide resolved
all(
get_name(agg_name) in _GROUPBY_SCANS for agg_name in aggs[col_name]
)
for col_name in aggs
get_name(agg_name) in _GROUPBY_SCANS for aggs in all_aggs
for agg_name in aggs
)
any_scan = any(
isVoid marked this conversation as resolved.
Show resolved Hide resolved
any(
get_name(agg_name) in _GROUPBY_SCANS for agg_name in aggs[col_name]
)
for col_name in aggs
get_name(agg_name) in _GROUPBY_SCANS for aggs in all_aggs
for agg_name in aggs
)

if not all_scan and any_scan:
Expand Down
Loading