diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index 450ecc85c725bf..67dc0b757dff44 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -355,7 +355,7 @@ Groupby/resample/rolling - Bug in :meth:`pandas.DataFrame.ewm`, where non-float64 dtypes were silently failing (:issue:`42452`) - Bug in :meth:`pandas.DataFrame.rolling` operation along rows (``axis=1``) incorrectly omits columns containing ``float16`` and ``float32`` (:issue:`41779`) - Bug in :meth:`Resampler.aggregate` did not allow the use of Named Aggregation (:issue:`32803`) -- +- Bug in :meth:`DataFrame.groupby.rolling` and :class:`FixedForwardWindowIndexer` leading to segfaults and incorrect windows (:issue:`43267`) Reshaping ^^^^^^^^^ diff --git a/pandas/core/indexers/objects.py b/pandas/core/indexers/objects.py index cef023a647d7fe..8f1e299e0bc8f8 100644 --- a/pandas/core/indexers/objects.py +++ b/pandas/core/indexers/objects.py @@ -93,7 +93,6 @@ def get_window_bounds( end = np.clip(end, 0, num_values) start = np.clip(start, 0, num_values) - return start, end @@ -266,9 +265,8 @@ def get_window_bounds( ) start = np.arange(num_values, dtype="int64") - end_s = start[: -self.window_size] + self.window_size - end_e = np.full(self.window_size, num_values, dtype="int64") - end = np.concatenate([end_s, end_e]) + end = start + self.window_size + end[end > num_values] = num_values return start, end @@ -280,7 +278,7 @@ def __init__( self, index_array: np.ndarray | None = None, window_size: int = 0, - groupby_indicies: dict | None = None, + groupby_indices: dict | None = None, window_indexer: type[BaseIndexer] = BaseIndexer, indexer_kwargs: dict | None = None, **kwargs, @@ -294,7 +292,7 @@ def __init__( the groups window_size : int window size during the windowing operation - groupby_indicies : dict or None + groupby_indices : dict or None dict of {group label: [positional index of rows belonging to the group]} window_indexer : BaseIndexer BaseIndexer class determining the start and end bounds of each group @@ -303,11 +301,12 @@ def __init__( **kwargs : keyword arguments that will be available when get_window_bounds is called """ - self.groupby_indicies = groupby_indicies or {} + self.groupby_indices = groupby_indices or {} self.window_indexer = window_indexer - self.indexer_kwargs = indexer_kwargs or {} + self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {} super().__init__( - index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs + index_array=index_array, + window_size=self.indexer_kwargs.pop("window_size", window_size), **kwargs ) @Appender(get_window_bounds_doc) @@ -323,8 +322,8 @@ def get_window_bounds( # 3) Append the window bounds in group order start_arrays = [] end_arrays = [] - window_indicies_start = 0 - for key, indices in self.groupby_indicies.items(): + window_indices_start = 0 + for key, indices in self.groupby_indices.items(): index_array: np.ndarray | None if self.index_array is not None: @@ -341,18 +340,18 @@ def get_window_bounds( ) start = start.astype(np.int64) end = end.astype(np.int64) - # Cannot use groupby_indicies as they might not be monotonic with the object + # Cannot use groupby_indices as they might not be monotonic with the object # we're rolling over - window_indicies = np.arange( - window_indicies_start, window_indicies_start + len(indices) + window_indices = np.arange( + window_indices_start, window_indices_start + len(indices) ) - window_indicies_start += len(indices) + window_indices_start += len(indices) # Extend as we'll be slicing window like [start, end) - window_indicies = np.append( - window_indicies, [window_indicies[-1] + 1] + window_indices = np.append( + window_indices, [window_indices[-1] + 1] ).astype(np.int64) - start_arrays.append(window_indicies.take(ensure_platform_int(start))) - end_arrays.append(window_indicies.take(ensure_platform_int(end))) + start_arrays.append(window_indices.take(ensure_platform_int(start))) + end_arrays.append(window_indices.take(ensure_platform_int(end))) start = np.concatenate(start_arrays) end = np.concatenate(end_arrays) return start, end diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index f0bba8ae9727f3..e655797204aa09 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -725,7 +725,7 @@ def _get_window_indexer(self) -> GroupbyIndexer: GroupbyIndexer """ window_indexer = GroupbyIndexer( - groupby_indicies=self._grouper.indices, + groupby_indices=self._grouper.indices, window_indexer=ExponentialMovingWindowIndexer, ) return window_indexer diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index 36a6399df7dbc0..52e30ffbdd2540 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -684,7 +684,7 @@ def _get_window_indexer(self) -> GroupbyIndexer: GroupbyIndexer """ window_indexer = GroupbyIndexer( - groupby_indicies=self._grouper.indices, + groupby_indices=self._grouper.indices, window_indexer=ExpandingIndexer, ) return window_indexer diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index ab23b84a3b8c63..35708790d4b2cf 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -58,6 +58,7 @@ from pandas.core.indexers.objects import ( BaseIndexer, FixedWindowIndexer, + FixedForwardWindowIndexer, GroupbyIndexer, VariableWindowIndexer, ) @@ -299,6 +300,7 @@ def __iter__(self): center=self.center, closed=self.closed, ) + # From get_window_bounds, those two should be equal in length of array assert len(start) == len(end) @@ -529,6 +531,7 @@ def _apply( y : type of input """ window_indexer = self._get_window_indexer() + min_periods = ( self.min_periods if self.min_periods is not None @@ -542,12 +545,16 @@ def homogeneous_func(values: np.ndarray): return values.copy() def calc(x): + start, end = window_indexer.get_window_bounds( num_values=len(x), min_periods=min_periods, center=self.center, closed=self.closed, ) + + # From get_window_bounds, those two should be equal in length of array + assert len(start) == len(end) return func(x, start, end, min_periods, *numba_args) with np.errstate(all="ignore"): @@ -1429,6 +1436,10 @@ def cov_func(x, y): center=self.center, closed=self.closed, ) + + # From get_window_bounds, those two should be equal in length of array + assert len(start) == len(end) + with np.errstate(all="ignore"): mean_x_y = window_aggregations.roll_mean( x_array * y_array, start, end, min_periods @@ -1468,6 +1479,10 @@ def corr_func(x, y): center=self.center, closed=self.closed, ) + + # From get_window_bounds, those two should be equal in length of array + assert len(start) == len(end) + with np.errstate(all="ignore"): mean_x_y = window_aggregations.roll_mean( x_array * y_array, start, end, min_periods @@ -2341,26 +2356,29 @@ def _get_window_indexer(self) -> GroupbyIndexer: rolling_indexer: type[BaseIndexer] indexer_kwargs: dict[str, Any] | None = None index_array = self._index_array + if isinstance(self.window, BaseIndexer): rolling_indexer = type(self.window) - indexer_kwargs = self.window.__dict__ + indexer_kwargs = self.window.__dict__.copy() assert isinstance(indexer_kwargs, dict) # for mypy # We'll be using the index of each group later indexer_kwargs.pop("index_array", None) - window = 0 + window = self.window elif self._win_freq_i8 is not None: rolling_indexer = VariableWindowIndexer window = self._win_freq_i8 else: rolling_indexer = FixedWindowIndexer window = self.window + window_indexer = GroupbyIndexer( index_array=index_array, window_size=window, - groupby_indicies=self._grouper.indices, + groupby_indices=self._grouper.indices, window_indexer=rolling_indexer, indexer_kwargs=indexer_kwargs, ) + return window_indexer def _validate_monotonic(self): diff --git a/pandas/tests/groupby/test_missing.py b/pandas/tests/groupby/test_missing.py index f3149abb52291e..2391ef620aea61 100644 --- a/pandas/tests/groupby/test_missing.py +++ b/pandas/tests/groupby/test_missing.py @@ -144,7 +144,7 @@ def test_min_count(func, min_count, value): tm.assert_frame_equal(result, expected) -def test_indicies_with_missing(): +def test_indices_with_missing(): # GH 9304 df = DataFrame({"a": [1, 1, np.nan], "b": [2, 3, 4], "c": [5, 6, 7]}) g = df.groupby(["a", "b"]) diff --git a/pandas/tests/window/test_base_indexer.py b/pandas/tests/window/test_base_indexer.py index 89c4b0cfdb4aa8..90f925f359204b 100644 --- a/pandas/tests/window/test_base_indexer.py +++ b/pandas/tests/window/test_base_indexer.py @@ -13,6 +13,7 @@ ) from pandas.core.indexers.objects import ( ExpandingIndexer, + FixedWindowIndexer, VariableOffsetWindowIndexer, ) @@ -293,3 +294,43 @@ def get_window_bounds(self, num_values, min_periods, center, closed): result = getattr(df.rolling(indexer), func)(*args) expected = DataFrame({"values": values}) tm.assert_frame_equal(result, expected) + + +@pytest.mark.parametrize('indexer_class', [FixedWindowIndexer, + FixedForwardWindowIndexer, + ExpandingIndexer]) +@pytest.mark.parametrize('window_size', [1, 2, 12]) +@pytest.mark.parametrize("df", + [DataFrame({"a": [1, 1], "b": [0, 1]}), + DataFrame({"a": [1, 2], "b": [0, 1]}), + DataFrame({'b': [np.nan, 1, 2, np.nan] + list(range(4, 16))}).assign(a=1) + ]) +def test_indexers_are_reusable_after_groupby_rolling(indexer_class, window_size, df): + # GH 43267 + num_trials = 3 + indexer = indexer_class(window_size=window_size) + original_window_size = indexer.window_size + for i in range(num_trials): + df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean() + assert indexer.window_size == original_window_size + + +@pytest.mark.parametrize('window_size, num_values, expected_start, expected_end', [ + (1, 1, [0], [1]), + (1, 2, [0, 1], [1, 2]), + (2, 1, [0], [1]), + (2, 2, [0, 1], [2, 2]), + (5, 12, range(12), list(range(5, 12)) + [12] * 5), + (12, 5, range(5), [5] * 5), + (0, 0, np.array([], dtype='int64'), np.array([], dtype='int64')), + (1, 0, np.array([], dtype='int64'), np.array([], dtype='int64')), + (0, 1, [0], [0]), +]) +def test_fixed_forward_indexer_bounds(window_size, num_values, expected_start, expected_end): + # GH 43267 + indexer = FixedForwardWindowIndexer(window_size=window_size) + start, end = indexer.get_window_bounds(num_values=num_values) + + tm.assert_equal(start, np.array(expected_start)) + tm.assert_equal(end, np.array(expected_end)) + assert len(start) == len(end) diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index 2edf22d96a9ba9..ef8619fbed52ab 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -19,13 +19,14 @@ Series, Timedelta, Timestamp, + concat, date_range, period_range, to_datetime, to_timedelta, ) import pandas._testing as tm -from pandas.api.indexers import BaseIndexer +from pandas.api.indexers import BaseIndexer, FixedForwardWindowIndexer from pandas.core.window import Rolling @@ -1500,3 +1501,50 @@ def test_rolling_numeric_dtypes(): dtype="float64", ) tm.assert_frame_equal(result, expected) + + +@pytest.mark.parametrize('df, window_size, expected', [ + (DataFrame({"b": [0, 1, 2], "a": [1, 2, 2]}), + 2, + Series([0, 1.5, 2.0], + index=MultiIndex.from_arrays([[1, 2, 2], range(3)], names=['a', None]), + name="b", dtype=np.float64)), + (DataFrame({"b": [np.nan, 1, 2, np.nan] + list(range(4, 18)), + "a": [1] * 7 + [2] * 11, + "c": range(18)}), + 12, + Series([3.6, 3.6, 4.25, 5., 5., 5.5, 6., 12., 12.5, + 13., 13.5, 14., 14.5, 15., 15.5, 16., 16.5, 17.], + index=MultiIndex.from_arrays([[1] * 7 + [2] * 11, range(18)], names=["a", None]), + name="b", + dtype=np.float64))]) +def test_rolling_groupby_with_fixed_forward_specific(df, window_size, expected): + # GH 43267 + indexer = FixedForwardWindowIndexer(window_size=window_size) + result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).mean() + tm.assert_series_equal(result, expected) + + +@pytest.mark.parametrize('group_keys', [ + (1,), (1, 2), (2, 1), + (1, 1, 2), (1, 2, 1), (1, 1, 2, 2), (1, 2, 3, 2, 3), + (1, 1, 2) * 4, + (1, 2, 3) * 5, +]) +@pytest.mark.parametrize('window_size', [1, 2, 3, 4, 5, 8, 20]) +def test_rolling_groupby_with_fixed_forward_many(group_keys, window_size): + # GH 43267 + df = DataFrame(dict(a=np.array(list(group_keys)), + b=np.arange(len(group_keys), dtype=np.float64) + 17, + c=np.arange(len(group_keys), dtype=np.int64))) + + indexer = FixedForwardWindowIndexer(window_size=window_size) + result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).sum() + result.index.names = ['a', 'c'] + + groups = df.groupby("a")[["a", "b"]] + manual = concat([g.assign(b=[g['b'].iloc[i:i+window_size].sum(min_count=1) + for i in range(len(g))]) for _, g in groups]) + manual = manual.set_index(["a","c"])["b"] + + tm.assert_series_equal(result, manual)