diff --git a/python/cudf/cudf/_lib/aggregation.pyx b/python/cudf/cudf/_lib/aggregation.pyx index 097018fe3c0..4f703724cef 100644 --- a/python/cudf/cudf/_lib/aggregation.pyx +++ b/python/cudf/cudf/_lib/aggregation.pyx @@ -382,6 +382,24 @@ cdef class RollingAggregation: libcudf_aggregation.make_mean_aggregation[rolling_aggregation]()) return agg + @classmethod + def var(cls, ddof=1): + cdef RollingAggregation agg = cls() + agg.c_obj = move( + libcudf_aggregation.make_variance_aggregation[rolling_aggregation]( + ddof + ) + ) + return agg + + @classmethod + def std(cls, ddof=1): + cdef RollingAggregation agg = cls() + agg.c_obj = move( + libcudf_aggregation.make_std_aggregation[rolling_aggregation](ddof) + ) + return agg + @classmethod def count(cls, dropna=True): cdef libcudf_types.null_policy c_null_handling diff --git a/python/cudf/cudf/_lib/rolling.pyx b/python/cudf/cudf/_lib/rolling.pyx index 87c2fa6178f..b4b3384032c 100644 --- a/python/cudf/cudf/_lib/rolling.pyx +++ b/python/cudf/cudf/_lib/rolling.pyx @@ -17,8 +17,14 @@ from cudf._lib.cpp.rolling cimport rolling_window as cpp_rolling_window from cudf._lib.cpp.types cimport size_type -def rolling(Column source_column, Column pre_column_window, - Column fwd_column_window, window, min_periods, center, op): +def rolling(Column source_column, + Column pre_column_window, + Column fwd_column_window, + window, + min_periods, + center, + op, + agg_params): """ Rolling on input executing operation within the given window for each row @@ -33,6 +39,7 @@ def rolling(Column source_column, Column pre_column_window, center : Set the labels at the center of the window op : operation to be executed, as of now it supports MIN, MAX, COUNT, SUM, MEAN and UDF + agg_params : dict, parameter for the aggregation (e.g. ddof for VAR/STD) Returns ------- @@ -51,7 +58,7 @@ def rolling(Column source_column, Column pre_column_window, cython_agg = make_rolling_aggregation( op, {'dtype': source_column.dtype}) else: - cython_agg = make_rolling_aggregation(op) + cython_agg = make_rolling_aggregation(op, agg_params) if window is None: if center: diff --git a/python/cudf/cudf/core/window/rolling.py b/python/cudf/cudf/core/window/rolling.py index aa377f81735..a3d96cee051 100644 --- a/python/cudf/cudf/core/window/rolling.py +++ b/python/cudf/cudf/core/window/rolling.py @@ -177,6 +177,7 @@ def __init__( self.min_periods = min_periods self.center = center self._normalize() + self.agg_params = {} if axis != 0: raise NotImplementedError("axis != 0 is not supported yet.") self.axis = axis @@ -237,6 +238,7 @@ def _apply_agg_series(self, sr, agg_name): min_periods=min_periods, center=self.center, op=agg_name, + agg_params=self.agg_params, ) return sr._from_data({sr.name: result_col}, sr._index) @@ -266,6 +268,14 @@ def max(self): def mean(self): return self._apply_agg("mean") + def var(self, ddof=1): + self.agg_params["ddof"] = ddof + return self._apply_agg("var") + + def std(self, ddof=1): + self.agg_params["ddof"] = ddof + return self._apply_agg("std") + def count(self): return self._apply_agg("count") diff --git a/python/cudf/cudf/tests/test_rolling.py b/python/cudf/cudf/tests/test_rolling.py index 8a8293cd090..29272cbf876 100644 --- a/python/cudf/cudf/tests/test_rolling.py +++ b/python/cudf/cudf/tests/test_rolling.py @@ -9,6 +9,7 @@ import cudf from cudf.core._compat import PANDAS_GE_110 from cudf.testing._utils import assert_eq +from cudf.testing.dataset_generator import rand_dataframe @pytest.mark.parametrize( @@ -20,20 +21,23 @@ ([1, 2, 4, 9, 9, 4], ["a", "b", "c", "d", "e", "f"]), ], ) -@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"]) +@pytest.mark.parametrize( + "agg", ["sum", "min", "max", "mean", "count", "std", "var"] +) @pytest.mark.parametrize("nulls", ["none", "one", "some", "all"]) @pytest.mark.parametrize("center", [True, False]) def test_rolling_series_basic(data, index, agg, nulls, center): + rng = np.random.default_rng(1) if PANDAS_GE_110: kwargs = {"check_freq": False} else: kwargs = {} if len(data) > 0: if nulls == "one": - p = np.random.randint(0, len(data)) + p = rng.integers(0, len(data)) data[p] = np.nan elif nulls == "some": - p1, p2 = np.random.randint(0, len(data), (2,)) + p1, p2 = rng.integers(0, len(data), (2,)) data[p1] = np.nan data[p2] = np.nan elif nulls == "all": @@ -64,19 +68,22 @@ def test_rolling_series_basic(data, index, agg, nulls, center): }, ], ) -@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"]) +@pytest.mark.parametrize( + "agg", ["sum", "min", "max", "mean", "count", "std", "var"] +) @pytest.mark.parametrize("nulls", ["none", "one", "some", "all"]) @pytest.mark.parametrize("center", [True, False]) def test_rolling_dataframe_basic(data, agg, nulls, center): + rng = np.random.default_rng(0) pdf = pd.DataFrame(data) if len(pdf) > 0: for col_name in pdf.columns: if nulls == "one": - p = np.random.randint(0, len(data)) + p = rng.integers(0, len(data)) pdf[col_name][p] = np.nan elif nulls == "some": - p1, p2 = np.random.randint(0, len(data), (2,)) + p1, p2 = rng.integers(0, len(data), (2,)) pdf[col_name][p1] = np.nan pdf[col_name][p2] = np.nan elif nulls == "all": @@ -102,6 +109,8 @@ def test_rolling_dataframe_basic(data, agg, nulls, center): pytest.param("max"), pytest.param("mean"), pytest.param("count"), + pytest.param("std"), + pytest.param("var"), ], ) def test_rolling_with_offset(agg): @@ -124,6 +133,79 @@ def test_rolling_with_offset(agg): ) +@pytest.mark.parametrize("agg", ["std", "var"]) +@pytest.mark.parametrize("ddof", [0, 1]) +@pytest.mark.parametrize("center", [True, False]) +@pytest.mark.parametrize("seed", [100, 2000]) +@pytest.mark.parametrize("window_size", [2, 10, 100]) +def test_rolling_var_std_large(agg, ddof, center, seed, window_size): + if PANDAS_GE_110: + kwargs = {"check_freq": False} + else: + kwargs = {} + + iupper_bound = math.sqrt(np.iinfo(np.int64).max / window_size) + ilower_bound = -math.sqrt(abs(np.iinfo(np.int64).min) / window_size) + + fupper_bound = math.sqrt(np.finfo(np.float64).max / window_size) + flower_bound = -math.sqrt(abs(np.finfo(np.float64).min) / window_size) + + n_rows = 1_000 + data = rand_dataframe( + dtypes_meta=[ + { + "dtype": "int64", + "null_frequency": 0.4, + "cardinality": n_rows, + "min_bound": ilower_bound, + "max_bound": iupper_bound, + }, + { + "dtype": "float64", + "null_frequency": 0.4, + "cardinality": n_rows, + "min_bound": flower_bound, + "max_bound": fupper_bound, + }, + { + "dtype": "decimal64", + "null_frequency": 0.4, + "cardinality": n_rows, + "min_bound": ilower_bound, + "max_bound": iupper_bound, + }, + ], + rows=n_rows, + use_threads=False, + seed=seed, + ) + pdf = data.to_pandas() + gdf = cudf.from_pandas(pdf) + + expect = getattr(pdf.rolling(window_size, 1, center), agg)(ddof=ddof) + got = getattr(gdf.rolling(window_size, 1, center), agg)(ddof=ddof) + + assert_eq(expect, got, **kwargs) + + +@pytest.mark.xfail +def test_rolling_var_uniform_window(): + """ + Pandas adopts an online variance calculation algorithm. This gives a + floating point artifact. + https://github.com/pandas-dev/pandas/issues/37051 + + In cudf, each window is computed independently from the previous window, + this gives better numeric precision. + """ + + s = pd.Series([1e8, 5, 5, 5]) + expected = s.rolling(3).var() + got = cudf.from_pandas(s).rolling(3).var() + + assert_eq(expected, got) + + def test_rolling_count_with_offset(): """ This test covers the xfail case from test_rolling_with_offset["count"]. @@ -300,7 +382,9 @@ def some_func(A): ) -@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"]) +@pytest.mark.parametrize( + "agg", ["sum", "min", "max", "mean", "count", "var", "std"] +) def test_rolling_groupby_simple(agg): pdf = pd.DataFrame( { @@ -330,7 +414,9 @@ def test_rolling_groupby_simple(agg): assert_eq(expect, got, check_dtype=False) -@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"]) +@pytest.mark.parametrize( + "agg", ["sum", "min", "max", "mean", "count", "var", "std"] +) def test_rolling_groupby_multi(agg): pdf = pd.DataFrame( { @@ -351,7 +437,9 @@ def test_rolling_groupby_multi(agg): assert_eq(expect, got, check_dtype=False) -@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"]) +@pytest.mark.parametrize( + "agg", ["sum", "min", "max", "mean", "count", "var", "std"] +) @pytest.mark.parametrize( "window_size", ["1d", "2d", "3d", "4d", "5d", "6d", "7d"] )