From 5647b535b2a1546c56ddfd12e7cbd2fb198e64e8 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Tue, 24 Aug 2021 17:52:20 -0500 Subject: [PATCH] Add support for BaseIndexer in Rolling APIs (#9085) Fixes: #9085 This PR adds support for `BaseIndexer` subclass support in `Rolling` APIs. This also contains a fix related to `fillna` - testcase added. Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Ashwin Srinath (https://github.com/shwina) URL: https://github.com/rapidsai/cudf/pull/9085 --- python/cudf/cudf/core/column/numerical.py | 3 + python/cudf/cudf/core/window/rolling.py | 67 ++++++++++++++++------- python/cudf/cudf/tests/test_replace.py | 2 + python/cudf/cudf/tests/test_rolling.py | 48 ++++++++++++++++ 4 files changed, 99 insertions(+), 21 deletions(-) diff --git a/python/cudf/cudf/core/column/numerical.py b/python/cudf/cudf/core/column/numerical.py index bc12b42a3fa..db1829d5f38 100644 --- a/python/cudf/cudf/core/column/numerical.py +++ b/python/cudf/cudf/core/column/numerical.py @@ -375,6 +375,9 @@ def fillna( else: col = self + if col.null_count == 0: + return col + if method is not None: return super(NumericalColumn, col).fillna(fill_value, method) diff --git a/python/cudf/cudf/core/window/rolling.py b/python/cudf/cudf/core/window/rolling.py index e3ed15ba2a6..317ce29d00e 100644 --- a/python/cudf/cudf/core/window/rolling.py +++ b/python/cudf/cudf/core/window/rolling.py @@ -4,6 +4,7 @@ import numba import pandas as pd +from pandas.api.indexers import BaseIndexer import cudf from cudf import _lib as libcudf @@ -20,7 +21,7 @@ class Rolling(GetAttrGetItemMixin): Parameters ---------- - window : int or offset + window : int, offset or a BaseIndexer subclass Size of the window, i.e., the number of observations used to calculate the statistic. For datetime indexes, an offset can be provided instead @@ -28,6 +29,8 @@ class Rolling(GetAttrGetItemMixin): As opposed to a fixed window size, each window will be sized to accommodate observations within the time period specified by the offset. + If a BaseIndexer subclass is passed, calculates the window + boundaries based on the defined ``get_window_bounds`` method. min_periods : int, optional The minimum number of observations in the window that are required to be non-null, so that the result is non-null. @@ -195,26 +198,46 @@ def __getitem__(self, arg): ) def _apply_agg_series(self, sr, agg_name): + source_column = sr._column + min_periods = self.min_periods or 1 if isinstance(self.window, int): - result_col = libcudf.rolling.rolling( - sr._column, - None, - None, - self.window, - self.min_periods, - self.center, - agg_name, + preceding_window = None + following_window = None + window = self.window + elif isinstance(self.window, BaseIndexer): + start, end = self.window.get_window_bounds( + num_values=len(self.obj), + min_periods=self.min_periods, + center=self.center, + closed=None, ) + start = as_column(start, dtype="int32") + end = as_column(end, dtype="int32") + + idx = cudf.core.column.arange(len(start)) + preceding_window = (idx - start + cudf.Scalar(1, "int32")).astype( + "int32" + ) + following_window = (end - idx - cudf.Scalar(1, "int32")).astype( + "int32" + ) + window = None else: - result_col = libcudf.rolling.rolling( - sr._column, - as_column(self.window), - column.full(self.window.size, 0, dtype=self.window.dtype), - None, - self.min_periods, - self.center, - agg_name, + preceding_window = as_column(self.window) + following_window = column.full( + self.window.size, 0, dtype=self.window.dtype ) + window = None + + result_col = libcudf.rolling.rolling( + source_column=source_column, + pre_column_window=preceding_window, + fwd_column_window=following_window, + window=window, + min_periods=min_periods, + center=self.center, + op=agg_name, + ) return sr._from_data({sr.name: result_col}, sr._index) def _apply_agg_dataframe(self, df, agg_name): @@ -305,15 +328,17 @@ def _normalize(self): if self.min_periods is None: min_periods = window else: - if isinstance(window, numba.cuda.devicearray.DeviceNDArray): - # window is a device_array of window sizes + if isinstance( + window, (numba.cuda.devicearray.DeviceNDArray, BaseIndexer) + ): + # window is a device_array of window sizes or BaseIndexer self.window = window self.min_periods = min_periods return if not isinstance(self.obj.index, cudf.core.index.DatetimeIndex): raise ValueError( - "window must be an integer for " "non datetime index" + "window must be an integer for non datetime index" ) self._time_window = True @@ -326,7 +351,7 @@ def _normalize(self): window = window.to_timedelta64() except ValueError as e: raise ValueError( - "window must be integer or " "convertible to a timedelta" + "window must be integer or convertible to a timedelta" ) from e if self.min_periods is None: min_periods = 1 diff --git a/python/cudf/cudf/tests/test_replace.py b/python/cudf/cudf/tests/test_replace.py index 33bef2c677b..f60baec746f 100644 --- a/python/cudf/cudf/tests/test_replace.py +++ b/python/cudf/cudf/tests/test_replace.py @@ -657,6 +657,7 @@ def test_fillna_method_fixed_width_non_num(data, container, method, inplace): pd.DataFrame( {"a": [1, 2, None], "b": [None, None, 5]}, index=["a", "p", "z"] ), + pd.DataFrame({"a": [1, 2, 3]}), ], ) @pytest.mark.parametrize( @@ -671,6 +672,7 @@ def test_fillna_method_fixed_width_non_num(data, container, method, inplace): {"b": pd.Series([11, 22, 33], index=["a", "p", "z"])}, {"a": 5, "b": pd.Series([3, 4, 5], index=["a", "p", "z"])}, {"c": 100}, + np.nan, ], ) @pytest.mark.parametrize("inplace", [True, False]) diff --git a/python/cudf/cudf/tests/test_rolling.py b/python/cudf/cudf/tests/test_rolling.py index 07e7f43c992..8a8293cd090 100644 --- a/python/cudf/cudf/tests/test_rolling.py +++ b/python/cudf/cudf/tests/test_rolling.py @@ -369,3 +369,51 @@ def test_rolling_groupby_offset(agg, window_size): ) got = getattr(gdf.groupby("group").rolling(window_size), agg)().fillna(-1) assert_eq(expect, got, check_dtype=False) + + +def test_rolling_custom_index_support(): + from pandas.api.indexers import BaseIndexer + + class CustomIndexer(BaseIndexer): + def get_window_bounds(self, num_values, min_periods, center, closed): + start = np.empty(num_values, dtype=np.int64) + end = np.empty(num_values, dtype=np.int64) + + for i in range(num_values): + if self.use_expanding[i]: + start[i] = 0 + end[i] = i + 1 + else: + start[i] = i + end[i] = i + self.window_size + + return start, end + + use_expanding = [True, False, True, False, True] + indexer = CustomIndexer(window_size=1, use_expanding=use_expanding) + + df = pd.DataFrame({"values": range(5)}) + gdf = cudf.from_pandas(df) + + expected = df.rolling(window=indexer).sum() + actual = gdf.rolling(window=indexer).sum() + + assert_eq(expected, actual, check_dtype=False) + + +@pytest.mark.parametrize( + "indexer", + [ + pd.api.indexers.FixedForwardWindowIndexer(window_size=2), + pd.core.window.indexers.ExpandingIndexer(), + pd.core.window.indexers.FixedWindowIndexer(window_size=3), + ], +) +def test_rolling_indexer_support(indexer): + df = pd.DataFrame({"B": [0, 1, 2, np.nan, 4]}) + gdf = cudf.from_pandas(df) + + expected = df.rolling(window=indexer, min_periods=2).sum() + actual = gdf.rolling(window=indexer, min_periods=2).sum() + + assert_eq(expected, actual)