Skip to content

Commit

Permalink
BUG: Fixes to FixedForwardWindowIndexer and GroupbyIndexer (pandas-de…
Browse files Browse the repository at this point in the history
  • Loading branch information
dsm054 committed Aug 29, 2021
1 parent beb7c48 commit 11bf344
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 27 deletions.
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v1.4.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ Groupby/resample/rolling
- Bug in :meth:`pandas.DataFrame.ewm`, where non-float64 dtypes were silently failing (:issue:`42452`)
- Bug in :meth:`pandas.DataFrame.rolling` operation along rows (``axis=1``) incorrectly omits columns containing ``float16`` and ``float32`` (:issue:`41779`)
- Bug in :meth:`Resampler.aggregate` did not allow the use of Named Aggregation (:issue:`32803`)
-
- Bug in :meth:`DataFrame.groupby.rolling` and :class:`FixedForwardWindowIndexer` leading to segfaults and incorrect windows (:issue:`43267`)

Reshaping
^^^^^^^^^
Expand Down
37 changes: 18 additions & 19 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def get_window_bounds(

end = np.clip(end, 0, num_values)
start = np.clip(start, 0, num_values)

return start, end


Expand Down Expand Up @@ -266,9 +265,8 @@ def get_window_bounds(
)

start = np.arange(num_values, dtype="int64")
end_s = start[: -self.window_size] + self.window_size
end_e = np.full(self.window_size, num_values, dtype="int64")
end = np.concatenate([end_s, end_e])
end = start + self.window_size
end[end > num_values] = num_values

return start, end

Expand All @@ -280,7 +278,7 @@ def __init__(
self,
index_array: np.ndarray | None = None,
window_size: int = 0,
groupby_indicies: dict | None = None,
groupby_indices: dict | None = None,
window_indexer: type[BaseIndexer] = BaseIndexer,
indexer_kwargs: dict | None = None,
**kwargs,
Expand All @@ -294,7 +292,7 @@ def __init__(
the groups
window_size : int
window size during the windowing operation
groupby_indicies : dict or None
groupby_indices : dict or None
dict of {group label: [positional index of rows belonging to the group]}
window_indexer : BaseIndexer
BaseIndexer class determining the start and end bounds of each group
Expand All @@ -303,11 +301,12 @@ def __init__(
**kwargs :
keyword arguments that will be available when get_window_bounds is called
"""
self.groupby_indicies = groupby_indicies or {}
self.groupby_indices = groupby_indices or {}
self.window_indexer = window_indexer
self.indexer_kwargs = indexer_kwargs or {}
self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
super().__init__(
index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs
index_array=index_array,
window_size=self.indexer_kwargs.pop("window_size", window_size), **kwargs
)

@Appender(get_window_bounds_doc)
Expand All @@ -323,8 +322,8 @@ def get_window_bounds(
# 3) Append the window bounds in group order
start_arrays = []
end_arrays = []
window_indicies_start = 0
for key, indices in self.groupby_indicies.items():
window_indices_start = 0
for key, indices in self.groupby_indices.items():
index_array: np.ndarray | None

if self.index_array is not None:
Expand All @@ -341,18 +340,18 @@ def get_window_bounds(
)
start = start.astype(np.int64)
end = end.astype(np.int64)
# Cannot use groupby_indicies as they might not be monotonic with the object
# Cannot use groupby_indices as they might not be monotonic with the object
# we're rolling over
window_indicies = np.arange(
window_indicies_start, window_indicies_start + len(indices)
window_indices = np.arange(
window_indices_start, window_indices_start + len(indices)
)
window_indicies_start += len(indices)
window_indices_start += len(indices)
# Extend as we'll be slicing window like [start, end)
window_indicies = np.append(
window_indicies, [window_indicies[-1] + 1]
window_indices = np.append(
window_indices, [window_indices[-1] + 1]
).astype(np.int64)
start_arrays.append(window_indicies.take(ensure_platform_int(start)))
end_arrays.append(window_indicies.take(ensure_platform_int(end)))
start_arrays.append(window_indices.take(ensure_platform_int(start)))
end_arrays.append(window_indices.take(ensure_platform_int(end)))
start = np.concatenate(start_arrays)
end = np.concatenate(end_arrays)
return start, end
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=ExponentialMovingWindowIndexer,
)
return window_indexer
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/expanding.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=ExpandingIndexer,
)
return window_indexer
24 changes: 21 additions & 3 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from pandas.core.indexers.objects import (
BaseIndexer,
FixedWindowIndexer,
FixedForwardWindowIndexer,
GroupbyIndexer,
VariableWindowIndexer,
)
Expand Down Expand Up @@ -299,6 +300,7 @@ def __iter__(self):
center=self.center,
closed=self.closed,
)

# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)

Expand Down Expand Up @@ -529,6 +531,7 @@ def _apply(
y : type of input
"""
window_indexer = self._get_window_indexer()

min_periods = (
self.min_periods
if self.min_periods is not None
Expand All @@ -542,12 +545,16 @@ def homogeneous_func(values: np.ndarray):
return values.copy()

def calc(x):

start, end = window_indexer.get_window_bounds(
num_values=len(x),
min_periods=min_periods,
center=self.center,
closed=self.closed,
)

# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)
return func(x, start, end, min_periods, *numba_args)

with np.errstate(all="ignore"):
Expand Down Expand Up @@ -1429,6 +1436,10 @@ def cov_func(x, y):
center=self.center,
closed=self.closed,
)

# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)

with np.errstate(all="ignore"):
mean_x_y = window_aggregations.roll_mean(
x_array * y_array, start, end, min_periods
Expand Down Expand Up @@ -1468,6 +1479,10 @@ def corr_func(x, y):
center=self.center,
closed=self.closed,
)

# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)

with np.errstate(all="ignore"):
mean_x_y = window_aggregations.roll_mean(
x_array * y_array, start, end, min_periods
Expand Down Expand Up @@ -2341,26 +2356,29 @@ def _get_window_indexer(self) -> GroupbyIndexer:
rolling_indexer: type[BaseIndexer]
indexer_kwargs: dict[str, Any] | None = None
index_array = self._index_array

if isinstance(self.window, BaseIndexer):
rolling_indexer = type(self.window)
indexer_kwargs = self.window.__dict__
indexer_kwargs = self.window.__dict__.copy()
assert isinstance(indexer_kwargs, dict) # for mypy
# We'll be using the index of each group later
indexer_kwargs.pop("index_array", None)
window = 0
window = self.window
elif self._win_freq_i8 is not None:
rolling_indexer = VariableWindowIndexer
window = self._win_freq_i8
else:
rolling_indexer = FixedWindowIndexer
window = self.window

window_indexer = GroupbyIndexer(
index_array=index_array,
window_size=window,
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=rolling_indexer,
indexer_kwargs=indexer_kwargs,
)

return window_indexer

def _validate_monotonic(self):
Expand Down
2 changes: 1 addition & 1 deletion pandas/tests/groupby/test_missing.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def test_min_count(func, min_count, value):
tm.assert_frame_equal(result, expected)


def test_indicies_with_missing():
def test_indices_with_missing():
# GH 9304
df = DataFrame({"a": [1, 1, np.nan], "b": [2, 3, 4], "c": [5, 6, 7]})
g = df.groupby(["a", "b"])
Expand Down
41 changes: 41 additions & 0 deletions pandas/tests/window/test_base_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from pandas.core.indexers.objects import (
ExpandingIndexer,
FixedWindowIndexer,
VariableOffsetWindowIndexer,
)

Expand Down Expand Up @@ -293,3 +294,43 @@ def get_window_bounds(self, num_values, min_periods, center, closed):
result = getattr(df.rolling(indexer), func)(*args)
expected = DataFrame({"values": values})
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize('indexer_class', [FixedWindowIndexer,
FixedForwardWindowIndexer,
ExpandingIndexer])
@pytest.mark.parametrize('window_size', [1, 2, 12])
@pytest.mark.parametrize("df",
[DataFrame({"a": [1, 1], "b": [0, 1]}),
DataFrame({"a": [1, 2], "b": [0, 1]}),
DataFrame({'b': [np.nan, 1, 2, np.nan] + list(range(4, 16))}).assign(a=1)
])
def test_indexers_are_reusable_after_groupby_rolling(indexer_class, window_size, df):
# GH 43267
num_trials = 3
indexer = indexer_class(window_size=window_size)
original_window_size = indexer.window_size
for i in range(num_trials):
df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean()
assert indexer.window_size == original_window_size


@pytest.mark.parametrize('window_size, num_values, expected_start, expected_end', [
(1, 1, [0], [1]),
(1, 2, [0, 1], [1, 2]),
(2, 1, [0], [1]),
(2, 2, [0, 1], [2, 2]),
(5, 12, range(12), list(range(5, 12)) + [12] * 5),
(12, 5, range(5), [5] * 5),
(0, 0, np.array([], dtype='int64'), np.array([], dtype='int64')),
(1, 0, np.array([], dtype='int64'), np.array([], dtype='int64')),
(0, 1, [0], [0]),
])
def test_fixed_forward_indexer_bounds(window_size, num_values, expected_start, expected_end):
# GH 43267
indexer = FixedForwardWindowIndexer(window_size=window_size)
start, end = indexer.get_window_bounds(num_values=num_values)

tm.assert_equal(start, np.array(expected_start))
tm.assert_equal(end, np.array(expected_end))
assert len(start) == len(end)
50 changes: 49 additions & 1 deletion pandas/tests/window/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
Series,
Timedelta,
Timestamp,
concat,
date_range,
period_range,
to_datetime,
to_timedelta,
)
import pandas._testing as tm
from pandas.api.indexers import BaseIndexer
from pandas.api.indexers import BaseIndexer, FixedForwardWindowIndexer
from pandas.core.window import Rolling


Expand Down Expand Up @@ -1500,3 +1501,50 @@ def test_rolling_numeric_dtypes():
dtype="float64",
)
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize('df, window_size, expected', [
(DataFrame({"b": [0, 1, 2], "a": [1, 2, 2]}),
2,
Series([0, 1.5, 2.0],
index=MultiIndex.from_arrays([[1, 2, 2], range(3)], names=['a', None]),
name="b", dtype=np.float64)),
(DataFrame({"b": [np.nan, 1, 2, np.nan] + list(range(4, 18)),
"a": [1] * 7 + [2] * 11,
"c": range(18)}),
12,
Series([3.6, 3.6, 4.25, 5., 5., 5.5, 6., 12., 12.5,
13., 13.5, 14., 14.5, 15., 15.5, 16., 16.5, 17.],
index=MultiIndex.from_arrays([[1] * 7 + [2] * 11, range(18)], names=["a", None]),
name="b",
dtype=np.float64))])
def test_rolling_groupby_with_fixed_forward_specific(df, window_size, expected):
# GH 43267
indexer = FixedForwardWindowIndexer(window_size=window_size)
result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean()
tm.assert_series_equal(result, expected)


@pytest.mark.parametrize('group_keys', [
(1,), (1, 2), (2, 1),
(1, 1, 2), (1, 2, 1), (1, 1, 2, 2), (1, 2, 3, 2, 3),
(1, 1, 2) * 4,
(1, 2, 3) * 5,
])
@pytest.mark.parametrize('window_size', [1, 2, 3, 4, 5, 8, 20])
def test_rolling_groupby_with_fixed_forward_many(group_keys, window_size):
# GH 43267
df = DataFrame(dict(a=np.array(list(group_keys)),
b=np.arange(len(group_keys), dtype=np.float64) + 17,
c=np.arange(len(group_keys), dtype=np.int64)))

indexer = FixedForwardWindowIndexer(window_size=window_size)
result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).sum()
result.index.names = ['a', 'c']

groups = df.groupby("a")[["a", "b"]]
manual = concat([g.assign(b=[g['b'].iloc[i:i+window_size].sum(min_count=1)
for i in range(len(g))]) for _, g in groups])
manual = manual.set_index(["a","c"])["b"]

tm.assert_series_equal(result, manual)

0 comments on commit 11bf344

Please sign in to comment.