diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst index 6e596151871..4772d654a3c 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst @@ -15,6 +15,7 @@ This page provides API documentation for pylibcudf. gpumemoryview groupby join + reduce scalar table types diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/reduce.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/reduce.rst new file mode 100644 index 00000000000..e6f1b02331d --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/reduce.rst @@ -0,0 +1,6 @@ +====== +reduce +====== + +.. automodule:: cudf._lib.pylibcudf.reduce + :members: diff --git a/python/cudf/cudf/_lib/aggregation.pxd b/python/cudf/cudf/_lib/aggregation.pxd index f83f170c7c2..7a2a2b022fb 100644 --- a/python/cudf/cudf/_lib/aggregation.pxd +++ b/python/cudf/cudf/_lib/aggregation.pxd @@ -3,28 +3,14 @@ from libcpp.memory cimport unique_ptr from cudf._lib cimport pylibcudf -from cudf._lib.cpp.aggregation cimport ( - groupby_aggregation, - groupby_scan_aggregation, - reduce_aggregation, - rolling_aggregation, - scan_aggregation, -) +from cudf._lib.cpp.aggregation cimport rolling_aggregation cdef class RollingAggregation: cdef unique_ptr[rolling_aggregation] c_obj -cdef class GroupbyAggregation: +cdef class Aggregation: cdef pylibcudf.aggregation.Aggregation c_obj -cdef class ReduceAggregation: - cdef unique_ptr[reduce_aggregation] c_obj - -cdef class ScanAggregation: - cdef unique_ptr[scan_aggregation] c_obj - cdef RollingAggregation make_rolling_aggregation(op, kwargs=*) -cdef GroupbyAggregation make_groupby_aggregation(op, kwargs=*) -cdef ReduceAggregation make_reduce_aggregation(op, kwargs=*) -cdef ScanAggregation make_scan_aggregation(op, kwargs=*) +cdef Aggregation make_aggregation(op, kwargs=*) diff --git a/python/cudf/cudf/_lib/aggregation.pyx b/python/cudf/cudf/_lib/aggregation.pyx index 127580a6ec6..036c922e128 100644 --- a/python/cudf/cudf/_lib/aggregation.pyx +++ b/python/cudf/cudf/_lib/aggregation.pyx @@ -6,21 +6,17 @@ import pandas as pd from libcpp.string cimport string from libcpp.utility cimport move -from libcpp.vector cimport vector from cudf._lib.types import SUPPORTED_NUMPY_TO_LIBCUDF_TYPES, NullHandling from cudf.utils import cudautils from cudf._lib.types cimport ( - underlying_type_t_interpolation, underlying_type_t_null_policy, underlying_type_t_type_id, ) from numba.np import numpy_support -from cudf._lib.types import Interpolation - cimport cudf._lib.cpp.aggregation as libcudf_aggregation cimport cudf._lib.cpp.types as libcudf_types from cudf._lib.cpp.aggregation cimport underlying_type_t_correlation_type @@ -245,19 +241,7 @@ cdef class RollingAggregation: )) return agg -cdef class GroupbyAggregation: - """A Cython wrapper for groupby aggregations. - - **This class should never be instantiated using a standard constructor, - only using one of its many factories.** These factories handle mapping - different cudf operations to their libcudf analogs, e.g. - `cudf.DataFrame.idxmin` -> `libcudf.argmin`. Additionally, they perform - any additional configuration needed to translate Python arguments into - their corresponding C++ types (for instance, C++ enumerations used for - flag arguments). 
The factory approach is necessary to support operations - like `df.agg(lambda x: x.sum())`; such functions are called with this - class as an argument to generation the desired aggregation. - """ +cdef class Aggregation: def __init__(self, pylibcudf.aggregation.Aggregation agg): self.c_obj = agg @@ -410,202 +394,14 @@ cdef class GroupbyAggregation: )) - -cdef class ReduceAggregation: - """A Cython wrapper for reduce aggregations. - - **This class should never be instantiated using a standard constructor, - only using one of its many factories.** These factories handle mapping - different cudf operations to their libcudf analogs, e.g. - `cudf.DataFrame.idxmin` -> `libcudf.argmin`. Additionally, they perform - any additional configuration needed to translate Python arguments into - their corresponding C++ types (for instance, C++ enumerations used for - flag arguments). The factory approach is necessary to support operations - like `df.agg(lambda x: x.sum())`; such functions are called with this - class as an argument to generation the desired aggregation. - """ - @property - def kind(self): - return AggregationKind(self.c_obj.get()[0].kind).name - - @classmethod - def sum(cls): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_sum_aggregation[reduce_aggregation]()) - return agg - - @classmethod - def product(cls): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_product_aggregation[ - reduce_aggregation]()) - return agg - prod = product - - @classmethod - def min(cls): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_min_aggregation[reduce_aggregation]()) - return agg - - @classmethod - def max(cls): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. 
- make_max_aggregation[reduce_aggregation]()) - return agg - + # Reduce aggregations @classmethod def any(cls): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_any_aggregation[reduce_aggregation]()) - return agg + return cls(pylibcudf.aggregation.any()) @classmethod def all(cls): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_all_aggregation[reduce_aggregation]()) - return agg - - @classmethod - def sum_of_squares(cls): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_sum_of_squares_aggregation[ - reduce_aggregation]() - ) - return agg - - @classmethod - def mean(cls): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_mean_aggregation[reduce_aggregation]()) - return agg - - @classmethod - def var(cls, ddof=1): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_variance_aggregation[ - reduce_aggregation](ddof)) - return agg - - @classmethod - def std(cls, ddof=1): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_std_aggregation[reduce_aggregation](ddof)) - return agg - - @classmethod - def median(cls): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_median_aggregation[reduce_aggregation]()) - return agg - - @classmethod - def quantile(cls, q=0.5, interpolation="linear"): - cdef ReduceAggregation agg = cls() - - if not pd.api.types.is_list_like(q): - q = [q] - - cdef vector[double] c_q = q - cdef libcudf_types.interpolation c_interp = ( - ( - ( - Interpolation[interpolation.upper()] - ) - ) - ) - agg.c_obj = move( - libcudf_aggregation.make_quantile_aggregation[reduce_aggregation]( - c_q, c_interp) - ) - return agg - - @classmethod - def nunique(cls): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_nunique_aggregation[reduce_aggregation]( - libcudf_types.null_policy.EXCLUDE - )) - return agg - - @classmethod - def nth(cls, libcudf_types.size_type size): - cdef ReduceAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_nth_element_aggregation[ - reduce_aggregation](size)) - return agg - -cdef class ScanAggregation: - """A Cython wrapper for scan aggregations. - - **This class should never be instantiated using a standard constructor, - only using one of its many factories.** These factories handle mapping - different cudf operations to their libcudf analogs, e.g. - `cudf.DataFrame.idxmin` -> `libcudf.argmin`. Additionally, they perform - any additional configuration needed to translate Python arguments into - their corresponding C++ types (for instance, C++ enumerations used for - flag arguments). The factory approach is necessary to support operations - like `df.agg(lambda x: x.sum())`; such functions are called with this - class as an argument to generation the desired aggregation. - """ - @property - def kind(self): - return AggregationKind(self.c_obj.get()[0].kind).name - - @classmethod - def sum(cls): - cdef ScanAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_sum_aggregation[scan_aggregation]()) - return agg - - @classmethod - def product(cls): - cdef ScanAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation.make_product_aggregation[scan_aggregation]()) - return agg - prod = product - - @classmethod - def min(cls): - cdef ScanAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. 
- make_min_aggregation[scan_aggregation]()) - return agg - - @classmethod - def max(cls): - cdef ScanAggregation agg = cls() - agg.c_obj = move( - libcudf_aggregation. - make_max_aggregation[scan_aggregation]()) - return agg - - # scan aggregations - # TODO: update this after adding per algorithm aggregation derived types - # https://github.com/rapidsai/cudf/issues/7106 - cumsum = sum - cummin = min - cummax = max + return cls(pylibcudf.aggregation.all()) cdef RollingAggregation make_rolling_aggregation(op, kwargs=None): @@ -646,7 +442,7 @@ cdef RollingAggregation make_rolling_aggregation(op, kwargs=None): raise TypeError(f"Unknown aggregation {op}") return agg -cdef GroupbyAggregation make_groupby_aggregation(op, kwargs=None): +cdef Aggregation make_aggregation(op, kwargs=None): r""" Parameters ---------- @@ -665,97 +461,21 @@ cdef GroupbyAggregation make_groupby_aggregation(op, kwargs=None): Returns ------- - GroupbyAggregation - """ - if kwargs is None: - kwargs = {} - - cdef GroupbyAggregation agg - if isinstance(op, str): - agg = getattr(GroupbyAggregation, op)(**kwargs) - elif callable(op): - if op is list: - agg = GroupbyAggregation.collect() - elif "dtype" in kwargs: - agg = GroupbyAggregation.from_udf(op, **kwargs) - else: - agg = op(GroupbyAggregation) - else: - raise TypeError(f"Unknown aggregation {op}") - return agg - -cdef ReduceAggregation make_reduce_aggregation(op, kwargs=None): - r""" - Parameters - ---------- - op : str or callable - If callable, must meet one of the following requirements: - - * Is of the form lambda x: x.agg(*args, **kwargs), where - `agg` is the name of a supported aggregation. Used to - to specify aggregations that take arguments, e.g., - `lambda x: x.quantile(0.5)`. - * Is a user defined aggregation function that operates on - reducible values. In this case, the output dtype must be - specified in the `kwargs` dictionary. - \*\*kwargs : dict, optional - Any keyword arguments to be passed to the op. - - Returns - ------- - ReduceAggregation - """ - if kwargs is None: - kwargs = {} - - cdef ReduceAggregation agg - if isinstance(op, str): - agg = getattr(ReduceAggregation, op)(**kwargs) - elif callable(op): - if op is list: - agg = ReduceAggregation.collect() - elif "dtype" in kwargs: - agg = ReduceAggregation.from_udf(op, **kwargs) - else: - agg = op(ReduceAggregation) - else: - raise TypeError(f"Unknown aggregation {op}") - return agg - -cdef ScanAggregation make_scan_aggregation(op, kwargs=None): - r""" - Parameters - ---------- - op : str or callable - If callable, must meet one of the following requirements: - - * Is of the form lambda x: x.agg(*args, **kwargs), where - `agg` is the name of a supported aggregation. Used to - to specify aggregations that take arguments, e.g., - `lambda x: x.quantile(0.5)`. - * Is a user defined aggregation function that operates on - scannable values. In this case, the output dtype must be - specified in the `kwargs` dictionary. - \*\*kwargs : dict, optional - Any keyword arguments to be passed to the op. 
- - Returns - ------- - ScanAggregation + Aggregation """ if kwargs is None: kwargs = {} - cdef ScanAggregation agg + cdef Aggregation agg if isinstance(op, str): - agg = getattr(ScanAggregation, op)(**kwargs) + agg = getattr(Aggregation, op)(**kwargs) elif callable(op): if op is list: - agg = ScanAggregation.collect() + agg = Aggregation.collect() elif "dtype" in kwargs: - agg = ScanAggregation.from_udf(op, **kwargs) + agg = Aggregation.from_udf(op, **kwargs) else: - agg = op(ScanAggregation) + agg = op(Aggregation) else: raise TypeError(f"Unknown aggregation {op}") return agg diff --git a/python/cudf/cudf/_lib/cpp/CMakeLists.txt b/python/cudf/cudf/_lib/cpp/CMakeLists.txt index e79fef98448..da06cf225e9 100644 --- a/python/cudf/cudf/_lib/cpp/CMakeLists.txt +++ b/python/cudf/cudf/_lib/cpp/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. # ============================================================================= -set(cython_sources aggregation.pyx binaryop.pyx copying.pyx types.pyx unary.pyx) +set(cython_sources aggregation.pyx binaryop.pyx copying.pyx reduce.pxd types.pyx unary.pyx) set(linked_libraries cudf::cudf) diff --git a/python/cudf/cudf/_lib/cpp/reduce.pxd b/python/cudf/cudf/_lib/cpp/reduce.pxd index 997782dec6c..9c893fe9bcb 100644 --- a/python/cudf/cudf/_lib/cpp/reduce.pxd +++ b/python/cudf/cudf/_lib/cpp/reduce.pxd @@ -1,5 +1,6 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. +from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.utility cimport pair @@ -17,7 +18,7 @@ cdef extern from "cudf/reduction.hpp" namespace "cudf" nogil: data_type type ) except + - ctypedef enum scan_type: + cpdef enum class scan_type(bool): INCLUSIVE "cudf::scan_type::INCLUSIVE", EXCLUSIVE "cudf::scan_type::EXCLUSIVE", diff --git a/python/cudf/cudf/_lib/cpp/reduce.pyx b/python/cudf/cudf/_lib/cpp/reduce.pyx new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/cudf/cudf/_lib/groupby.pyx b/python/cudf/cudf/_lib/groupby.pyx index 3493d1c4f33..eb0f784de17 100644 --- a/python/cudf/cudf/_lib/groupby.pyx +++ b/python/cudf/cudf/_lib/groupby.pyx @@ -18,7 +18,7 @@ from cudf._lib.utils cimport columns_from_pylibcudf_table from cudf._lib.scalar import as_device_scalar -from cudf._lib.aggregation cimport make_groupby_aggregation +from cudf._lib.aggregation cimport make_aggregation from cudf._lib.cpp.replace cimport replace_policy from cudf._lib.cpp.scalar.scalar cimport scalar @@ -164,7 +164,7 @@ cdef class GroupBy: included_aggregations_i = [] col_aggregations = [] for agg in aggs: - agg_obj = make_groupby_aggregation(agg) + agg_obj = make_aggregation(agg) if valid_aggregations == "ALL" or agg_obj.kind in valid_aggregations: included_aggregations_i.append((agg, agg_obj.kind)) col_aggregations.append(agg_obj.c_obj) diff --git a/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt index da5645b5947..6144fd07ac0 100644 --- a/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt +++ b/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt @@ -14,7 +14,7 @@ set(cython_sources aggregation.pyx binaryop.pyx column.pyx copying.pyx gpumemoryview.pyx groupby.pyx interop.pyx - join.pyx scalar.pyx table.pyx types.pyx unary.pyx utils.pyx + join.pyx reduce.pyx scalar.pyx table.pyx types.pyx unary.pyx utils.pyx ) set(linked_libraries cudf::cudf) rapids_cython_create_modules( diff --git a/python/cudf/cudf/_lib/pylibcudf/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/__init__.pxd index bbe491f43e3..74afa2dbacd 100644 
--- a/python/cudf/cudf/_lib/pylibcudf/__init__.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/__init__.pxd @@ -1,7 +1,16 @@ # Copyright (c) 2023-2024, NVIDIA CORPORATION. # TODO: Verify consistent usage of relative/absolute imports in pylibcudf. -from . cimport aggregation, binaryop, copying, groupby, interop, join, unary +from . cimport ( + aggregation, + binaryop, + copying, + groupby, + interop, + join, + reduce, + unary, +) from .column cimport Column from .gpumemoryview cimport gpumemoryview from .scalar cimport Scalar @@ -23,5 +32,6 @@ __all__ = [ "interop", "join", "unary", + "reduce", "types", ] diff --git a/python/cudf/cudf/_lib/pylibcudf/__init__.py b/python/cudf/cudf/_lib/pylibcudf/__init__.py index 35812b65046..96663d365a8 100644 --- a/python/cudf/cudf/_lib/pylibcudf/__init__.py +++ b/python/cudf/cudf/_lib/pylibcudf/__init__.py @@ -1,6 +1,15 @@ # Copyright (c) 2023-2024, NVIDIA CORPORATION. -from . import aggregation, binaryop, copying, groupby, interop, join, unary +from . import ( + aggregation, + binaryop, + copying, + groupby, + interop, + join, + reduce, + unary, +) from .column import Column from .gpumemoryview import gpumemoryview from .scalar import Scalar @@ -21,5 +30,6 @@ "interop", "join", "unary", + "reduce", "types", ] diff --git a/python/cudf/cudf/_lib/pylibcudf/aggregation.pxd b/python/cudf/cudf/_lib/pylibcudf/aggregation.pxd index 8eda16c4165..1b7da5a5532 100644 --- a/python/cudf/cudf/_lib/pylibcudf/aggregation.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/aggregation.pxd @@ -10,6 +10,8 @@ from cudf._lib.cpp.aggregation cimport ( groupby_scan_aggregation, rank_method, rank_percentage, + reduce_aggregation, + scan_aggregation, ) from cudf._lib.cpp.types cimport ( interpolation, @@ -23,14 +25,23 @@ from cudf._lib.cpp.types cimport ( from .types cimport DataType +# workaround for https://github.com/cython/cython/issues/3885 +ctypedef groupby_aggregation * gba_ptr +ctypedef groupby_scan_aggregation * gbsa_ptr +ctypedef reduce_aggregation * ra_ptr +ctypedef scan_aggregation * sa_ptr + cdef class Aggregation: cdef unique_ptr[aggregation] c_obj cpdef kind(self) + cdef void _unsupported_agg_error(self, str alg) cdef unique_ptr[groupby_aggregation] clone_underlying_as_groupby(self) except * cdef unique_ptr[groupby_scan_aggregation] clone_underlying_as_groupby_scan( self ) except * + cdef const reduce_aggregation* view_underlying_as_reduce(self) except * + cdef const scan_aggregation* view_underlying_as_scan(self) except * @staticmethod cdef Aggregation from_libcudf(unique_ptr[aggregation] agg) diff --git a/python/cudf/cudf/_lib/pylibcudf/aggregation.pyx b/python/cudf/cudf/_lib/pylibcudf/aggregation.pyx index bde2643d5b1..0020a0c681d 100644 --- a/python/cudf/cudf/_lib/pylibcudf/aggregation.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/aggregation.pyx @@ -35,6 +35,8 @@ from cudf._lib.cpp.aggregation cimport ( make_variance_aggregation, rank_method, rank_percentage, + reduce_aggregation, + scan_aggregation, ) from cudf._lib.cpp.types cimport ( interpolation, @@ -57,10 +59,6 @@ from cudf._lib.cpp.aggregation import udf_type as UdfType # no-cython-lint from .types cimport DataType -# workaround for https://github.com/cython/cython/issues/3885 -ctypedef groupby_aggregation * gba_ptr -ctypedef groupby_scan_aggregation * gbsa_ptr - cdef class Aggregation: """A type of aggregation to perform. 
@@ -85,40 +83,47 @@ cdef class Aggregation:
         """Get the kind of the aggregation."""
         return dereference(self.c_obj).kind
 
-    cdef unique_ptr[groupby_aggregation] clone_underlying_as_groupby(self) except *:
-        """Make a copy of the underlying aggregation that can be used in a groupby.
+    cdef void _unsupported_agg_error(self, str alg):
+        # The functions calling this all use a dynamic cast between aggregation types,
+        # and the cast returning a null pointer is how we capture whether or not
+        # libcudf supports a given aggregation for a particular algorithm.
+        agg_repr = str(self.kind()).split(".")[1].title()
+        raise TypeError(f"{agg_repr} aggregations are not supported by {alg}")
 
-        This function will raise an exception if the aggregation is not supported as a
-        groupby aggregation. This failure to cast translates the per-algorithm
-        aggregation logic encoded in libcudf's type hierarchy into Python.
-        """
+    cdef unique_ptr[groupby_aggregation] clone_underlying_as_groupby(self) except *:
+        """Make a copy of the aggregation that can be used in a groupby."""
         cdef unique_ptr[aggregation] agg = dereference(self.c_obj).clone()
         cdef groupby_aggregation *agg_cast = dynamic_cast[gba_ptr](agg.get())
         if agg_cast is NULL:
-            agg_repr = str(self.kind()).split(".")[1].title()
-            raise TypeError(f"{agg_repr} aggregations are not supported by groupby")
+            self._unsupported_agg_error("groupby")
         agg.release()
         return unique_ptr[groupby_aggregation](agg_cast)
 
-    # Ideally this function could reuse the code above, but Cython lacks the
-    # first-class support for type-aliasing and templates that would make it possible.
     cdef unique_ptr[groupby_scan_aggregation] clone_underlying_as_groupby_scan(
         self
     ) except *:
-        """Make a copy of the underlying aggregation that can be used in a groupby scan.
-
-        This function will raise an exception if the aggregation is not supported as a
-        groupby scan aggregation. This failure to cast translates the per-algorithm
-        aggregation logic encoded in libcudf's type hierarchy into Python.
-        """
+        """Make a copy of the aggregation that can be used in a groupby scan."""
         cdef unique_ptr[aggregation] agg = dereference(self.c_obj).clone()
         cdef groupby_scan_aggregation *agg_cast = dynamic_cast[gbsa_ptr](agg.get())
         if agg_cast is NULL:
-            agg_repr = str(self.kind()).split(".")[1].title()
-            raise TypeError(f"{agg_repr} scans are not supported by groupby")
+            self._unsupported_agg_error("groupby_scan")
         agg.release()
         return unique_ptr[groupby_scan_aggregation](agg_cast)
 
+    cdef const reduce_aggregation* view_underlying_as_reduce(self) except *:
+        """View the underlying aggregation as a reduce_aggregation."""
+        cdef reduce_aggregation *agg_cast = dynamic_cast[ra_ptr](self.c_obj.get())
+        if agg_cast is NULL:
+            self._unsupported_agg_error("reduce")
+        return agg_cast
+
+    cdef const scan_aggregation* view_underlying_as_scan(self) except *:
+        """View the underlying aggregation as a scan_aggregation."""
+        cdef scan_aggregation *agg_cast = dynamic_cast[sa_ptr](self.c_obj.get())
+        if agg_cast is NULL:
+            self._unsupported_agg_error("scan")
+        return agg_cast
+
     @staticmethod
     cdef Aggregation from_libcudf(unique_ptr[aggregation] agg):
         """Create a Python Aggregation from a libcudf aggregation."""
diff --git a/python/cudf/cudf/_lib/pylibcudf/reduce.pxd b/python/cudf/cudf/_lib/pylibcudf/reduce.pxd
new file mode 100644
index 00000000000..a613e877ce2
--- /dev/null
+++ b/python/cudf/cudf/_lib/pylibcudf/reduce.pxd
@@ -0,0 +1,15 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+ +from cudf._lib.cpp.reduce cimport scan_type + +from .aggregation cimport Aggregation +from .column cimport Column +from .scalar cimport Scalar +from .types cimport DataType + + +cpdef Scalar reduce(Column col, Aggregation agg, DataType data_type) + +cpdef Column scan(Column col, Aggregation agg, scan_type inclusive) + +cpdef tuple minmax(Column col) diff --git a/python/cudf/cudf/_lib/pylibcudf/reduce.pyx b/python/cudf/cudf/_lib/pylibcudf/reduce.pyx new file mode 100644 index 00000000000..d12da712fcf --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/reduce.pyx @@ -0,0 +1,108 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from cython.operator cimport dereference +from libcpp.memory cimport unique_ptr +from libcpp.utility cimport move, pair + +from cudf._lib.cpp cimport reduce as cpp_reduce +from cudf._lib.cpp.aggregation cimport reduce_aggregation, scan_aggregation +from cudf._lib.cpp.column.column cimport column +from cudf._lib.cpp.reduce cimport scan_type +from cudf._lib.cpp.scalar.scalar cimport scalar + +from .aggregation cimport Aggregation +from .column cimport Column +from .scalar cimport Scalar +from .types cimport DataType + +from cudf._lib.cpp.reduce import scan_type as ScanType # no-cython-lint + + +cpdef Scalar reduce(Column col, Aggregation agg, DataType data_type): + """Perform a reduction on a column + + For details, see ``cudf::reduce`` documentation. + + Parameters + ---------- + col : Column + The column to perform the reduction on. + agg : Aggregation + The aggregation to perform. + data_type : DataType + The data type of the result. + + Returns + ------- + Scalar + The result of the reduction. + """ + cdef unique_ptr[scalar] result + cdef const reduce_aggregation *c_agg = agg.view_underlying_as_reduce() + with nogil: + result = move( + cpp_reduce.cpp_reduce( + col.view(), + dereference(c_agg), + data_type.c_obj + ) + ) + return Scalar.from_libcudf(move(result)) + + +cpdef Column scan(Column col, Aggregation agg, scan_type inclusive): + """Perform a scan on a column + + For details, see ``cudf::scan`` documentation. + + Parameters + ---------- + col : Column + The column to perform the scan on. + agg : Aggregation + The aggregation to perform. + inclusive : scan_type + The type of scan to perform. + + Returns + ------- + Column + The result of the scan. + """ + cdef unique_ptr[column] result + cdef const scan_aggregation *c_agg = agg.view_underlying_as_scan() + with nogil: + result = move( + cpp_reduce.cpp_scan( + col.view(), + dereference(c_agg), + inclusive, + ) + ) + return Column.from_libcudf(move(result)) + + +cpdef tuple minmax(Column col): + """Compute the minimum and maximum of a column + + For details, see ``cudf::minmax`` documentation. + + Parameters + ---------- + col : Column + The column to compute the minimum and maximum of. + + Returns + ------- + tuple + A tuple of two Scalars, the first being the minimum and the second + being the maximum. + """ + cdef pair[unique_ptr[scalar], unique_ptr[scalar]] result + with nogil: + result = move(cpp_reduce.cpp_minmax(col.view())) + + return ( + Scalar.from_libcudf(move(result.first)), + Scalar.from_libcudf(move(result.second)), + ) diff --git a/python/cudf/cudf/_lib/reduce.pyx b/python/cudf/cudf/_lib/reduce.pyx index f11bacd5d1e..5767cc8eee1 100644 --- a/python/cudf/cudf/_lib/reduce.pyx +++ b/python/cudf/cudf/_lib/reduce.pyx @@ -1,27 +1,14 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. - -from cython.operator import dereference +# Copyright (c) 2020-2024, NVIDIA CORPORATION. 
import cudf from cudf.core.buffer import acquire_spill_lock -from libcpp.memory cimport unique_ptr -from libcpp.utility cimport move, pair - -from cudf._lib.aggregation cimport ( - ReduceAggregation, - ScanAggregation, - make_reduce_aggregation, - make_scan_aggregation, -) +from cudf._lib.aggregation cimport make_aggregation from cudf._lib.column cimport Column -from cudf._lib.cpp.column.column cimport column -from cudf._lib.cpp.column.column_view cimport column_view -from cudf._lib.cpp.reduce cimport cpp_minmax, cpp_reduce, cpp_scan, scan_type -from cudf._lib.cpp.scalar.scalar cimport scalar -from cudf._lib.cpp.types cimport data_type from cudf._lib.scalar cimport DeviceScalar -from cudf._lib.types cimport dtype_to_data_type, is_decimal_type_id +from cudf._lib.types cimport dtype_to_pylibcudf_type, is_decimal_type_id + +from cudf._lib import pylibcudf @acquire_spill_lock() @@ -45,13 +32,6 @@ def reduce(reduction_op, Column incol, dtype=None, **kwargs): else incol._reduction_result_dtype(reduction_op) ) - cdef column_view c_incol_view = incol.view() - cdef unique_ptr[scalar] c_result - cdef ReduceAggregation cython_agg = make_reduce_aggregation( - reduction_op, kwargs) - - cdef data_type c_out_dtype = dtype_to_data_type(col_dtype) - # check empty case if len(incol) <= incol.null_count: if reduction_op == 'sum' or reduction_op == 'sum_of_squares': @@ -63,22 +43,20 @@ def reduce(reduction_op, Column incol, dtype=None, **kwargs): return cudf.utils.dtypes._get_nan_for_dtype(col_dtype) - with nogil: - c_result = move(cpp_reduce( - c_incol_view, - dereference(cython_agg.c_obj), - c_out_dtype - )) + result = pylibcudf.reduce.reduce( + incol.to_pylibcudf(mode="read"), + make_aggregation(reduction_op, kwargs).c_obj, + dtype_to_pylibcudf_type(col_dtype), + ) - if is_decimal_type_id(c_result.get()[0].type().id()): - scale = -c_result.get()[0].type().scale() + if is_decimal_type_id(result.type().id()): + scale = -result.type().scale() precision = _reduce_precision(col_dtype, reduction_op, len(incol)) - py_result = DeviceScalar.from_unique_ptr( - move(c_result), dtype=col_dtype.__class__(precision, scale) - ) - else: - py_result = DeviceScalar.from_unique_ptr(move(c_result)) - return py_result.value + return DeviceScalar.from_pylibcudf( + result, + dtype=col_dtype.__class__(precision, scale), + ).value + return DeviceScalar.from_pylibcudf(result).value @acquire_spill_lock() @@ -95,22 +73,14 @@ def scan(scan_op, Column incol, inclusive, **kwargs): inclusive: bool Flag for including nulls in relevant scan """ - cdef column_view c_incol_view = incol.view() - cdef unique_ptr[column] c_result - cdef ScanAggregation cython_agg = make_scan_aggregation(scan_op, kwargs) - - cdef scan_type c_inclusive = \ - scan_type.INCLUSIVE if inclusive else scan_type.EXCLUSIVE - - with nogil: - c_result = move(cpp_scan( - c_incol_view, - dereference(cython_agg.c_obj), - c_inclusive - )) - - py_result = Column.from_unique_ptr(move(c_result)) - return py_result + return Column.from_pylibcudf( + pylibcudf.reduce.scan( + incol.to_pylibcudf(mode="read"), + make_aggregation(scan_op, kwargs).c_obj, + pylibcudf.reduce.ScanType.INCLUSIVE if inclusive + else pylibcudf.reduce.ScanType.EXCLUSIVE, + ) + ) @acquire_spill_lock() @@ -127,18 +97,10 @@ def minmax(Column incol): ------- A pair of ``(min, max)`` values of ``incol`` """ - cdef column_view c_incol_view = incol.view() - cdef pair[unique_ptr[scalar], unique_ptr[scalar]] c_result - - with nogil: - c_result = move(cpp_minmax(c_incol_view)) - - py_result_min = 
DeviceScalar.from_unique_ptr(move(c_result.first)) - py_result_max = DeviceScalar.from_unique_ptr(move(c_result.second)) - + min, max = pylibcudf.reduce.minmax(incol.to_pylibcudf(mode="read")) return ( - cudf.Scalar.from_device_scalar(py_result_min), - cudf.Scalar.from_device_scalar(py_result_max) + cudf.Scalar.from_device_scalar(DeviceScalar.from_pylibcudf(min)), + cudf.Scalar.from_device_scalar(DeviceScalar.from_pylibcudf(max)), )
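
Usage note (editorial addition, not part of the patch): a minimal sketch of how the new pylibcudf.reduce module is driven, following the call pattern this diff introduces in cudf/_lib/reduce.pyx. The sample Series values and the INT64 output type are placeholders chosen for illustration only.

# Assumes a cudf build that includes this patch; column contents are arbitrary.
import cudf
from cudf._lib import pylibcudf

# A read-only pylibcudf Column view of an existing cudf column, mirroring the
# incol.to_pylibcudf(mode="read") calls in the patch.
col = cudf.Series([1, 2, 3, 4])._column.to_pylibcudf(mode="read")

# cudf::reduce: full reduction with a sum aggregation, returning a pylibcudf
# Scalar of the requested output type.
total = pylibcudf.reduce.reduce(
    col,
    pylibcudf.aggregation.sum(),
    pylibcudf.types.DataType(pylibcudf.types.TypeId.INT64),
)

# cudf::scan: inclusive running sum, returning a pylibcudf Column.
running = pylibcudf.reduce.scan(
    col,
    pylibcudf.aggregation.sum(),
    pylibcudf.reduce.ScanType.INCLUSIVE,
)

# cudf::minmax: both extrema in one pass, returned as a (Scalar, Scalar) pair.
lo, hi = pylibcudf.reduce.minmax(col)

Because reduce() and scan() validate the aggregation through view_underlying_as_reduce and view_underlying_as_scan, passing an aggregation that libcudf does not support for that algorithm raises a TypeError on the Python side rather than failing inside libcudf.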