Refactor Python and Cython internals for groupby aggregation (#7818)
This PR makes some improvements to the groupby/aggregation code that I identified while working on #7731. The main purpose is to make the code logic easier to follow and reduce some unnecessary complexity; I see minor but measurable performance improvements (2-5% for small datasets) as well, but those are mostly just side effects here. Specifically, it makes the following changes:

1. Inlines the logic for dropping unsupported aggregations. The old function was repetitive and necessitated looping over the aggregations twice, whereas the new approach drops unwanted aggregations on the fly so it only loops once. The new code also makes it so that you only construct a C aggregation object once.
2. Merges the logic from `_AggregationFactory` into `Aggregation`, and removes the constructor for `Aggregation`. The one downside here is that the Cython `Aggregation` object's constructor no longer places it in a valid state; however, in practice the object is always constructed via either the `make_aggregation` function or its various factories, and the object's constructor was only ever used in `_drop_unsupported_aggs` anyway. The benefit is that we remove the fragmentation between these two classes, making the code much more readable, and the `Aggregation` class now serves a purpose beyond providing a single property, `kind`, that is only used once: it is now the primary way that other Cython files interact with aggregations. This also means that in most places other Cython modules no longer need to work with `unique_ptr[aggregation]` as much (although they do still have to move `Aggregation.c_obj` for performance reasons). `make_aggregation` now returns the Cython class instead of the underlying C++ one.
3. Modifies all the "allowed aggregations" sets to use the uppercase names of the aggregations. In addition to simplifying the code a tiny bit, this helps reduce confusion between the aggregation names used in Python for pandas compatibility and the libcudf names (for instance, `idxmin` vs `argmin`, now `ARGMIN`).
4. Explicitly defines all the aggregations on a groupby. I discussed this briefly with @shwina; the change has pros and cons. The benefit is that all of these methods are properly documented now, there's less magic (the binding of methods to a class after its definition can be confusing for less experienced Python developers and has a lot of potential gotchas), and we can use the simpler string-based agg definition wherever possible. The downside is that we now have to define all of these methods. I think the change is definitely an improvement, but I'm happy to change it back if anyone can suggest a better alternative. In the long run we probably need to find a way to share both code and docstrings more effectively between all aggregations (DataFrame, Series, and GroupBy).

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Karthikeyan (https://github.com/karthikeyann)
  - Ashwin Srinath (https://github.com/shwina)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #7818
vyasr authored Apr 16, 2021
1 parent 46c0ba1 commit 8a666a0
Showing 7 changed files with 246 additions and 243 deletions.
4 changes: 2 additions & 2 deletions python/cudf/cudf/_lib/aggregation.pxd
@@ -4,7 +4,7 @@ from libcpp.memory cimport unique_ptr
from cudf._lib.cpp.aggregation cimport aggregation


cdef unique_ptr[aggregation] make_aggregation(op, kwargs=*) except *

cdef class Aggregation:
cdef unique_ptr[aggregation] c_obj

cdef Aggregation make_aggregation(op, kwargs=*)
137 changes: 73 additions & 64 deletions python/cudf/cudf/_lib/aggregation.pyx
@@ -56,85 +56,55 @@ class AggregationKind(Enum):


cdef class Aggregation:
def __init__(self, op, **kwargs):
self.c_obj = move(make_aggregation(op, kwargs))

"""A Cython wrapper for aggregations.
**This class should never be instantiated using a standard constructor,
only using one of its many factories.** These factories handle mapping
different cudf operations to their libcudf analogs, e.g.
`cudf.DataFrame.idxmin` -> `libcudf.argmin`. Additionally, they perform
any additional configuration needed to translate Python arguments into
their corresponding C++ types (for instance, C++ enumerations used for
flag arguments). The factory approach is necessary to support operations
like `df.agg(lambda x: x.sum())`; such functions are called with this
class as an argument to generate the desired aggregation.
"""
@property
def kind(self):
return AggregationKind(self.c_obj.get()[0].kind).name.lower()


cdef unique_ptr[aggregation] make_aggregation(op, kwargs={}) except *:
"""
Parameters
----------
op : str or callable
If callable, must meet one of the following requirements:
* Is of the form lambda x: x.agg(*args, **kwargs), where
`agg` is the name of a supported aggregation. Used to
to specify aggregations that take arguments, e.g.,
`lambda x: x.quantile(0.5)`.
* Is a user defined aggregation function that operates on
group values. In this case, the output dtype must be
specified in the `kwargs` dictionary.
Returns
-------
unique_ptr[aggregation]
"""
cdef Aggregation agg
if isinstance(op, str):
agg = getattr(_AggregationFactory, op)(**kwargs)
elif callable(op):
if op is list:
agg = _AggregationFactory.collect()
elif "dtype" in kwargs:
agg = _AggregationFactory.from_udf(op, **kwargs)
else:
agg = op(_AggregationFactory)
else:
raise TypeError("Unknown aggregation {}".format(op))
return move(agg.c_obj)

# The Cython pattern below enables us to create an Aggregation
# without ever calling its `__init__` method, which would otherwise
# result in a RecursionError.
cdef class _AggregationFactory:
return AggregationKind(self.c_obj.get()[0].kind).name

@classmethod
def sum(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_sum_aggregation())
return agg

@classmethod
def min(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_min_aggregation())
return agg

@classmethod
def max(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_max_aggregation())
return agg

@classmethod
def idxmin(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_argmin_aggregation())
return agg

@classmethod
def idxmax(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_argmax_aggregation())
return agg

@classmethod
def mean(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_mean_aggregation())
return agg

@@ -146,15 +116,15 @@ cdef class _AggregationFactory:
else:
c_null_handling = libcudf_types.null_policy.INCLUDE

cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_count_aggregation(
c_null_handling
))
return agg

@classmethod
def size(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_count_aggregation(
<libcudf_types.null_policy><underlying_type_t_null_policy>(
NullHandling.INCLUDE
@@ -164,63 +134,63 @@ cdef class _AggregationFactory:

@classmethod
def nunique(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_nunique_aggregation())
return agg

@classmethod
def nth(cls, libcudf_types.size_type size):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(
libcudf_aggregation.make_nth_element_aggregation(size)
)
return agg

@classmethod
def any(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_any_aggregation())
return agg

@classmethod
def all(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_all_aggregation())
return agg

@classmethod
def product(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_product_aggregation())
return agg

@classmethod
def sum_of_squares(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_sum_of_squares_aggregation())
return agg

@classmethod
def var(cls, ddof=1):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_variance_aggregation(ddof))
return agg

@classmethod
def std(cls, ddof=1):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_std_aggregation(ddof))
return agg

@classmethod
def median(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_median_aggregation())
return agg

@classmethod
def quantile(cls, q=0.5, interpolation="linear"):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()

if not pd.api.types.is_list_like(q):
q = [q]
@@ -240,19 +210,19 @@ cdef class _AggregationFactory:

@classmethod
def collect(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_collect_list_aggregation())
return agg

@classmethod
def unique(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()
agg.c_obj = move(libcudf_aggregation.make_collect_set_aggregation())
return agg

@classmethod
def from_udf(cls, op, *args, **kwargs):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
cdef Aggregation agg = cls()

cdef libcudf_types.type_id tid
cdef libcudf_types.data_type out_dtype
@@ -282,3 +252,42 @@ cdef class _AggregationFactory:
libcudf_aggregation.udf_type.PTX, cpp_str, out_dtype
))
return agg
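
The factory pattern used by the merged class can be sketched in plain Python. This is an analogy rather than the Cython code itself: here `c_obj` is an ordinary tuple standing in for the `unique_ptr[aggregation]`, and the kind strings are assumptions made for illustration.

```python
class Aggregation:
    """Illustrative stand-in: instances are only valid once a factory has run."""

    def __init__(self):
        # The bare constructor leaves the object unconfigured, mirroring the
        # trade-off described in the commit message.
        self.c_obj = None

    @property
    def kind(self):
        return self.c_obj[0]

    @classmethod
    def sum(cls):
        agg = cls()
        agg.c_obj = ("SUM", {})
        return agg

    @classmethod
    def idxmin(cls):
        # The pandas-style name maps to the libcudf-style kind.
        agg = cls()
        agg.c_obj = ("ARGMIN", {})
        return agg

    @classmethod
    def quantile(cls, q=0.5, interpolation="linear"):
        agg = cls()
        # Normalize a scalar quantile to a list, as the real factory does.
        qs = list(q) if isinstance(q, (list, tuple)) else [q]
        agg.c_obj = ("QUANTILE", {"q": qs, "interpolation": interpolation})
        return agg
```

Calling `Aggregation.idxmin().kind` in this sketch yields the uppercase kind name, which is what the allow-list checks compare against.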


cdef Aggregation make_aggregation(op, kwargs=None):
r"""
Parameters
----------
op : str or callable
If callable, must meet one of the following requirements:
* Is of the form lambda x: x.agg(*args, **kwargs), where
`agg` is the name of a supported aggregation. Used to
specify aggregations that take arguments, e.g.,
`lambda x: x.quantile(0.5)`.
* Is a user defined aggregation function that operates on
group values. In this case, the output dtype must be
specified in the `kwargs` dictionary.
\*\*kwargs : dict, optional
Any keyword arguments to be passed to the op.
Returns
-------
Aggregation
"""
if kwargs is None:
kwargs = {}

cdef Aggregation agg
if isinstance(op, str):
agg = getattr(Aggregation, op)(**kwargs)
elif callable(op):
if op is list:
agg = Aggregation.collect()
elif "dtype" in kwargs:
agg = Aggregation.from_udf(op, **kwargs)
else:
agg = op(Aggregation)
else:
raise TypeError(f"Unknown aggregation {op}")
return agg
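
The dispatch rules in `make_aggregation` can be exercised with a pure-Python analog. The sketch below assumes a simplified `Aggregation` that stores a `kind` string and a `params` dict instead of a libcudf object; only the branching logic and the `kwargs=None` default mirror the function above.

```python
class Aggregation:
    """Simplified stand-in holding a kind and parameters (illustrative only)."""

    def __init__(self, kind=None, params=None):
        self.kind = kind
        self.params = params or {}

    @classmethod
    def sum(cls):
        return cls("SUM")

    @classmethod
    def collect(cls):
        return cls("COLLECT")

    @classmethod
    def quantile(cls, q=0.5):
        return cls("QUANTILE", {"q": [q]})

    @classmethod
    def from_udf(cls, op, dtype):
        return cls("UDF", {"op": op, "dtype": dtype})


def make_aggregation(op, kwargs=None):
    # A None default avoids sharing one mutable dict across calls.
    if kwargs is None:
        kwargs = {}

    if isinstance(op, str):
        # String names resolve to the factory of the same name.
        return getattr(Aggregation, op)(**kwargs)
    if callable(op):
        if op is list:
            return Aggregation.collect()
        if "dtype" in kwargs:
            # A user-defined function needs an explicit output dtype.
            return Aggregation.from_udf(op, **kwargs)
        # Otherwise the callable is invoked with the class itself, so
        # `lambda x: x.quantile(0.5)` ends up calling the factory.
        return op(Aggregation)
    raise TypeError(f"Unknown aggregation {op}")
```

For example, `make_aggregation(lambda x: x.quantile(0.5))` returns the same kind of object as `make_aggregation("quantile", {"q": 0.5})` in this sketch, which is why the factories live on the class that callers receive.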