Skip to content

Commit

Permalink
BUG: Fixes to FixedForwardWindowIndexer and GroupbyIndexer (pandas-de…
Browse files Browse the repository at this point in the history
  • Loading branch information
dsm054 committed Aug 30, 2021
1 parent beb7c48 commit 2755500
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 28 deletions.
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v1.4.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ Groupby/resample/rolling
- Bug in :meth:`pandas.DataFrame.ewm`, where non-float64 dtypes were silently failing (:issue:`42452`)
- Bug in :meth:`pandas.DataFrame.rolling` operation along rows (``axis=1``) incorrectly omits columns containing ``float16`` and ``float32`` (:issue:`41779`)
- Bug in :meth:`Resampler.aggregate` did not allow the use of Named Aggregation (:issue:`32803`)
-
- Bug in :class:`GroupbyIndexer` and :class:`FixedForwardWindowIndexer` leading to segfaults and incorrect windows (:issue:`43267`)

Reshaping
^^^^^^^^^
Expand Down
40 changes: 20 additions & 20 deletions pandas/core/indexers/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def get_window_bounds(

end = np.clip(end, 0, num_values)
start = np.clip(start, 0, num_values)

return start, end


Expand Down Expand Up @@ -266,9 +265,8 @@ def get_window_bounds(
)

start = np.arange(num_values, dtype="int64")
end_s = start[: -self.window_size] + self.window_size
end_e = np.full(self.window_size, num_values, dtype="int64")
end = np.concatenate([end_s, end_e])
end = start + self.window_size
end[end > num_values] = num_values

return start, end

Expand All @@ -280,7 +278,7 @@ def __init__(
self,
index_array: np.ndarray | None = None,
window_size: int = 0,
groupby_indicies: dict | None = None,
groupby_indices: dict | None = None,
window_indexer: type[BaseIndexer] = BaseIndexer,
indexer_kwargs: dict | None = None,
**kwargs,
Expand All @@ -294,7 +292,7 @@ def __init__(
the groups
window_size : int
window size during the windowing operation
groupby_indicies : dict or None
groupby_indices : dict or None
dict of {group label: [positional index of rows belonging to the group]}
window_indexer : BaseIndexer
BaseIndexer class determining the start and end bounds of each group
Expand All @@ -303,11 +301,13 @@ def __init__(
**kwargs :
keyword arguments that will be available when get_window_bounds is called
"""
self.groupby_indicies = groupby_indicies or {}
self.groupby_indices = groupby_indices or {}
self.window_indexer = window_indexer
self.indexer_kwargs = indexer_kwargs or {}
self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
super().__init__(
index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs
index_array=index_array,
window_size=self.indexer_kwargs.pop("window_size", window_size),
**kwargs,
)

@Appender(get_window_bounds_doc)
Expand All @@ -323,8 +323,8 @@ def get_window_bounds(
# 3) Append the window bounds in group order
start_arrays = []
end_arrays = []
window_indicies_start = 0
for key, indices in self.groupby_indicies.items():
window_indices_start = 0
for key, indices in self.groupby_indices.items():
index_array: np.ndarray | None

if self.index_array is not None:
Expand All @@ -341,18 +341,18 @@ def get_window_bounds(
)
start = start.astype(np.int64)
end = end.astype(np.int64)
# Cannot use groupby_indicies as they might not be monotonic with the object
# Cannot use groupby_indices as they might not be monotonic with the object
# we're rolling over
window_indicies = np.arange(
window_indicies_start, window_indicies_start + len(indices)
window_indices = np.arange(
window_indices_start, window_indices_start + len(indices)
)
window_indicies_start += len(indices)
window_indices_start += len(indices)
# Extend as we'll be slicing window like [start, end)
window_indicies = np.append(
window_indicies, [window_indicies[-1] + 1]
).astype(np.int64)
start_arrays.append(window_indicies.take(ensure_platform_int(start)))
end_arrays.append(window_indicies.take(ensure_platform_int(end)))
window_indices = np.append(window_indices, [window_indices[-1] + 1]).astype(
np.int64
)
start_arrays.append(window_indices.take(ensure_platform_int(start)))
end_arrays.append(window_indices.take(ensure_platform_int(end)))
start = np.concatenate(start_arrays)
end = np.concatenate(end_arrays)
return start, end
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=ExponentialMovingWindowIndexer,
)
return window_indexer
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/expanding.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=ExpandingIndexer,
)
return window_indexer
23 changes: 20 additions & 3 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ def __iter__(self):
center=self.center,
closed=self.closed,
)

# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)

Expand Down Expand Up @@ -529,6 +530,7 @@ def _apply(
y : type of input
"""
window_indexer = self._get_window_indexer()

min_periods = (
self.min_periods
if self.min_periods is not None
Expand All @@ -542,12 +544,16 @@ def homogeneous_func(values: np.ndarray):
return values.copy()

def calc(x):

start, end = window_indexer.get_window_bounds(
num_values=len(x),
min_periods=min_periods,
center=self.center,
closed=self.closed,
)

# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)
return func(x, start, end, min_periods, *numba_args)

with np.errstate(all="ignore"):
Expand Down Expand Up @@ -1429,6 +1435,10 @@ def cov_func(x, y):
center=self.center,
closed=self.closed,
)

# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)

with np.errstate(all="ignore"):
mean_x_y = window_aggregations.roll_mean(
x_array * y_array, start, end, min_periods
Expand Down Expand Up @@ -1468,6 +1478,10 @@ def corr_func(x, y):
center=self.center,
closed=self.closed,
)

# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)

with np.errstate(all="ignore"):
mean_x_y = window_aggregations.roll_mean(
x_array * y_array, start, end, min_periods
Expand Down Expand Up @@ -2341,26 +2355,29 @@ def _get_window_indexer(self) -> GroupbyIndexer:
rolling_indexer: type[BaseIndexer]
indexer_kwargs: dict[str, Any] | None = None
index_array = self._index_array

if isinstance(self.window, BaseIndexer):
rolling_indexer = type(self.window)
indexer_kwargs = self.window.__dict__
indexer_kwargs = self.window.__dict__.copy()
assert isinstance(indexer_kwargs, dict) # for mypy
# We'll be using the index of each group later
indexer_kwargs.pop("index_array", None)
window = 0
window = self.window
elif self._win_freq_i8 is not None:
rolling_indexer = VariableWindowIndexer
window = self._win_freq_i8
else:
rolling_indexer = FixedWindowIndexer
window = self.window

window_indexer = GroupbyIndexer(
index_array=index_array,
window_size=window,
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=rolling_indexer,
indexer_kwargs=indexer_kwargs,
)

return window_indexer

def _validate_monotonic(self):
Expand Down
2 changes: 1 addition & 1 deletion pandas/tests/groupby/test_missing.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def test_min_count(func, min_count, value):
tm.assert_frame_equal(result, expected)


def test_indicies_with_missing():
def test_indices_with_missing():
# GH 9304
df = DataFrame({"a": [1, 1, np.nan], "b": [2, 3, 4], "c": [5, 6, 7]})
g = df.groupby(["a", "b"])
Expand Down
49 changes: 49 additions & 0 deletions pandas/tests/window/test_base_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from pandas.core.indexers.objects import (
ExpandingIndexer,
FixedWindowIndexer,
VariableOffsetWindowIndexer,
)

Expand Down Expand Up @@ -293,3 +294,51 @@ def get_window_bounds(self, num_values, min_periods, center, closed):
result = getattr(df.rolling(indexer), func)(*args)
expected = DataFrame({"values": values})
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize(
"indexer_class", [FixedWindowIndexer, FixedForwardWindowIndexer, ExpandingIndexer]
)
@pytest.mark.parametrize("window_size", [1, 2, 12])
@pytest.mark.parametrize(
"df",
[
DataFrame({"a": [1, 1], "b": [0, 1]}),
DataFrame({"a": [1, 2], "b": [0, 1]}),
DataFrame({"b": [np.nan, 1, 2, np.nan] + list(range(4, 16))}).assign(a=1),
],
)
def test_indexers_are_reusable_after_groupby_rolling(indexer_class, window_size, df):
# GH 43267
num_trials = 3
indexer = indexer_class(window_size=window_size)
original_window_size = indexer.window_size
for i in range(num_trials):
df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean()
assert indexer.window_size == original_window_size


@pytest.mark.parametrize(
"window_size, num_values, expected_start, expected_end",
[
(1, 1, [0], [1]),
(1, 2, [0, 1], [1, 2]),
(2, 1, [0], [1]),
(2, 2, [0, 1], [2, 2]),
(5, 12, range(12), list(range(5, 12)) + [12] * 5),
(12, 5, range(5), [5] * 5),
(0, 0, np.array([], dtype="int64"), np.array([], dtype="int64")),
(1, 0, np.array([], dtype="int64"), np.array([], dtype="int64")),
(0, 1, [0], [0]),
],
)
def test_fixed_forward_indexer_bounds(
window_size, num_values, expected_start, expected_end
):
# GH 43267
indexer = FixedForwardWindowIndexer(window_size=window_size)
start, end = indexer.get_window_bounds(num_values=num_values)

tm.assert_equal(start, np.array(expected_start, dtype='int64'))
tm.assert_equal(end, np.array(expected_end, dtype='int64'))
assert len(start) == len(end)
Loading

0 comments on commit 2755500

Please sign in to comment.