diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index a0457120935..8d3af5205fa 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -31,7 +31,6 @@ set(cython_sources reduce.pyx replace.pyx reshape.pyx - rolling.pyx round.pyx scalar.pyx sort.pyx diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 942e32747bc..1e0bf931c97 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -19,7 +19,6 @@ reduce, replace, reshape, - rolling, round, sort, stream_compaction, diff --git a/python/cudf/cudf/_lib/rolling.pyx b/python/cudf/cudf/_lib/rolling.pyx deleted file mode 100644 index 687b261c2c7..00000000000 --- a/python/cudf/cudf/_lib/rolling.pyx +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. - -from cudf.core.buffer import acquire_spill_lock - -from cudf._lib.column cimport Column - -import pylibcudf - -from cudf._lib.aggregation import make_aggregation - - -@acquire_spill_lock() -def rolling(Column source_column, - Column pre_column_window, - Column fwd_column_window, - window, - min_periods, - center, - op, - agg_params): - """ - Rolling on input executing operation within the given window for each row - - Parameters - ---------- - source_column : input column on which rolling operation is executed - pre_column_window : prior window for each element of source_column - fwd_column_window : forward window for each element of source_column - window : Size of the moving window, can be integer or None - min_periods : Minimum number of observations in window required to have - a value (otherwise result is null) - center : Set the labels at the center of the window - op : operation to be executed - agg_params : dict, parameter for the aggregation (e.g. ddof for VAR/STD) - - Returns - ------- - A Column with rolling calculations - """ - - if window is None: - if center: - # TODO: we can support this even though Pandas currently does not - raise NotImplementedError( - "center is not implemented for offset-based windows" - ) - pre = pre_column_window.to_pylibcudf(mode="read") - fwd = fwd_column_window.to_pylibcudf(mode="read") - else: - if center: - pre = (window // 2) + 1 - fwd = window - (pre) - else: - pre = window - fwd = 0 - - return Column.from_pylibcudf( - pylibcudf.rolling.rolling_window( - source_column.to_pylibcudf(mode="read"), - pre, - fwd, - min_periods, - make_aggregation( - op, {'dtype': source_column.dtype} if callable(op) else agg_params - ).c_obj, - ) - ) diff --git a/python/cudf/cudf/core/window/rolling.py b/python/cudf/cudf/core/window/rolling.py index 967edc2ab15..d2cb5e8c190 100644 --- a/python/cudf/cudf/core/window/rolling.py +++ b/python/cudf/cudf/core/window/rolling.py @@ -8,8 +8,11 @@ import pandas as pd from pandas.api.indexers import BaseIndexer +import pylibcudf as plc + import cudf from cudf import _lib as libcudf +from cudf._lib.aggregation import make_aggregation from cudf.api.types import is_integer, is_number from cudf.core.buffer import acquire_spill_lock from cudf.core.column.column import as_column @@ -284,16 +287,37 @@ def _apply_agg_column(self, source_column, agg_name): ) window = None - return libcudf.rolling.rolling( - source_column=source_column, - pre_column_window=preceding_window, - fwd_column_window=following_window, - window=window, - min_periods=min_periods, - center=self.center, - op=agg_name, - agg_params=self.agg_params, - ) + with acquire_spill_lock(): + if window is None: + if self.center: + # TODO: we can support this even though Pandas currently does not + raise NotImplementedError( + "center is not implemented for offset-based windows" + ) + pre = preceding_window.to_pylibcudf(mode="read") + fwd = following_window.to_pylibcudf(mode="read") + else: + if self.center: + pre = (window // 2) + 1 + fwd = window - (pre) + else: + pre = window + fwd = 0 + + return libcudf.column.Column.from_pylibcudf( + plc.rolling.rolling_window( + source_column.to_pylibcudf(mode="read"), + pre, + fwd, + min_periods, + make_aggregation( + agg_name, + {"dtype": source_column.dtype} + if callable(agg_name) + else self.agg_params, + ).c_obj, + ) + ) def _reduce( self,