Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Add support for BaseIndexer in Rolling APIs #9085

Merged
merged 9 commits into from
Aug 24, 2021
3 changes: 3 additions & 0 deletions python/cudf/cudf/core/column/numerical.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ def fillna(
else:
col = self

if col.null_count == 0:
return col

if method is not None:
return super(NumericalColumn, col).fillna(fill_value, method)

Expand Down
67 changes: 46 additions & 21 deletions python/cudf/cudf/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import numba
import pandas as pd
from pandas.api.indexers import BaseIndexer
galipremsagar marked this conversation as resolved.
Show resolved Hide resolved

import cudf
from cudf import _lib as libcudf
Expand All @@ -20,14 +21,16 @@ class Rolling(GetAttrGetItemMixin):

Parameters
----------
window : int or offset
window : int, offset or a BaseIndexer subclass
Size of the window, i.e., the number of observations used
to calculate the statistic.
For datetime indexes, an offset can be provided instead
of an int. The offset must be convertible to a timedelta.
As opposed to a fixed window size, each window will be
sized to accommodate observations within the time period
specified by the offset.
If a BaseIndexer subclass is passed, calculates the window
boundaries based on the defined ``get_window_bounds`` method.
min_periods : int, optional
The minimum number of observations in the window that are
required to be non-null, so that the result is non-null.
Expand Down Expand Up @@ -195,26 +198,46 @@ def __getitem__(self, arg):
)

def _apply_agg_series(self, sr, agg_name):
source_column = sr._column
min_periods = self.min_periods or 1
if isinstance(self.window, int):
result_col = libcudf.rolling.rolling(
sr._column,
None,
None,
self.window,
self.min_periods,
self.center,
agg_name,
preceding_window = None
following_window = None
window = self.window
elif isinstance(self.window, BaseIndexer):
start, end = self.window.get_window_bounds(
num_values=len(self.obj),
min_periods=self.min_periods,
center=self.center,
closed=None,
)
start = as_column(start, dtype="int32")
end = as_column(end, dtype="int32")
Comment on lines +214 to +215
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this does a HtoD copy of a potentially large array. Thus my comment https://github.com/rapidsai/cudf/pull/9085/files#r695093462


idx = cudf.core.column.arange(len(start))
preceding_window = (idx - start + cudf.Scalar(1, "int32")).astype(
"int32"
)
following_window = (end - idx - cudf.Scalar(1, "int32")).astype(
"int32"
)
window = None
else:
result_col = libcudf.rolling.rolling(
sr._column,
as_column(self.window),
column.full(self.window.size, 0, dtype=self.window.dtype),
None,
self.min_periods,
self.center,
agg_name,
preceding_window = as_column(self.window)
following_window = column.full(
self.window.size, 0, dtype=self.window.dtype
)
window = None

result_col = libcudf.rolling.rolling(
source_column=source_column,
pre_column_window=preceding_window,
fwd_column_window=following_window,
window=window,
min_periods=min_periods,
center=self.center,
op=agg_name,
)
return sr._from_data({sr.name: result_col}, sr._index)

def _apply_agg_dataframe(self, df, agg_name):
Expand Down Expand Up @@ -305,15 +328,17 @@ def _normalize(self):
if self.min_periods is None:
min_periods = window
else:
if isinstance(window, numba.cuda.devicearray.DeviceNDArray):
# window is a device_array of window sizes
if isinstance(
window, (numba.cuda.devicearray.DeviceNDArray, BaseIndexer)
):
# window is a device_array of window sizes or BaseIndexer
self.window = window
self.min_periods = min_periods
return

if not isinstance(self.obj.index, cudf.core.index.DatetimeIndex):
raise ValueError(
"window must be an integer for " "non datetime index"
"window must be an integer for non datetime index"
)

self._time_window = True
Expand All @@ -326,7 +351,7 @@ def _normalize(self):
window = window.to_timedelta64()
except ValueError as e:
raise ValueError(
"window must be integer or " "convertible to a timedelta"
"window must be integer or convertible to a timedelta"
) from e
if self.min_periods is None:
min_periods = 1
Expand Down
2 changes: 2 additions & 0 deletions python/cudf/cudf/tests/test_replace.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ def test_fillna_method_fixed_width_non_num(data, container, method, inplace):
pd.DataFrame(
{"a": [1, 2, None], "b": [None, None, 5]}, index=["a", "p", "z"]
),
pd.DataFrame({"a": [1, 2, 3]}),
],
)
@pytest.mark.parametrize(
Expand All @@ -671,6 +672,7 @@ def test_fillna_method_fixed_width_non_num(data, container, method, inplace):
{"b": pd.Series([11, 22, 33], index=["a", "p", "z"])},
{"a": 5, "b": pd.Series([3, 4, 5], index=["a", "p", "z"])},
{"c": 100},
np.nan,
],
)
@pytest.mark.parametrize("inplace", [True, False])
Expand Down
48 changes: 48 additions & 0 deletions python/cudf/cudf/tests/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,51 @@ def test_rolling_groupby_offset(agg, window_size):
)
got = getattr(gdf.groupby("group").rolling(window_size), agg)().fillna(-1)
assert_eq(expect, got, check_dtype=False)


def test_rolling_custom_index_support():
from pandas.api.indexers import BaseIndexer

class CustomIndexer(BaseIndexer):
def get_window_bounds(self, num_values, min_periods, center, closed):
start = np.empty(num_values, dtype=np.int64)
end = np.empty(num_values, dtype=np.int64)

for i in range(num_values):
if self.use_expanding[i]:
start[i] = 0
end[i] = i + 1
else:
start[i] = i
end[i] = i + self.window_size

return start, end

use_expanding = [True, False, True, False, True]
indexer = CustomIndexer(window_size=1, use_expanding=use_expanding)

df = pd.DataFrame({"values": range(5)})
gdf = cudf.from_pandas(df)

expected = df.rolling(window=indexer).sum()
actual = gdf.rolling(window=indexer).sum()

assert_eq(expected, actual, check_dtype=False)


@pytest.mark.parametrize(
"indexer",
[
pd.api.indexers.FixedForwardWindowIndexer(window_size=2),
pd.core.window.indexers.ExpandingIndexer(),
pd.core.window.indexers.FixedWindowIndexer(window_size=3),
],
)
def test_rolling_indexer_support(indexer):
df = pd.DataFrame({"B": [0, 1, 2, np.nan, 4]})
gdf = cudf.from_pandas(df)

expected = df.rolling(window=indexer, min_periods=2).sum()
actual = gdf.rolling(window=indexer, min_periods=2).sum()

assert_eq(expected, actual)