From 11bc77d684748fe7bcc80a4b7960a6dbc389b6a0 Mon Sep 17 00:00:00 2001 From: DSM Date: Sat, 16 Oct 2021 16:26:51 -0400 Subject: [PATCH] Backport PR #43291: BUG: Fixes to FixedForwardWindowIndexer and GroupbyIndexer (#43267) --- doc/source/whatsnew/v1.3.4.rst | 1 + pandas/core/window/ewm.py | 2 +- pandas/core/window/expanding.py | 2 +- pandas/core/window/indexers.py | 47 ++++--- pandas/core/window/rolling.py | 26 +++- pandas/tests/groupby/test_missing.py | 2 +- pandas/tests/window/test_base_indexer.py | 159 +++++++++++++++++++++++ 7 files changed, 210 insertions(+), 29 deletions(-) diff --git a/doc/source/whatsnew/v1.3.4.rst b/doc/source/whatsnew/v1.3.4.rst index b6ee2d57a0965..f36c0afac763e 100644 --- a/doc/source/whatsnew/v1.3.4.rst +++ b/doc/source/whatsnew/v1.3.4.rst @@ -33,6 +33,7 @@ Fixed regressions Bug fixes ~~~~~~~~~ +- Fixed bug in :meth:`pandas.DataFrame.groupby.rolling` and :class:`pandas.api.indexers.FixedForwardWindowIndexer` leading to segfaults and window endpoints being mixed across groups (:issue:`43267`) - Fixed bug in :meth:`.GroupBy.mean` with datetimelike values including ``NaT`` values returning incorrect results (:issue:`43132`) - Fixed bug in :meth:`Series.aggregate` not passing the first ``args`` to the user supplied ``func`` in certain cases (:issue:`43357`) - Fixed memory leaks in :meth:`Series.rolling.quantile` and :meth:`Series.rolling.median` (:issue:`43339`) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 04c81ae756855..40522111cbf41 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -699,7 +699,7 @@ def _get_window_indexer(self) -> GroupbyIndexer: GroupbyIndexer """ window_indexer = GroupbyIndexer( - groupby_indicies=self._grouper.indices, + groupby_indices=self._grouper.indices, window_indexer=ExponentialMovingWindowIndexer, ) return window_indexer diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index 02cf31cad7b8d..c03f4653e5b1c 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -680,7 +680,7 @@ def _get_window_indexer(self) -> GroupbyIndexer: GroupbyIndexer """ window_indexer = GroupbyIndexer( - groupby_indicies=self._grouper.indices, + groupby_indices=self._grouper.indices, window_indexer=ExpandingIndexer, ) return window_indexer diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index cef023a647d7f..c4156f214ca68 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -266,9 +266,9 @@ def get_window_bounds( ) start = np.arange(num_values, dtype="int64") - end_s = start[: -self.window_size] + self.window_size - end_e = np.full(self.window_size, num_values, dtype="int64") - end = np.concatenate([end_s, end_e]) + end = start + self.window_size + if self.window_size: + end[-self.window_size :] = num_values return start, end @@ -279,8 +279,8 @@ class GroupbyIndexer(BaseIndexer): def __init__( self, index_array: np.ndarray | None = None, - window_size: int = 0, - groupby_indicies: dict | None = None, + window_size: int | BaseIndexer = 0, + groupby_indices: dict | None = None, window_indexer: type[BaseIndexer] = BaseIndexer, indexer_kwargs: dict | None = None, **kwargs, @@ -292,9 +292,9 @@ def __init__( np.ndarray of the index of the original object that we are performing a chained groupby operation over. This index has been pre-sorted relative to the groups - window_size : int + window_size : int or BaseIndexer window size during the windowing operation - groupby_indicies : dict or None + groupby_indices : dict or None dict of {group label: [positional index of rows belonging to the group]} window_indexer : BaseIndexer BaseIndexer class determining the start and end bounds of each group @@ -303,11 +303,13 @@ def __init__( **kwargs : keyword arguments that will be available when get_window_bounds is called """ - self.groupby_indicies = groupby_indicies or {} + self.groupby_indices = groupby_indices or {} self.window_indexer = window_indexer - self.indexer_kwargs = indexer_kwargs or {} + self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {} super().__init__( - index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs + index_array=index_array, + window_size=self.indexer_kwargs.pop("window_size", window_size), + **kwargs, ) @Appender(get_window_bounds_doc) @@ -323,8 +325,8 @@ def get_window_bounds( # 3) Append the window bounds in group order start_arrays = [] end_arrays = [] - window_indicies_start = 0 - for key, indices in self.groupby_indicies.items(): + window_indices_start = 0 + for key, indices in self.groupby_indices.items(): index_array: np.ndarray | None if self.index_array is not None: @@ -341,18 +343,21 @@ def get_window_bounds( ) start = start.astype(np.int64) end = end.astype(np.int64) - # Cannot use groupby_indicies as they might not be monotonic with the object + assert len(start) == len( + end + ), "these should be equal in length from get_window_bounds" + # Cannot use groupby_indices as they might not be monotonic with the object # we're rolling over - window_indicies = np.arange( - window_indicies_start, window_indicies_start + len(indices) + window_indices = np.arange( + window_indices_start, window_indices_start + len(indices) ) - window_indicies_start += len(indices) + window_indices_start += len(indices) # Extend as we'll be slicing window like [start, end) - window_indicies = np.append( - window_indicies, [window_indicies[-1] + 1] - ).astype(np.int64) - start_arrays.append(window_indicies.take(ensure_platform_int(start))) - end_arrays.append(window_indicies.take(ensure_platform_int(end))) + window_indices = np.append(window_indices, [window_indices[-1] + 1]).astype( + np.int64, copy=False + ) + start_arrays.append(window_indices.take(ensure_platform_int(start))) + end_arrays.append(window_indices.take(ensure_platform_int(end))) start = np.concatenate(start_arrays) end = np.concatenate(end_arrays) return start, end diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 1c714db78fa46..14246ec30ffa7 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -300,8 +300,10 @@ def __iter__(self): center=self.center, closed=self.closed, ) - # From get_window_bounds, those two should be equal in length of array - assert len(start) == len(end) + + assert len(start) == len( + end + ), "these should be equal in length from get_window_bounds" for s, e in zip(start, end): result = obj.iloc[slice(s, e)] @@ -522,6 +524,10 @@ def calc(x): center=self.center, closed=self.closed, ) + assert len(start) == len( + end + ), "these should be equal in length from get_window_bounds" + return func(x, start, end, min_periods) with np.errstate(all="ignore"): @@ -1402,6 +1408,11 @@ def cov_func(x, y): center=self.center, closed=self.closed, ) + + assert len(start) == len( + end + ), "these should be equal in length from get_window_bounds" + with np.errstate(all="ignore"): mean_x_y = window_aggregations.roll_mean( x_array * y_array, start, end, min_periods @@ -1441,6 +1452,11 @@ def corr_func(x, y): center=self.center, closed=self.closed, ) + + assert len(start) == len( + end + ), "these should be equal in length from get_window_bounds" + with np.errstate(all="ignore"): mean_x_y = window_aggregations.roll_mean( x_array * y_array, start, end, min_periods @@ -2316,11 +2332,11 @@ def _get_window_indexer(self) -> GroupbyIndexer: index_array = self._index_array if isinstance(self.window, BaseIndexer): rolling_indexer = type(self.window) - indexer_kwargs = self.window.__dict__ + indexer_kwargs = self.window.__dict__.copy() assert isinstance(indexer_kwargs, dict) # for mypy # We'll be using the index of each group later indexer_kwargs.pop("index_array", None) - window = 0 + window = self.window elif self._win_freq_i8 is not None: rolling_indexer = VariableWindowIndexer window = self._win_freq_i8 @@ -2330,7 +2346,7 @@ def _get_window_indexer(self) -> GroupbyIndexer: window_indexer = GroupbyIndexer( index_array=index_array, window_size=window, - groupby_indicies=self._grouper.indices, + groupby_indices=self._grouper.indices, window_indexer=rolling_indexer, indexer_kwargs=indexer_kwargs, ) diff --git a/pandas/tests/groupby/test_missing.py b/pandas/tests/groupby/test_missing.py index f3149abb52291..2391ef620aea6 100644 --- a/pandas/tests/groupby/test_missing.py +++ b/pandas/tests/groupby/test_missing.py @@ -144,7 +144,7 @@ def test_min_count(func, min_count, value): tm.assert_frame_equal(result, expected) -def test_indicies_with_missing(): +def test_indices_with_missing(): # GH 9304 df = DataFrame({"a": [1, 1, np.nan], "b": [2, 3, 4], "c": [5, 6, 7]}) g = df.groupby(["a", "b"]) diff --git a/pandas/tests/window/test_base_indexer.py b/pandas/tests/window/test_base_indexer.py index 06867e80ee711..5a3bfbb5741c3 100644 --- a/pandas/tests/window/test_base_indexer.py +++ b/pandas/tests/window/test_base_indexer.py @@ -3,7 +3,9 @@ from pandas import ( DataFrame, + MultiIndex, Series, + concat, date_range, ) import pandas._testing as tm @@ -13,6 +15,7 @@ ) from pandas.core.window.indexers import ( ExpandingIndexer, + FixedWindowIndexer, VariableOffsetWindowIndexer, ) @@ -293,3 +296,159 @@ def get_window_bounds(self, num_values, min_periods, center, closed): result = getattr(df.rolling(indexer), func)(*args) expected = DataFrame({"values": values}) tm.assert_frame_equal(result, expected) + + +@pytest.mark.parametrize( + "indexer_class", [FixedWindowIndexer, FixedForwardWindowIndexer, ExpandingIndexer] +) +@pytest.mark.parametrize("window_size", [1, 2, 12]) +@pytest.mark.parametrize( + "df_data", + [ + {"a": [1, 1], "b": [0, 1]}, + {"a": [1, 2], "b": [0, 1]}, + {"a": [1] * 16, "b": [np.nan, 1, 2, np.nan] + list(range(4, 16))}, + ], +) +def test_indexers_are_reusable_after_groupby_rolling( + indexer_class, window_size, df_data +): + # GH 43267 + df = DataFrame(df_data) + num_trials = 3 + indexer = indexer_class(window_size=window_size) + original_window_size = indexer.window_size + for i in range(num_trials): + df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean() + assert indexer.window_size == original_window_size + + +@pytest.mark.parametrize( + "window_size, num_values, expected_start, expected_end", + [ + (1, 1, [0], [1]), + (1, 2, [0, 1], [1, 2]), + (2, 1, [0], [1]), + (2, 2, [0, 1], [2, 2]), + (5, 12, range(12), list(range(5, 12)) + [12] * 5), + (12, 5, range(5), [5] * 5), + (0, 0, np.array([]), np.array([])), + (1, 0, np.array([]), np.array([])), + (0, 1, [0], [0]), + ], +) +def test_fixed_forward_indexer_bounds( + window_size, num_values, expected_start, expected_end +): + # GH 43267 + indexer = FixedForwardWindowIndexer(window_size=window_size) + start, end = indexer.get_window_bounds(num_values=num_values) + + tm.assert_numpy_array_equal(start, np.array(expected_start), check_dtype=False) + tm.assert_numpy_array_equal(end, np.array(expected_end), check_dtype=False) + assert len(start) == len(end) + + +@pytest.mark.parametrize( + "df, window_size, expected", + [ + ( + DataFrame({"b": [0, 1, 2], "a": [1, 2, 2]}), + 2, + Series( + [0, 1.5, 2.0], + index=MultiIndex.from_arrays([[1, 2, 2], range(3)], names=["a", None]), + name="b", + dtype=np.float64, + ), + ), + ( + DataFrame( + { + "b": [np.nan, 1, 2, np.nan] + list(range(4, 18)), + "a": [1] * 7 + [2] * 11, + "c": range(18), + } + ), + 12, + Series( + [ + 3.6, + 3.6, + 4.25, + 5.0, + 5.0, + 5.5, + 6.0, + 12.0, + 12.5, + 13.0, + 13.5, + 14.0, + 14.5, + 15.0, + 15.5, + 16.0, + 16.5, + 17.0, + ], + index=MultiIndex.from_arrays( + [[1] * 7 + [2] * 11, range(18)], names=["a", None] + ), + name="b", + dtype=np.float64, + ), + ), + ], +) +def test_rolling_groupby_with_fixed_forward_specific(df, window_size, expected): + # GH 43267 + indexer = FixedForwardWindowIndexer(window_size=window_size) + result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean() + tm.assert_series_equal(result, expected) + + +@pytest.mark.parametrize( + "group_keys", + [ + (1,), + (1, 2), + (2, 1), + (1, 1, 2), + (1, 2, 1), + (1, 1, 2, 2), + (1, 2, 3, 2, 3), + (1, 1, 2) * 4, + (1, 2, 3) * 5, + ], +) +@pytest.mark.parametrize("window_size", [1, 2, 3, 4, 5, 8, 20]) +def test_rolling_groupby_with_fixed_forward_many(group_keys, window_size): + # GH 43267 + df = DataFrame( + { + "a": np.array(list(group_keys)), + "b": np.arange(len(group_keys), dtype=np.float64) + 17, + "c": np.arange(len(group_keys), dtype=np.int64), + } + ) + + indexer = FixedForwardWindowIndexer(window_size=window_size) + result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).sum() + result.index.names = ["a", "c"] + + groups = df.groupby("a")[["a", "b"]] + manual = concat( + [ + g.assign( + b=[ + g["b"].iloc[i : i + window_size].sum(min_count=1) + for i in range(len(g)) + ] + ) + for _, g in groups + ] + ) + manual = manual.set_index(["a", "c"])["b"] + + tm.assert_series_equal(result, manual)