From 9c08fe1e192ed4ab197b042d8477f583d7a1cdeb Mon Sep 17 00:00:00 2001 From: Kaiqi Dong Date: Wed, 13 May 2020 22:43:44 +0200 Subject: [PATCH] CLN: Splits Moments and MomentsConsistency tests into separate files (#34140) --- .../moments/test_moments_consistency_ewm.py | 284 +++++++ ... => test_moments_consistency_expanding.py} | 0 .../test_moments_consistency_rolling.py | 700 ++++++++++++++++++ .../tests/window/moments/test_moments_ewm.py | 282 +------ .../window/moments/test_moments_rolling.py | 692 +---------------- 5 files changed, 988 insertions(+), 970 deletions(-) create mode 100644 pandas/tests/window/moments/test_moments_consistency_ewm.py rename pandas/tests/window/moments/{test_moments_expanding.py => test_moments_consistency_expanding.py} (100%) create mode 100644 pandas/tests/window/moments/test_moments_consistency_rolling.py diff --git a/pandas/tests/window/moments/test_moments_consistency_ewm.py b/pandas/tests/window/moments/test_moments_consistency_ewm.py new file mode 100644 index 0000000000000..3b3a9d59cb6e7 --- /dev/null +++ b/pandas/tests/window/moments/test_moments_consistency_ewm.py @@ -0,0 +1,284 @@ +import numpy as np +from numpy.random import randn +import pytest + +from pandas import DataFrame, Series, concat +from pandas.tests.window.common import ( + Base, + check_binary_ew, + check_binary_ew_min_periods, + check_pairwise_moment, + ew_func, + moments_consistency_cov_data, + moments_consistency_is_constant, + moments_consistency_mock_mean, + moments_consistency_series_data, + moments_consistency_std_data, + moments_consistency_var_data, + moments_consistency_var_debiasing_factors, +) + + +class TestEwmMomentsConsistency(Base): + def setup_method(self, method): + self._create_data() + + @pytest.mark.parametrize("func", ["cov", "corr"]) + def test_ewm_pairwise_cov_corr(self, func): + check_pairwise_moment(self.frame, "ewm", func, span=10, min_periods=5) + + +@pytest.mark.parametrize("name", ["cov", "corr"]) +def test_ewm_corr_cov(name, min_periods, binary_ew_data): + A, B = binary_ew_data + + check_binary_ew(name="corr", A=A, B=B) + check_binary_ew_min_periods("corr", min_periods, A, B) + + +@pytest.mark.parametrize("name", ["cov", "corr"]) +def test_different_input_array_raise_exception(name, binary_ew_data): + + A, _ = binary_ew_data + msg = "Input arrays must be of the same type!" + # exception raised is Exception + with pytest.raises(Exception, match=msg): + ew_func(A, randn(50), 20, name=name, min_periods=5) + + +@pytest.mark.slow +@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) +@pytest.mark.parametrize("adjust", [True, False]) +@pytest.mark.parametrize("ignore_na", [True, False]) +def test_ewm_consistency(consistency_data, min_periods, adjust, ignore_na): + def _weights(s, com, adjust, ignore_na): + if isinstance(s, DataFrame): + if not len(s.columns): + return DataFrame(index=s.index, columns=s.columns) + w = concat( + [ + _weights(s.iloc[:, i], com=com, adjust=adjust, ignore_na=ignore_na) + for i, _ in enumerate(s.columns) + ], + axis=1, + ) + w.index = s.index + w.columns = s.columns + return w + + w = Series(np.nan, index=s.index) + alpha = 1.0 / (1.0 + com) + if ignore_na: + w[s.notna()] = _weights( + s[s.notna()], com=com, adjust=adjust, ignore_na=False + ) + elif adjust: + for i in range(len(s)): + if s.iat[i] == s.iat[i]: + w.iat[i] = pow(1.0 / (1.0 - alpha), i) + else: + sum_wts = 0.0 + prev_i = -1 + for i in range(len(s)): + if s.iat[i] == s.iat[i]: + if prev_i == -1: + w.iat[i] = 1.0 + else: + w.iat[i] = alpha * sum_wts / pow(1.0 - alpha, i - prev_i) + sum_wts += w.iat[i] + prev_i = i + return w + + def _variance_debiasing_factors(s, com, adjust, ignore_na): + weights = _weights(s, com=com, adjust=adjust, ignore_na=ignore_na) + cum_sum = weights.cumsum().fillna(method="ffill") + cum_sum_sq = (weights * weights).cumsum().fillna(method="ffill") + numerator = cum_sum * cum_sum + denominator = numerator - cum_sum_sq + denominator[denominator <= 0.0] = np.nan + return numerator / denominator + + def _ewma(s, com, min_periods, adjust, ignore_na): + weights = _weights(s, com=com, adjust=adjust, ignore_na=ignore_na) + result = ( + s.multiply(weights).cumsum().divide(weights.cumsum()).fillna(method="ffill") + ) + result[ + s.expanding().count() < (max(min_periods, 1) if min_periods else 1) + ] = np.nan + return result + + x, is_constant, no_nans = consistency_data + com = 3.0 + moments_consistency_mock_mean( + x=x, + mean=lambda x: x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).mean(), + mock_mean=lambda x: _ewma( + x, com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ), + ) + + moments_consistency_is_constant( + x=x, + is_constant=is_constant, + min_periods=min_periods, + count=lambda x: x.expanding().count(), + mean=lambda x: x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).mean(), + corr=lambda x, y: x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).corr(y), + ) + + moments_consistency_var_debiasing_factors( + x=x, + var_unbiased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).var(bias=False) + ), + var_biased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).var(bias=True) + ), + var_debiasing_factors=lambda x: ( + _variance_debiasing_factors(x, com=com, adjust=adjust, ignore_na=ignore_na) + ), + ) + + +@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) +@pytest.mark.parametrize("adjust", [True, False]) +@pytest.mark.parametrize("ignore_na", [True, False]) +def test_ewm_consistency_var(consistency_data, min_periods, adjust, ignore_na): + x, is_constant, no_nans = consistency_data + com = 3.0 + moments_consistency_var_data( + x=x, + is_constant=is_constant, + min_periods=min_periods, + count=lambda x: x.expanding().count(), + mean=lambda x: x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).mean(), + var_unbiased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).var(bias=False) + ), + var_biased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).var(bias=True) + ), + ) + + +@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) +@pytest.mark.parametrize("adjust", [True, False]) +@pytest.mark.parametrize("ignore_na", [True, False]) +def test_ewm_consistency_std(consistency_data, min_periods, adjust, ignore_na): + x, is_constant, no_nans = consistency_data + com = 3.0 + moments_consistency_std_data( + x=x, + var_unbiased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).var(bias=False) + ), + std_unbiased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).std(bias=False) + ), + var_biased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).var(bias=True) + ), + std_biased=lambda x: x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).std(bias=True), + ) + + +@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) +@pytest.mark.parametrize("adjust", [True, False]) +@pytest.mark.parametrize("ignore_na", [True, False]) +def test_ewm_consistency_cov(consistency_data, min_periods, adjust, ignore_na): + x, is_constant, no_nans = consistency_data + com = 3.0 + moments_consistency_cov_data( + x=x, + var_unbiased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).var(bias=False) + ), + cov_unbiased=lambda x, y: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).cov(y, bias=False) + ), + var_biased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).var(bias=True) + ), + cov_biased=lambda x, y: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).cov(y, bias=True) + ), + ) + + +@pytest.mark.slow +@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) +@pytest.mark.parametrize("adjust", [True, False]) +@pytest.mark.parametrize("ignore_na", [True, False]) +def test_ewm_consistency_series_data(consistency_data, min_periods, adjust, ignore_na): + x, is_constant, no_nans = consistency_data + com = 3.0 + moments_consistency_series_data( + x=x, + mean=lambda x: x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).mean(), + corr=lambda x, y: x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).corr(y), + var_unbiased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).var(bias=False) + ), + std_unbiased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).std(bias=False) + ), + cov_unbiased=lambda x, y: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).cov(y, bias=False) + ), + var_biased=lambda x: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).var(bias=True) + ), + std_biased=lambda x: x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).std(bias=True), + cov_biased=lambda x, y: ( + x.ewm( + com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na + ).cov(y, bias=True) + ), + ) diff --git a/pandas/tests/window/moments/test_moments_expanding.py b/pandas/tests/window/moments/test_moments_consistency_expanding.py similarity index 100% rename from pandas/tests/window/moments/test_moments_expanding.py rename to pandas/tests/window/moments/test_moments_consistency_expanding.py diff --git a/pandas/tests/window/moments/test_moments_consistency_rolling.py b/pandas/tests/window/moments/test_moments_consistency_rolling.py new file mode 100644 index 0000000000000..2c37baeae13b7 --- /dev/null +++ b/pandas/tests/window/moments/test_moments_consistency_rolling.py @@ -0,0 +1,700 @@ +from datetime import datetime +import warnings + +import numpy as np +from numpy.random import randn +import pytest + +import pandas.util._test_decorators as td + +import pandas as pd +from pandas import DataFrame, DatetimeIndex, Index, Series +import pandas._testing as tm +from pandas.core.window.common import _flex_binary_moment +from pandas.tests.window.common import ( + Base, + check_pairwise_moment, + moments_consistency_cov_data, + moments_consistency_is_constant, + moments_consistency_mock_mean, + moments_consistency_series_data, + moments_consistency_std_data, + moments_consistency_var_data, + moments_consistency_var_debiasing_factors, +) + + +def _rolling_consistency_cases(): + for window in [1, 2, 3, 10, 20]: + for min_periods in {0, 1, 2, 3, 4, window}: + if min_periods and (min_periods > window): + continue + for center in [False, True]: + yield window, min_periods, center + + +class TestRollingMomentsConsistency(Base): + def setup_method(self, method): + self._create_data() + + # binary moments + def test_rolling_cov(self): + A = self.series + B = A + randn(len(A)) + + result = A.rolling(window=50, min_periods=25).cov(B) + tm.assert_almost_equal(result[-1], np.cov(A[-50:], B[-50:])[0, 1]) + + def test_rolling_corr(self): + A = self.series + B = A + randn(len(A)) + + result = A.rolling(window=50, min_periods=25).corr(B) + tm.assert_almost_equal(result[-1], np.corrcoef(A[-50:], B[-50:])[0, 1]) + + # test for correct bias correction + a = tm.makeTimeSeries() + b = tm.makeTimeSeries() + a[:5] = np.nan + b[:10] = np.nan + + result = a.rolling(window=len(a), min_periods=1).corr(b) + tm.assert_almost_equal(result[-1], a.corr(b)) + + @pytest.mark.parametrize("func", ["cov", "corr"]) + def test_rolling_pairwise_cov_corr(self, func): + check_pairwise_moment(self.frame, "rolling", func, window=10, min_periods=5) + + @pytest.mark.parametrize("method", ["corr", "cov"]) + def test_flex_binary_frame(self, method): + series = self.frame[1] + + res = getattr(series.rolling(window=10), method)(self.frame) + res2 = getattr(self.frame.rolling(window=10), method)(series) + exp = self.frame.apply(lambda x: getattr(series.rolling(window=10), method)(x)) + + tm.assert_frame_equal(res, exp) + tm.assert_frame_equal(res2, exp) + + frame2 = self.frame.copy() + frame2.values[:] = np.random.randn(*frame2.shape) + + res3 = getattr(self.frame.rolling(window=10), method)(frame2) + exp = DataFrame( + { + k: getattr(self.frame[k].rolling(window=10), method)(frame2[k]) + for k in self.frame + } + ) + tm.assert_frame_equal(res3, exp) + + +@pytest.mark.slow +@pytest.mark.parametrize( + "window,min_periods,center", list(_rolling_consistency_cases()) +) +def test_rolling_apply_consistency( + consistency_data, base_functions, no_nan_functions, window, min_periods, center +): + x, is_constant, no_nans = consistency_data + + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", message=".*(empty slice|0 for slice).*", category=RuntimeWarning, + ) + # test consistency between rolling_xyz() and either (a) + # rolling_apply of Series.xyz(), or (b) rolling_apply of + # np.nanxyz() + functions = base_functions + + # GH 8269 + if no_nans: + functions = no_nan_functions + base_functions + for (f, require_min_periods, name) in functions: + rolling_f = getattr( + x.rolling(window=window, center=center, min_periods=min_periods), name, + ) + + if ( + require_min_periods + and (min_periods is not None) + and (min_periods < require_min_periods) + ): + continue + + if name == "count": + rolling_f_result = rolling_f() + rolling_apply_f_result = x.rolling( + window=window, min_periods=min_periods, center=center + ).apply(func=f, raw=True) + else: + if name in ["cov", "corr"]: + rolling_f_result = rolling_f(pairwise=False) + else: + rolling_f_result = rolling_f() + rolling_apply_f_result = x.rolling( + window=window, min_periods=min_periods, center=center + ).apply(func=f, raw=True) + + # GH 9422 + if name in ["sum", "prod"]: + tm.assert_equal(rolling_f_result, rolling_apply_f_result) + + +@pytest.mark.parametrize("window", range(7)) +def test_rolling_corr_with_zero_variance(window): + # GH 18430 + s = pd.Series(np.zeros(20)) + other = pd.Series(np.arange(20)) + + assert s.rolling(window=window).corr(other=other).isna().all() + + +def test_flex_binary_moment(): + # GH3155 + # don't blow the stack + msg = "arguments to moment function must be of type np.ndarray/Series/DataFrame" + with pytest.raises(TypeError, match=msg): + _flex_binary_moment(5, 6, None) + + +def test_corr_sanity(): + # GH 3155 + df = DataFrame( + np.array( + [ + [0.87024726, 0.18505595], + [0.64355431, 0.3091617], + [0.92372966, 0.50552513], + [0.00203756, 0.04520709], + [0.84780328, 0.33394331], + [0.78369152, 0.63919667], + ] + ) + ) + + res = df[0].rolling(5, center=True).corr(df[1]) + assert all(np.abs(np.nan_to_num(x)) <= 1 for x in res) + + # and some fuzzing + for _ in range(10): + df = DataFrame(np.random.rand(30, 2)) + res = df[0].rolling(5, center=True).corr(df[1]) + try: + assert all(np.abs(np.nan_to_num(x)) <= 1 for x in res) + except AssertionError: + print(res) + + +def test_rolling_cov_diff_length(): + # GH 7512 + s1 = Series([1, 2, 3], index=[0, 1, 2]) + s2 = Series([1, 3], index=[0, 2]) + result = s1.rolling(window=3, min_periods=2).cov(s2) + expected = Series([None, None, 2.0]) + tm.assert_series_equal(result, expected) + + s2a = Series([1, None, 3], index=[0, 1, 2]) + result = s1.rolling(window=3, min_periods=2).cov(s2a) + tm.assert_series_equal(result, expected) + + +def test_rolling_corr_diff_length(): + # GH 7512 + s1 = Series([1, 2, 3], index=[0, 1, 2]) + s2 = Series([1, 3], index=[0, 2]) + result = s1.rolling(window=3, min_periods=2).corr(s2) + expected = Series([None, None, 1.0]) + tm.assert_series_equal(result, expected) + + s2a = Series([1, None, 3], index=[0, 1, 2]) + result = s1.rolling(window=3, min_periods=2).corr(s2a) + tm.assert_series_equal(result, expected) + + +@pytest.mark.parametrize( + "f", + [ + lambda x: x.rolling(window=10, min_periods=5).cov(x, pairwise=False), + lambda x: x.rolling(window=10, min_periods=5).corr(x, pairwise=False), + lambda x: x.rolling(window=10, min_periods=5).max(), + lambda x: x.rolling(window=10, min_periods=5).min(), + lambda x: x.rolling(window=10, min_periods=5).sum(), + lambda x: x.rolling(window=10, min_periods=5).mean(), + lambda x: x.rolling(window=10, min_periods=5).std(), + lambda x: x.rolling(window=10, min_periods=5).var(), + lambda x: x.rolling(window=10, min_periods=5).skew(), + lambda x: x.rolling(window=10, min_periods=5).kurt(), + lambda x: x.rolling(window=10, min_periods=5).quantile(quantile=0.5), + lambda x: x.rolling(window=10, min_periods=5).median(), + lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=False), + lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=True), + lambda x: x.rolling(win_type="boxcar", window=10, min_periods=5).mean(), + ], +) +@td.skip_if_no_scipy +def test_rolling_functions_window_non_shrinkage(f): + # GH 7764 + s = Series(range(4)) + s_expected = Series(np.nan, index=s.index) + df = DataFrame([[1, 5], [3, 2], [3, 9], [-1, 0]], columns=["A", "B"]) + df_expected = DataFrame(np.nan, index=df.index, columns=df.columns) + + s_result = f(s) + tm.assert_series_equal(s_result, s_expected) + + df_result = f(df) + tm.assert_frame_equal(df_result, df_expected) + + +def test_rolling_functions_window_non_shrinkage_binary(): + + # corr/cov return a MI DataFrame + df = DataFrame( + [[1, 5], [3, 2], [3, 9], [-1, 0]], + columns=Index(["A", "B"], name="foo"), + index=Index(range(4), name="bar"), + ) + df_expected = DataFrame( + columns=Index(["A", "B"], name="foo"), + index=pd.MultiIndex.from_product([df.index, df.columns], names=["bar", "foo"]), + dtype="float64", + ) + functions = [ + lambda x: (x.rolling(window=10, min_periods=5).cov(x, pairwise=True)), + lambda x: (x.rolling(window=10, min_periods=5).corr(x, pairwise=True)), + ] + for f in functions: + df_result = f(df) + tm.assert_frame_equal(df_result, df_expected) + + +def test_rolling_skew_edge_cases(): + + all_nan = Series([np.NaN] * 5) + + # yields all NaN (0 variance) + d = Series([1] * 5) + x = d.rolling(window=5).skew() + tm.assert_series_equal(all_nan, x) + + # yields all NaN (window too small) + d = Series(np.random.randn(5)) + x = d.rolling(window=2).skew() + tm.assert_series_equal(all_nan, x) + + # yields [NaN, NaN, NaN, 0.177994, 1.548824] + d = Series([-1.50837035, -0.1297039, 0.19501095, 1.73508164, 0.41941401]) + expected = Series([np.NaN, np.NaN, np.NaN, 0.177994, 1.548824]) + x = d.rolling(window=4).skew() + tm.assert_series_equal(expected, x) + + +def test_rolling_kurt_edge_cases(): + + all_nan = Series([np.NaN] * 5) + + # yields all NaN (0 variance) + d = Series([1] * 5) + x = d.rolling(window=5).kurt() + tm.assert_series_equal(all_nan, x) + + # yields all NaN (window too small) + d = Series(np.random.randn(5)) + x = d.rolling(window=3).kurt() + tm.assert_series_equal(all_nan, x) + + # yields [NaN, NaN, NaN, 1.224307, 2.671499] + d = Series([-1.50837035, -0.1297039, 0.19501095, 1.73508164, 0.41941401]) + expected = Series([np.NaN, np.NaN, np.NaN, 1.224307, 2.671499]) + x = d.rolling(window=4).kurt() + tm.assert_series_equal(expected, x) + + +def test_rolling_skew_eq_value_fperr(): + # #18804 all rolling skew for all equal values should return Nan + a = Series([1.1] * 15).rolling(window=10).skew() + assert np.isnan(a).all() + + +def test_rolling_kurt_eq_value_fperr(): + # #18804 all rolling kurt for all equal values should return Nan + a = Series([1.1] * 15).rolling(window=10).kurt() + assert np.isnan(a).all() + + +def test_rolling_max_gh6297(): + """Replicate result expected in GH #6297""" + indices = [datetime(1975, 1, i) for i in range(1, 6)] + # So that we can have 2 datapoints on one of the days + indices.append(datetime(1975, 1, 3, 6, 0)) + series = Series(range(1, 7), index=indices) + # Use floats instead of ints as values + series = series.map(lambda x: float(x)) + # Sort chronologically + series = series.sort_index() + + expected = Series( + [1.0, 2.0, 6.0, 4.0, 5.0], + index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), + ) + x = series.resample("D").max().rolling(window=1).max() + tm.assert_series_equal(expected, x) + + +def test_rolling_max_resample(): + + indices = [datetime(1975, 1, i) for i in range(1, 6)] + # So that we can have 3 datapoints on last day (4, 10, and 20) + indices.append(datetime(1975, 1, 5, 1)) + indices.append(datetime(1975, 1, 5, 2)) + series = Series(list(range(0, 5)) + [10, 20], index=indices) + # Use floats instead of ints as values + series = series.map(lambda x: float(x)) + # Sort chronologically + series = series.sort_index() + + # Default how should be max + expected = Series( + [0.0, 1.0, 2.0, 3.0, 20.0], + index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), + ) + x = series.resample("D").max().rolling(window=1).max() + tm.assert_series_equal(expected, x) + + # Now specify median (10.0) + expected = Series( + [0.0, 1.0, 2.0, 3.0, 10.0], + index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), + ) + x = series.resample("D").median().rolling(window=1).max() + tm.assert_series_equal(expected, x) + + # Now specify mean (4+10+20)/3 + v = (4.0 + 10.0 + 20.0) / 3.0 + expected = Series( + [0.0, 1.0, 2.0, 3.0, v], + index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), + ) + x = series.resample("D").mean().rolling(window=1).max() + tm.assert_series_equal(expected, x) + + +def test_rolling_min_resample(): + + indices = [datetime(1975, 1, i) for i in range(1, 6)] + # So that we can have 3 datapoints on last day (4, 10, and 20) + indices.append(datetime(1975, 1, 5, 1)) + indices.append(datetime(1975, 1, 5, 2)) + series = Series(list(range(0, 5)) + [10, 20], index=indices) + # Use floats instead of ints as values + series = series.map(lambda x: float(x)) + # Sort chronologically + series = series.sort_index() + + # Default how should be min + expected = Series( + [0.0, 1.0, 2.0, 3.0, 4.0], + index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), + ) + r = series.resample("D").min().rolling(window=1) + tm.assert_series_equal(expected, r.min()) + + +def test_rolling_median_resample(): + + indices = [datetime(1975, 1, i) for i in range(1, 6)] + # So that we can have 3 datapoints on last day (4, 10, and 20) + indices.append(datetime(1975, 1, 5, 1)) + indices.append(datetime(1975, 1, 5, 2)) + series = Series(list(range(0, 5)) + [10, 20], index=indices) + # Use floats instead of ints as values + series = series.map(lambda x: float(x)) + # Sort chronologically + series = series.sort_index() + + # Default how should be median + expected = Series( + [0.0, 1.0, 2.0, 3.0, 10], + index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), + ) + x = series.resample("D").median().rolling(window=1).median() + tm.assert_series_equal(expected, x) + + +def test_rolling_median_memory_error(): + # GH11722 + n = 20000 + Series(np.random.randn(n)).rolling(window=2, center=False).median() + Series(np.random.randn(n)).rolling(window=2, center=False).median() + + +def test_rolling_min_max_numeric_types(): + + # GH12373 + types_test = [np.dtype(f"f{width}") for width in [4, 8]] + types_test.extend( + [np.dtype(f"{sign}{width}") for width in [1, 2, 4, 8] for sign in "ui"] + ) + for data_type in types_test: + # Just testing that these don't throw exceptions and that + # the return type is float64. Other tests will cover quantitative + # correctness + result = DataFrame(np.arange(20, dtype=data_type)).rolling(window=5).max() + assert result.dtypes[0] == np.dtype("f8") + result = DataFrame(np.arange(20, dtype=data_type)).rolling(window=5).min() + assert result.dtypes[0] == np.dtype("f8") + + +def test_moment_functions_zero_length(): + # GH 8056 + s = Series(dtype=np.float64) + s_expected = s + df1 = DataFrame() + df1_expected = df1 + df2 = DataFrame(columns=["a"]) + df2["a"] = df2["a"].astype("float64") + df2_expected = df2 + + functions = [ + lambda x: x.rolling(window=10).count(), + lambda x: x.rolling(window=10, min_periods=5).cov(x, pairwise=False), + lambda x: x.rolling(window=10, min_periods=5).corr(x, pairwise=False), + lambda x: x.rolling(window=10, min_periods=5).max(), + lambda x: x.rolling(window=10, min_periods=5).min(), + lambda x: x.rolling(window=10, min_periods=5).sum(), + lambda x: x.rolling(window=10, min_periods=5).mean(), + lambda x: x.rolling(window=10, min_periods=5).std(), + lambda x: x.rolling(window=10, min_periods=5).var(), + lambda x: x.rolling(window=10, min_periods=5).skew(), + lambda x: x.rolling(window=10, min_periods=5).kurt(), + lambda x: x.rolling(window=10, min_periods=5).quantile(0.5), + lambda x: x.rolling(window=10, min_periods=5).median(), + lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=False), + lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=True), + lambda x: x.rolling(win_type="boxcar", window=10, min_periods=5).mean(), + ] + for f in functions: + try: + s_result = f(s) + tm.assert_series_equal(s_result, s_expected) + + df1_result = f(df1) + tm.assert_frame_equal(df1_result, df1_expected) + + df2_result = f(df2) + tm.assert_frame_equal(df2_result, df2_expected) + except (ImportError): + + # scipy needed for rolling_window + continue + + +def test_moment_functions_zero_length_pairwise(): + + df1 = DataFrame() + df2 = DataFrame(columns=Index(["a"], name="foo"), index=Index([], name="bar")) + df2["a"] = df2["a"].astype("float64") + + df1_expected = DataFrame( + index=pd.MultiIndex.from_product([df1.index, df1.columns]), columns=Index([]), + ) + df2_expected = DataFrame( + index=pd.MultiIndex.from_product( + [df2.index, df2.columns], names=["bar", "foo"] + ), + columns=Index(["a"], name="foo"), + dtype="float64", + ) + + functions = [ + lambda x: (x.rolling(window=10, min_periods=5).cov(x, pairwise=True)), + lambda x: (x.rolling(window=10, min_periods=5).corr(x, pairwise=True)), + ] + + for f in functions: + df1_result = f(df1) + tm.assert_frame_equal(df1_result, df1_expected) + + df2_result = f(df2) + tm.assert_frame_equal(df2_result, df2_expected) + + +@pytest.mark.slow +@pytest.mark.parametrize( + "window,min_periods,center", list(_rolling_consistency_cases()) +) +def test_rolling_consistency_var(consistency_data, window, min_periods, center): + x, is_constant, no_nans = consistency_data + moments_consistency_var_data( + x=x, + is_constant=is_constant, + min_periods=min_periods, + count=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).count() + ), + mean=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).mean() + ), + var_unbiased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).var() + ), + var_biased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).var(ddof=0) + ), + ) + + +@pytest.mark.slow +@pytest.mark.parametrize( + "window,min_periods,center", list(_rolling_consistency_cases()) +) +def test_rolling_consistency_std(consistency_data, window, min_periods, center): + x, is_constant, no_nans = consistency_data + moments_consistency_std_data( + x=x, + var_unbiased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).var() + ), + std_unbiased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).std() + ), + var_biased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).var(ddof=0) + ), + std_biased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).std(ddof=0) + ), + ) + + +@pytest.mark.slow +@pytest.mark.parametrize( + "window,min_periods,center", list(_rolling_consistency_cases()) +) +def test_rolling_consistency_cov(consistency_data, window, min_periods, center): + x, is_constant, no_nans = consistency_data + moments_consistency_cov_data( + x=x, + var_unbiased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).var() + ), + cov_unbiased=lambda x, y: ( + x.rolling(window=window, min_periods=min_periods, center=center).cov(y) + ), + var_biased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).var(ddof=0) + ), + cov_biased=lambda x, y: ( + x.rolling(window=window, min_periods=min_periods, center=center).cov( + y, ddof=0 + ) + ), + ) + + +@pytest.mark.slow +@pytest.mark.parametrize( + "window,min_periods,center", list(_rolling_consistency_cases()) +) +def test_rolling_consistency_series(consistency_data, window, min_periods, center): + x, is_constant, no_nans = consistency_data + moments_consistency_series_data( + x=x, + mean=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).mean() + ), + corr=lambda x, y: ( + x.rolling(window=window, min_periods=min_periods, center=center).corr(y) + ), + var_unbiased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).var() + ), + std_unbiased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).std() + ), + cov_unbiased=lambda x, y: ( + x.rolling(window=window, min_periods=min_periods, center=center).cov(y) + ), + var_biased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).var(ddof=0) + ), + std_biased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).std(ddof=0) + ), + cov_biased=lambda x, y: ( + x.rolling(window=window, min_periods=min_periods, center=center).cov( + y, ddof=0 + ) + ), + ) + + +@pytest.mark.slow +@pytest.mark.parametrize( + "window,min_periods,center", list(_rolling_consistency_cases()) +) +def test_rolling_consistency(consistency_data, window, min_periods, center): + x, is_constant, no_nans = consistency_data + # suppress warnings about empty slices, as we are deliberately testing + # with empty/0-length Series/DataFrames + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", message=".*(empty slice|0 for slice).*", category=RuntimeWarning, + ) + + # test consistency between different rolling_* moments + moments_consistency_mock_mean( + x=x, + mean=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).mean() + ), + mock_mean=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center) + .sum() + .divide( + x.rolling( + window=window, min_periods=min_periods, center=center + ).count() + ) + ), + ) + + moments_consistency_is_constant( + x=x, + is_constant=is_constant, + min_periods=min_periods, + count=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).count() + ), + mean=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).mean() + ), + corr=lambda x, y: ( + x.rolling(window=window, min_periods=min_periods, center=center).corr(y) + ), + ) + + moments_consistency_var_debiasing_factors( + x=x, + var_unbiased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).var() + ), + var_biased=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center).var( + ddof=0 + ) + ), + var_debiasing_factors=lambda x: ( + x.rolling(window=window, min_periods=min_periods, center=center) + .count() + .divide( + ( + x.rolling( + window=window, min_periods=min_periods, center=center + ).count() + - 1.0 + ).replace(0.0, np.nan) + ) + ), + ) diff --git a/pandas/tests/window/moments/test_moments_ewm.py b/pandas/tests/window/moments/test_moments_ewm.py index 17d497e6e1320..162917fff9363 100644 --- a/pandas/tests/window/moments/test_moments_ewm.py +++ b/pandas/tests/window/moments/test_moments_ewm.py @@ -3,22 +3,9 @@ import pytest import pandas as pd -from pandas import DataFrame, Series, concat +from pandas import DataFrame, Series import pandas._testing as tm -from pandas.tests.window.common import ( - Base, - check_binary_ew, - check_binary_ew_min_periods, - check_pairwise_moment, - ew_func, - moments_consistency_cov_data, - moments_consistency_is_constant, - moments_consistency_mock_mean, - moments_consistency_series_data, - moments_consistency_std_data, - moments_consistency_var_data, - moments_consistency_var_debiasing_factors, -) +from pandas.tests.window.common import Base @pytest.mark.filterwarnings("ignore:can't resolve package:ImportWarning") @@ -272,268 +259,3 @@ def test_ew_min_periods(self, min_periods, name): # pass in ints result2 = getattr(Series(np.arange(50)).ewm(span=10), name)() assert result2.dtype == np.float_ - - -class TestEwmMomentsConsistency(Base): - def setup_method(self, method): - self._create_data() - - @pytest.mark.parametrize("func", ["cov", "corr"]) - def test_ewm_pairwise_cov_corr(self, func): - check_pairwise_moment(self.frame, "ewm", func, span=10, min_periods=5) - - -@pytest.mark.parametrize("name", ["cov", "corr"]) -def test_ewm_corr_cov(name, min_periods, binary_ew_data): - A, B = binary_ew_data - - check_binary_ew(name="corr", A=A, B=B) - check_binary_ew_min_periods("corr", min_periods, A, B) - - -@pytest.mark.parametrize("name", ["cov", "corr"]) -def test_different_input_array_raise_exception(name, binary_ew_data): - - A, _ = binary_ew_data - msg = "Input arrays must be of the same type!" - # exception raised is Exception - with pytest.raises(Exception, match=msg): - ew_func(A, randn(50), 20, name=name, min_periods=5) - - -@pytest.mark.slow -@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -@pytest.mark.parametrize("adjust", [True, False]) -@pytest.mark.parametrize("ignore_na", [True, False]) -def test_ewm_consistency(consistency_data, min_periods, adjust, ignore_na): - def _weights(s, com, adjust, ignore_na): - if isinstance(s, DataFrame): - if not len(s.columns): - return DataFrame(index=s.index, columns=s.columns) - w = concat( - [ - _weights(s.iloc[:, i], com=com, adjust=adjust, ignore_na=ignore_na) - for i, _ in enumerate(s.columns) - ], - axis=1, - ) - w.index = s.index - w.columns = s.columns - return w - - w = Series(np.nan, index=s.index) - alpha = 1.0 / (1.0 + com) - if ignore_na: - w[s.notna()] = _weights( - s[s.notna()], com=com, adjust=adjust, ignore_na=False - ) - elif adjust: - for i in range(len(s)): - if s.iat[i] == s.iat[i]: - w.iat[i] = pow(1.0 / (1.0 - alpha), i) - else: - sum_wts = 0.0 - prev_i = -1 - for i in range(len(s)): - if s.iat[i] == s.iat[i]: - if prev_i == -1: - w.iat[i] = 1.0 - else: - w.iat[i] = alpha * sum_wts / pow(1.0 - alpha, i - prev_i) - sum_wts += w.iat[i] - prev_i = i - return w - - def _variance_debiasing_factors(s, com, adjust, ignore_na): - weights = _weights(s, com=com, adjust=adjust, ignore_na=ignore_na) - cum_sum = weights.cumsum().fillna(method="ffill") - cum_sum_sq = (weights * weights).cumsum().fillna(method="ffill") - numerator = cum_sum * cum_sum - denominator = numerator - cum_sum_sq - denominator[denominator <= 0.0] = np.nan - return numerator / denominator - - def _ewma(s, com, min_periods, adjust, ignore_na): - weights = _weights(s, com=com, adjust=adjust, ignore_na=ignore_na) - result = ( - s.multiply(weights).cumsum().divide(weights.cumsum()).fillna(method="ffill") - ) - result[ - s.expanding().count() < (max(min_periods, 1) if min_periods else 1) - ] = np.nan - return result - - x, is_constant, no_nans = consistency_data - com = 3.0 - moments_consistency_mock_mean( - x=x, - mean=lambda x: x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).mean(), - mock_mean=lambda x: _ewma( - x, com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ), - ) - - moments_consistency_is_constant( - x=x, - is_constant=is_constant, - min_periods=min_periods, - count=lambda x: x.expanding().count(), - mean=lambda x: x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).mean(), - corr=lambda x, y: x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).corr(y), - ) - - moments_consistency_var_debiasing_factors( - x=x, - var_unbiased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).var(bias=False) - ), - var_biased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).var(bias=True) - ), - var_debiasing_factors=lambda x: ( - _variance_debiasing_factors(x, com=com, adjust=adjust, ignore_na=ignore_na) - ), - ) - - -@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -@pytest.mark.parametrize("adjust", [True, False]) -@pytest.mark.parametrize("ignore_na", [True, False]) -def test_ewm_consistency_var(consistency_data, min_periods, adjust, ignore_na): - x, is_constant, no_nans = consistency_data - com = 3.0 - moments_consistency_var_data( - x=x, - is_constant=is_constant, - min_periods=min_periods, - count=lambda x: x.expanding().count(), - mean=lambda x: x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).mean(), - var_unbiased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).var(bias=False) - ), - var_biased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).var(bias=True) - ), - ) - - -@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -@pytest.mark.parametrize("adjust", [True, False]) -@pytest.mark.parametrize("ignore_na", [True, False]) -def test_ewm_consistency_std(consistency_data, min_periods, adjust, ignore_na): - x, is_constant, no_nans = consistency_data - com = 3.0 - moments_consistency_std_data( - x=x, - var_unbiased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).var(bias=False) - ), - std_unbiased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).std(bias=False) - ), - var_biased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).var(bias=True) - ), - std_biased=lambda x: x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).std(bias=True), - ) - - -@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -@pytest.mark.parametrize("adjust", [True, False]) -@pytest.mark.parametrize("ignore_na", [True, False]) -def test_ewm_consistency_cov(consistency_data, min_periods, adjust, ignore_na): - x, is_constant, no_nans = consistency_data - com = 3.0 - moments_consistency_cov_data( - x=x, - var_unbiased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).var(bias=False) - ), - cov_unbiased=lambda x, y: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).cov(y, bias=False) - ), - var_biased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).var(bias=True) - ), - cov_biased=lambda x, y: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).cov(y, bias=True) - ), - ) - - -@pytest.mark.slow -@pytest.mark.parametrize("min_periods", [0, 1, 2, 3, 4]) -@pytest.mark.parametrize("adjust", [True, False]) -@pytest.mark.parametrize("ignore_na", [True, False]) -def test_ewm_consistency_series_data(consistency_data, min_periods, adjust, ignore_na): - x, is_constant, no_nans = consistency_data - com = 3.0 - moments_consistency_series_data( - x=x, - mean=lambda x: x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).mean(), - corr=lambda x, y: x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).corr(y), - var_unbiased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).var(bias=False) - ), - std_unbiased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).std(bias=False) - ), - cov_unbiased=lambda x, y: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).cov(y, bias=False) - ), - var_biased=lambda x: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).var(bias=True) - ), - std_biased=lambda x: x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).std(bias=True), - cov_biased=lambda x, y: ( - x.ewm( - com=com, min_periods=min_periods, adjust=adjust, ignore_na=ignore_na - ).cov(y, bias=True) - ), - ) diff --git a/pandas/tests/window/moments/test_moments_rolling.py b/pandas/tests/window/moments/test_moments_rolling.py index 43618d7676731..399b76e92fc4f 100644 --- a/pandas/tests/window/moments/test_moments_rolling.py +++ b/pandas/tests/window/moments/test_moments_rolling.py @@ -1,5 +1,4 @@ import copy -from datetime import datetime import warnings import numpy as np @@ -9,20 +8,9 @@ import pandas.util._test_decorators as td import pandas as pd -from pandas import DataFrame, DatetimeIndex, Index, Series, isna, notna +from pandas import DataFrame, Series, isna, notna import pandas._testing as tm -from pandas.core.window.common import _flex_binary_moment -from pandas.tests.window.common import ( - Base, - check_pairwise_moment, - moments_consistency_cov_data, - moments_consistency_is_constant, - moments_consistency_mock_mean, - moments_consistency_series_data, - moments_consistency_std_data, - moments_consistency_var_data, - moments_consistency_var_debiasing_factors, -) +from pandas.tests.window.common import Base import pandas.tseries.offsets as offsets @@ -931,679 +919,3 @@ def get_result(obj, window, min_periods=None, center=False): frame_xp = frame_xp.fillna(fill_value) tm.assert_series_equal(series_xp, series_rs) tm.assert_frame_equal(frame_xp, frame_rs) - - -def _rolling_consistency_cases(): - for window in [1, 2, 3, 10, 20]: - for min_periods in {0, 1, 2, 3, 4, window}: - if min_periods and (min_periods > window): - continue - for center in [False, True]: - yield window, min_periods, center - - -class TestRollingMomentsConsistency(Base): - def setup_method(self, method): - self._create_data() - - # binary moments - def test_rolling_cov(self): - A = self.series - B = A + randn(len(A)) - - result = A.rolling(window=50, min_periods=25).cov(B) - tm.assert_almost_equal(result[-1], np.cov(A[-50:], B[-50:])[0, 1]) - - def test_rolling_corr(self): - A = self.series - B = A + randn(len(A)) - - result = A.rolling(window=50, min_periods=25).corr(B) - tm.assert_almost_equal(result[-1], np.corrcoef(A[-50:], B[-50:])[0, 1]) - - # test for correct bias correction - a = tm.makeTimeSeries() - b = tm.makeTimeSeries() - a[:5] = np.nan - b[:10] = np.nan - - result = a.rolling(window=len(a), min_periods=1).corr(b) - tm.assert_almost_equal(result[-1], a.corr(b)) - - @pytest.mark.parametrize("func", ["cov", "corr"]) - def test_rolling_pairwise_cov_corr(self, func): - check_pairwise_moment(self.frame, "rolling", func, window=10, min_periods=5) - - @pytest.mark.parametrize("method", ["corr", "cov"]) - def test_flex_binary_frame(self, method): - series = self.frame[1] - - res = getattr(series.rolling(window=10), method)(self.frame) - res2 = getattr(self.frame.rolling(window=10), method)(series) - exp = self.frame.apply(lambda x: getattr(series.rolling(window=10), method)(x)) - - tm.assert_frame_equal(res, exp) - tm.assert_frame_equal(res2, exp) - - frame2 = self.frame.copy() - frame2.values[:] = np.random.randn(*frame2.shape) - - res3 = getattr(self.frame.rolling(window=10), method)(frame2) - exp = DataFrame( - { - k: getattr(self.frame[k].rolling(window=10), method)(frame2[k]) - for k in self.frame - } - ) - tm.assert_frame_equal(res3, exp) - - -@pytest.mark.slow -@pytest.mark.parametrize( - "window,min_periods,center", list(_rolling_consistency_cases()) -) -def test_rolling_apply_consistency( - consistency_data, base_functions, no_nan_functions, window, min_periods, center -): - x, is_constant, no_nans = consistency_data - - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", message=".*(empty slice|0 for slice).*", category=RuntimeWarning, - ) - # test consistency between rolling_xyz() and either (a) - # rolling_apply of Series.xyz(), or (b) rolling_apply of - # np.nanxyz() - functions = base_functions - - # GH 8269 - if no_nans: - functions = no_nan_functions + base_functions - for (f, require_min_periods, name) in functions: - rolling_f = getattr( - x.rolling(window=window, center=center, min_periods=min_periods), name, - ) - - if ( - require_min_periods - and (min_periods is not None) - and (min_periods < require_min_periods) - ): - continue - - if name == "count": - rolling_f_result = rolling_f() - rolling_apply_f_result = x.rolling( - window=window, min_periods=min_periods, center=center - ).apply(func=f, raw=True) - else: - if name in ["cov", "corr"]: - rolling_f_result = rolling_f(pairwise=False) - else: - rolling_f_result = rolling_f() - rolling_apply_f_result = x.rolling( - window=window, min_periods=min_periods, center=center - ).apply(func=f, raw=True) - - # GH 9422 - if name in ["sum", "prod"]: - tm.assert_equal(rolling_f_result, rolling_apply_f_result) - - -@pytest.mark.parametrize("window", range(7)) -def test_rolling_corr_with_zero_variance(window): - # GH 18430 - s = pd.Series(np.zeros(20)) - other = pd.Series(np.arange(20)) - - assert s.rolling(window=window).corr(other=other).isna().all() - - -def test_flex_binary_moment(): - # GH3155 - # don't blow the stack - msg = "arguments to moment function must be of type np.ndarray/Series/DataFrame" - with pytest.raises(TypeError, match=msg): - _flex_binary_moment(5, 6, None) - - -def test_corr_sanity(): - # GH 3155 - df = DataFrame( - np.array( - [ - [0.87024726, 0.18505595], - [0.64355431, 0.3091617], - [0.92372966, 0.50552513], - [0.00203756, 0.04520709], - [0.84780328, 0.33394331], - [0.78369152, 0.63919667], - ] - ) - ) - - res = df[0].rolling(5, center=True).corr(df[1]) - assert all(np.abs(np.nan_to_num(x)) <= 1 for x in res) - - # and some fuzzing - for _ in range(10): - df = DataFrame(np.random.rand(30, 2)) - res = df[0].rolling(5, center=True).corr(df[1]) - try: - assert all(np.abs(np.nan_to_num(x)) <= 1 for x in res) - except AssertionError: - print(res) - - -def test_rolling_cov_diff_length(): - # GH 7512 - s1 = Series([1, 2, 3], index=[0, 1, 2]) - s2 = Series([1, 3], index=[0, 2]) - result = s1.rolling(window=3, min_periods=2).cov(s2) - expected = Series([None, None, 2.0]) - tm.assert_series_equal(result, expected) - - s2a = Series([1, None, 3], index=[0, 1, 2]) - result = s1.rolling(window=3, min_periods=2).cov(s2a) - tm.assert_series_equal(result, expected) - - -def test_rolling_corr_diff_length(): - # GH 7512 - s1 = Series([1, 2, 3], index=[0, 1, 2]) - s2 = Series([1, 3], index=[0, 2]) - result = s1.rolling(window=3, min_periods=2).corr(s2) - expected = Series([None, None, 1.0]) - tm.assert_series_equal(result, expected) - - s2a = Series([1, None, 3], index=[0, 1, 2]) - result = s1.rolling(window=3, min_periods=2).corr(s2a) - tm.assert_series_equal(result, expected) - - -@pytest.mark.parametrize( - "f", - [ - lambda x: x.rolling(window=10, min_periods=5).cov(x, pairwise=False), - lambda x: x.rolling(window=10, min_periods=5).corr(x, pairwise=False), - lambda x: x.rolling(window=10, min_periods=5).max(), - lambda x: x.rolling(window=10, min_periods=5).min(), - lambda x: x.rolling(window=10, min_periods=5).sum(), - lambda x: x.rolling(window=10, min_periods=5).mean(), - lambda x: x.rolling(window=10, min_periods=5).std(), - lambda x: x.rolling(window=10, min_periods=5).var(), - lambda x: x.rolling(window=10, min_periods=5).skew(), - lambda x: x.rolling(window=10, min_periods=5).kurt(), - lambda x: x.rolling(window=10, min_periods=5).quantile(quantile=0.5), - lambda x: x.rolling(window=10, min_periods=5).median(), - lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=False), - lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=True), - lambda x: x.rolling(win_type="boxcar", window=10, min_periods=5).mean(), - ], -) -@td.skip_if_no_scipy -def test_rolling_functions_window_non_shrinkage(f): - # GH 7764 - s = Series(range(4)) - s_expected = Series(np.nan, index=s.index) - df = DataFrame([[1, 5], [3, 2], [3, 9], [-1, 0]], columns=["A", "B"]) - df_expected = DataFrame(np.nan, index=df.index, columns=df.columns) - - s_result = f(s) - tm.assert_series_equal(s_result, s_expected) - - df_result = f(df) - tm.assert_frame_equal(df_result, df_expected) - - -def test_rolling_functions_window_non_shrinkage_binary(): - - # corr/cov return a MI DataFrame - df = DataFrame( - [[1, 5], [3, 2], [3, 9], [-1, 0]], - columns=Index(["A", "B"], name="foo"), - index=Index(range(4), name="bar"), - ) - df_expected = DataFrame( - columns=Index(["A", "B"], name="foo"), - index=pd.MultiIndex.from_product([df.index, df.columns], names=["bar", "foo"]), - dtype="float64", - ) - functions = [ - lambda x: (x.rolling(window=10, min_periods=5).cov(x, pairwise=True)), - lambda x: (x.rolling(window=10, min_periods=5).corr(x, pairwise=True)), - ] - for f in functions: - df_result = f(df) - tm.assert_frame_equal(df_result, df_expected) - - -def test_rolling_skew_edge_cases(): - - all_nan = Series([np.NaN] * 5) - - # yields all NaN (0 variance) - d = Series([1] * 5) - x = d.rolling(window=5).skew() - tm.assert_series_equal(all_nan, x) - - # yields all NaN (window too small) - d = Series(np.random.randn(5)) - x = d.rolling(window=2).skew() - tm.assert_series_equal(all_nan, x) - - # yields [NaN, NaN, NaN, 0.177994, 1.548824] - d = Series([-1.50837035, -0.1297039, 0.19501095, 1.73508164, 0.41941401]) - expected = Series([np.NaN, np.NaN, np.NaN, 0.177994, 1.548824]) - x = d.rolling(window=4).skew() - tm.assert_series_equal(expected, x) - - -def test_rolling_kurt_edge_cases(): - - all_nan = Series([np.NaN] * 5) - - # yields all NaN (0 variance) - d = Series([1] * 5) - x = d.rolling(window=5).kurt() - tm.assert_series_equal(all_nan, x) - - # yields all NaN (window too small) - d = Series(np.random.randn(5)) - x = d.rolling(window=3).kurt() - tm.assert_series_equal(all_nan, x) - - # yields [NaN, NaN, NaN, 1.224307, 2.671499] - d = Series([-1.50837035, -0.1297039, 0.19501095, 1.73508164, 0.41941401]) - expected = Series([np.NaN, np.NaN, np.NaN, 1.224307, 2.671499]) - x = d.rolling(window=4).kurt() - tm.assert_series_equal(expected, x) - - -def test_rolling_skew_eq_value_fperr(): - # #18804 all rolling skew for all equal values should return Nan - a = Series([1.1] * 15).rolling(window=10).skew() - assert np.isnan(a).all() - - -def test_rolling_kurt_eq_value_fperr(): - # #18804 all rolling kurt for all equal values should return Nan - a = Series([1.1] * 15).rolling(window=10).kurt() - assert np.isnan(a).all() - - -def test_rolling_max_gh6297(): - """Replicate result expected in GH #6297""" - indices = [datetime(1975, 1, i) for i in range(1, 6)] - # So that we can have 2 datapoints on one of the days - indices.append(datetime(1975, 1, 3, 6, 0)) - series = Series(range(1, 7), index=indices) - # Use floats instead of ints as values - series = series.map(lambda x: float(x)) - # Sort chronologically - series = series.sort_index() - - expected = Series( - [1.0, 2.0, 6.0, 4.0, 5.0], - index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - x = series.resample("D").max().rolling(window=1).max() - tm.assert_series_equal(expected, x) - - -def test_rolling_max_resample(): - - indices = [datetime(1975, 1, i) for i in range(1, 6)] - # So that we can have 3 datapoints on last day (4, 10, and 20) - indices.append(datetime(1975, 1, 5, 1)) - indices.append(datetime(1975, 1, 5, 2)) - series = Series(list(range(0, 5)) + [10, 20], index=indices) - # Use floats instead of ints as values - series = series.map(lambda x: float(x)) - # Sort chronologically - series = series.sort_index() - - # Default how should be max - expected = Series( - [0.0, 1.0, 2.0, 3.0, 20.0], - index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - x = series.resample("D").max().rolling(window=1).max() - tm.assert_series_equal(expected, x) - - # Now specify median (10.0) - expected = Series( - [0.0, 1.0, 2.0, 3.0, 10.0], - index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - x = series.resample("D").median().rolling(window=1).max() - tm.assert_series_equal(expected, x) - - # Now specify mean (4+10+20)/3 - v = (4.0 + 10.0 + 20.0) / 3.0 - expected = Series( - [0.0, 1.0, 2.0, 3.0, v], - index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - x = series.resample("D").mean().rolling(window=1).max() - tm.assert_series_equal(expected, x) - - -def test_rolling_min_resample(): - - indices = [datetime(1975, 1, i) for i in range(1, 6)] - # So that we can have 3 datapoints on last day (4, 10, and 20) - indices.append(datetime(1975, 1, 5, 1)) - indices.append(datetime(1975, 1, 5, 2)) - series = Series(list(range(0, 5)) + [10, 20], index=indices) - # Use floats instead of ints as values - series = series.map(lambda x: float(x)) - # Sort chronologically - series = series.sort_index() - - # Default how should be min - expected = Series( - [0.0, 1.0, 2.0, 3.0, 4.0], - index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - r = series.resample("D").min().rolling(window=1) - tm.assert_series_equal(expected, r.min()) - - -def test_rolling_median_resample(): - - indices = [datetime(1975, 1, i) for i in range(1, 6)] - # So that we can have 3 datapoints on last day (4, 10, and 20) - indices.append(datetime(1975, 1, 5, 1)) - indices.append(datetime(1975, 1, 5, 2)) - series = Series(list(range(0, 5)) + [10, 20], index=indices) - # Use floats instead of ints as values - series = series.map(lambda x: float(x)) - # Sort chronologically - series = series.sort_index() - - # Default how should be median - expected = Series( - [0.0, 1.0, 2.0, 3.0, 10], - index=DatetimeIndex([datetime(1975, 1, i, 0) for i in range(1, 6)], freq="D"), - ) - x = series.resample("D").median().rolling(window=1).median() - tm.assert_series_equal(expected, x) - - -def test_rolling_median_memory_error(): - # GH11722 - n = 20000 - Series(np.random.randn(n)).rolling(window=2, center=False).median() - Series(np.random.randn(n)).rolling(window=2, center=False).median() - - -def test_rolling_min_max_numeric_types(): - - # GH12373 - types_test = [np.dtype(f"f{width}") for width in [4, 8]] - types_test.extend( - [np.dtype(f"{sign}{width}") for width in [1, 2, 4, 8] for sign in "ui"] - ) - for data_type in types_test: - # Just testing that these don't throw exceptions and that - # the return type is float64. Other tests will cover quantitative - # correctness - result = DataFrame(np.arange(20, dtype=data_type)).rolling(window=5).max() - assert result.dtypes[0] == np.dtype("f8") - result = DataFrame(np.arange(20, dtype=data_type)).rolling(window=5).min() - assert result.dtypes[0] == np.dtype("f8") - - -def test_moment_functions_zero_length(): - # GH 8056 - s = Series(dtype=np.float64) - s_expected = s - df1 = DataFrame() - df1_expected = df1 - df2 = DataFrame(columns=["a"]) - df2["a"] = df2["a"].astype("float64") - df2_expected = df2 - - functions = [ - lambda x: x.rolling(window=10).count(), - lambda x: x.rolling(window=10, min_periods=5).cov(x, pairwise=False), - lambda x: x.rolling(window=10, min_periods=5).corr(x, pairwise=False), - lambda x: x.rolling(window=10, min_periods=5).max(), - lambda x: x.rolling(window=10, min_periods=5).min(), - lambda x: x.rolling(window=10, min_periods=5).sum(), - lambda x: x.rolling(window=10, min_periods=5).mean(), - lambda x: x.rolling(window=10, min_periods=5).std(), - lambda x: x.rolling(window=10, min_periods=5).var(), - lambda x: x.rolling(window=10, min_periods=5).skew(), - lambda x: x.rolling(window=10, min_periods=5).kurt(), - lambda x: x.rolling(window=10, min_periods=5).quantile(0.5), - lambda x: x.rolling(window=10, min_periods=5).median(), - lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=False), - lambda x: x.rolling(window=10, min_periods=5).apply(sum, raw=True), - lambda x: x.rolling(win_type="boxcar", window=10, min_periods=5).mean(), - ] - for f in functions: - try: - s_result = f(s) - tm.assert_series_equal(s_result, s_expected) - - df1_result = f(df1) - tm.assert_frame_equal(df1_result, df1_expected) - - df2_result = f(df2) - tm.assert_frame_equal(df2_result, df2_expected) - except (ImportError): - - # scipy needed for rolling_window - continue - - -def test_moment_functions_zero_length_pairwise(): - - df1 = DataFrame() - df2 = DataFrame(columns=Index(["a"], name="foo"), index=Index([], name="bar")) - df2["a"] = df2["a"].astype("float64") - - df1_expected = DataFrame( - index=pd.MultiIndex.from_product([df1.index, df1.columns]), columns=Index([]), - ) - df2_expected = DataFrame( - index=pd.MultiIndex.from_product( - [df2.index, df2.columns], names=["bar", "foo"] - ), - columns=Index(["a"], name="foo"), - dtype="float64", - ) - - functions = [ - lambda x: (x.rolling(window=10, min_periods=5).cov(x, pairwise=True)), - lambda x: (x.rolling(window=10, min_periods=5).corr(x, pairwise=True)), - ] - - for f in functions: - df1_result = f(df1) - tm.assert_frame_equal(df1_result, df1_expected) - - df2_result = f(df2) - tm.assert_frame_equal(df2_result, df2_expected) - - -@pytest.mark.slow -@pytest.mark.parametrize( - "window,min_periods,center", list(_rolling_consistency_cases()) -) -def test_rolling_consistency_var(consistency_data, window, min_periods, center): - x, is_constant, no_nans = consistency_data - moments_consistency_var_data( - x=x, - is_constant=is_constant, - min_periods=min_periods, - count=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).count() - ), - mean=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).mean() - ), - var_unbiased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).var() - ), - var_biased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).var(ddof=0) - ), - ) - - -@pytest.mark.slow -@pytest.mark.parametrize( - "window,min_periods,center", list(_rolling_consistency_cases()) -) -def test_rolling_consistency_std(consistency_data, window, min_periods, center): - x, is_constant, no_nans = consistency_data - moments_consistency_std_data( - x=x, - var_unbiased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).var() - ), - std_unbiased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).std() - ), - var_biased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).var(ddof=0) - ), - std_biased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).std(ddof=0) - ), - ) - - -@pytest.mark.slow -@pytest.mark.parametrize( - "window,min_periods,center", list(_rolling_consistency_cases()) -) -def test_rolling_consistency_cov(consistency_data, window, min_periods, center): - x, is_constant, no_nans = consistency_data - moments_consistency_cov_data( - x=x, - var_unbiased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).var() - ), - cov_unbiased=lambda x, y: ( - x.rolling(window=window, min_periods=min_periods, center=center).cov(y) - ), - var_biased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).var(ddof=0) - ), - cov_biased=lambda x, y: ( - x.rolling(window=window, min_periods=min_periods, center=center).cov( - y, ddof=0 - ) - ), - ) - - -@pytest.mark.slow -@pytest.mark.parametrize( - "window,min_periods,center", list(_rolling_consistency_cases()) -) -def test_rolling_consistency_series(consistency_data, window, min_periods, center): - x, is_constant, no_nans = consistency_data - moments_consistency_series_data( - x=x, - mean=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).mean() - ), - corr=lambda x, y: ( - x.rolling(window=window, min_periods=min_periods, center=center).corr(y) - ), - var_unbiased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).var() - ), - std_unbiased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).std() - ), - cov_unbiased=lambda x, y: ( - x.rolling(window=window, min_periods=min_periods, center=center).cov(y) - ), - var_biased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).var(ddof=0) - ), - std_biased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).std(ddof=0) - ), - cov_biased=lambda x, y: ( - x.rolling(window=window, min_periods=min_periods, center=center).cov( - y, ddof=0 - ) - ), - ) - - -@pytest.mark.slow -@pytest.mark.parametrize( - "window,min_periods,center", list(_rolling_consistency_cases()) -) -def test_rolling_consistency(consistency_data, window, min_periods, center): - x, is_constant, no_nans = consistency_data - # suppress warnings about empty slices, as we are deliberately testing - # with empty/0-length Series/DataFrames - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", message=".*(empty slice|0 for slice).*", category=RuntimeWarning, - ) - - # test consistency between different rolling_* moments - moments_consistency_mock_mean( - x=x, - mean=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).mean() - ), - mock_mean=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center) - .sum() - .divide( - x.rolling( - window=window, min_periods=min_periods, center=center - ).count() - ) - ), - ) - - moments_consistency_is_constant( - x=x, - is_constant=is_constant, - min_periods=min_periods, - count=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).count() - ), - mean=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).mean() - ), - corr=lambda x, y: ( - x.rolling(window=window, min_periods=min_periods, center=center).corr(y) - ), - ) - - moments_consistency_var_debiasing_factors( - x=x, - var_unbiased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).var() - ), - var_biased=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center).var( - ddof=0 - ) - ), - var_debiasing_factors=lambda x: ( - x.rolling(window=window, min_periods=min_periods, center=center) - .count() - .divide( - ( - x.rolling( - window=window, min_periods=min_periods, center=center - ).count() - - 1.0 - ).replace(0.0, np.nan) - ) - ), - )