Skip to content

Commit

Permalink
Add support for BaseIndexer in Rolling APIs (#9085)
Browse files Browse the repository at this point in the history
Fixes: #9085 

This PR adds support for `BaseIndexer` subclass support in `Rolling` APIs. This also contains a fix related to `fillna` - testcase added.

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Ashwin Srinath (https://github.com/shwina)

URL: #9085
  • Loading branch information
galipremsagar authored Aug 24, 2021
1 parent 8075199 commit 5647b53
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 21 deletions.
3 changes: 3 additions & 0 deletions python/cudf/cudf/core/column/numerical.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ def fillna(
else:
col = self

if col.null_count == 0:
return col

if method is not None:
return super(NumericalColumn, col).fillna(fill_value, method)

Expand Down
67 changes: 46 additions & 21 deletions python/cudf/cudf/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import numba
import pandas as pd
from pandas.api.indexers import BaseIndexer

import cudf
from cudf import _lib as libcudf
Expand All @@ -20,14 +21,16 @@ class Rolling(GetAttrGetItemMixin):
Parameters
----------
window : int or offset
window : int, offset or a BaseIndexer subclass
Size of the window, i.e., the number of observations used
to calculate the statistic.
For datetime indexes, an offset can be provided instead
of an int. The offset must be convertible to a timedelta.
As opposed to a fixed window size, each window will be
sized to accommodate observations within the time period
specified by the offset.
If a BaseIndexer subclass is passed, calculates the window
boundaries based on the defined ``get_window_bounds`` method.
min_periods : int, optional
The minimum number of observations in the window that are
required to be non-null, so that the result is non-null.
Expand Down Expand Up @@ -195,26 +198,46 @@ def __getitem__(self, arg):
)

def _apply_agg_series(self, sr, agg_name):
source_column = sr._column
min_periods = self.min_periods or 1
if isinstance(self.window, int):
result_col = libcudf.rolling.rolling(
sr._column,
None,
None,
self.window,
self.min_periods,
self.center,
agg_name,
preceding_window = None
following_window = None
window = self.window
elif isinstance(self.window, BaseIndexer):
start, end = self.window.get_window_bounds(
num_values=len(self.obj),
min_periods=self.min_periods,
center=self.center,
closed=None,
)
start = as_column(start, dtype="int32")
end = as_column(end, dtype="int32")

idx = cudf.core.column.arange(len(start))
preceding_window = (idx - start + cudf.Scalar(1, "int32")).astype(
"int32"
)
following_window = (end - idx - cudf.Scalar(1, "int32")).astype(
"int32"
)
window = None
else:
result_col = libcudf.rolling.rolling(
sr._column,
as_column(self.window),
column.full(self.window.size, 0, dtype=self.window.dtype),
None,
self.min_periods,
self.center,
agg_name,
preceding_window = as_column(self.window)
following_window = column.full(
self.window.size, 0, dtype=self.window.dtype
)
window = None

result_col = libcudf.rolling.rolling(
source_column=source_column,
pre_column_window=preceding_window,
fwd_column_window=following_window,
window=window,
min_periods=min_periods,
center=self.center,
op=agg_name,
)
return sr._from_data({sr.name: result_col}, sr._index)

def _apply_agg_dataframe(self, df, agg_name):
Expand Down Expand Up @@ -305,15 +328,17 @@ def _normalize(self):
if self.min_periods is None:
min_periods = window
else:
if isinstance(window, numba.cuda.devicearray.DeviceNDArray):
# window is a device_array of window sizes
if isinstance(
window, (numba.cuda.devicearray.DeviceNDArray, BaseIndexer)
):
# window is a device_array of window sizes or BaseIndexer
self.window = window
self.min_periods = min_periods
return

if not isinstance(self.obj.index, cudf.core.index.DatetimeIndex):
raise ValueError(
"window must be an integer for " "non datetime index"
"window must be an integer for non datetime index"
)

self._time_window = True
Expand All @@ -326,7 +351,7 @@ def _normalize(self):
window = window.to_timedelta64()
except ValueError as e:
raise ValueError(
"window must be integer or " "convertible to a timedelta"
"window must be integer or convertible to a timedelta"
) from e
if self.min_periods is None:
min_periods = 1
Expand Down
2 changes: 2 additions & 0 deletions python/cudf/cudf/tests/test_replace.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ def test_fillna_method_fixed_width_non_num(data, container, method, inplace):
pd.DataFrame(
{"a": [1, 2, None], "b": [None, None, 5]}, index=["a", "p", "z"]
),
pd.DataFrame({"a": [1, 2, 3]}),
],
)
@pytest.mark.parametrize(
Expand All @@ -671,6 +672,7 @@ def test_fillna_method_fixed_width_non_num(data, container, method, inplace):
{"b": pd.Series([11, 22, 33], index=["a", "p", "z"])},
{"a": 5, "b": pd.Series([3, 4, 5], index=["a", "p", "z"])},
{"c": 100},
np.nan,
],
)
@pytest.mark.parametrize("inplace", [True, False])
Expand Down
48 changes: 48 additions & 0 deletions python/cudf/cudf/tests/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,51 @@ def test_rolling_groupby_offset(agg, window_size):
)
got = getattr(gdf.groupby("group").rolling(window_size), agg)().fillna(-1)
assert_eq(expect, got, check_dtype=False)


def test_rolling_custom_index_support():
from pandas.api.indexers import BaseIndexer

class CustomIndexer(BaseIndexer):
def get_window_bounds(self, num_values, min_periods, center, closed):
start = np.empty(num_values, dtype=np.int64)
end = np.empty(num_values, dtype=np.int64)

for i in range(num_values):
if self.use_expanding[i]:
start[i] = 0
end[i] = i + 1
else:
start[i] = i
end[i] = i + self.window_size

return start, end

use_expanding = [True, False, True, False, True]
indexer = CustomIndexer(window_size=1, use_expanding=use_expanding)

df = pd.DataFrame({"values": range(5)})
gdf = cudf.from_pandas(df)

expected = df.rolling(window=indexer).sum()
actual = gdf.rolling(window=indexer).sum()

assert_eq(expected, actual, check_dtype=False)


@pytest.mark.parametrize(
"indexer",
[
pd.api.indexers.FixedForwardWindowIndexer(window_size=2),
pd.core.window.indexers.ExpandingIndexer(),
pd.core.window.indexers.FixedWindowIndexer(window_size=3),
],
)
def test_rolling_indexer_support(indexer):
df = pd.DataFrame({"B": [0, 1, 2, np.nan, 4]})
gdf = cudf.from_pandas(df)

expected = df.rolling(window=indexer, min_periods=2).sum()
actual = gdf.rolling(window=indexer, min_periods=2).sum()

assert_eq(expected, actual)

0 comments on commit 5647b53

Please sign in to comment.