Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…eries.rolling`, `DataFrame.rolling`

functionality to enable rolling window operations

Signed-off-by: Alexey Prutskov <[email protected]>
  • Loading branch information
prutskov authored and aregm committed Sep 16, 2020
1 parent 2ef9c43 commit ee223d6
Show file tree
Hide file tree
Showing 9 changed files with 601 additions and 17 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ jobs:
if: matrix.part != 3
- run: python -m pytest modin/pandas/test/test_series.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_rolling.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_concat.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_groupby.py
Expand Down Expand Up @@ -149,6 +151,8 @@ jobs:
if: matrix.part != 3
- run: python -m pytest modin/pandas/test/test_series.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_rolling.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_concat.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_groupby.py
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ jobs:
if: matrix.part != 3
- run: python -m pytest modin/pandas/test/test_series.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_rolling.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_concat.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_groupby.py
Expand Down Expand Up @@ -63,6 +65,8 @@ jobs:
if: matrix.part != 3
- run: python -m pytest modin/pandas/test/test_series.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_rolling.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_concat.py
if: matrix.part == 3
- run: python -m pytest modin/pandas/test/test_groupby.py
Expand Down
2 changes: 1 addition & 1 deletion docs/supported_apis/dataframe_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ default to pandas.
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``rmul`` | `rmul`_ | Y | See ``add`` |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``rolling`` | `rolling`_ | D | |
| ``rolling`` | `rolling`_ | Y | |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
| ``round`` | `round`_ | Y | |
+----------------------------+---------------------------+------------------------+----------------------------------------------------+
Expand Down
2 changes: 1 addition & 1 deletion docs/supported_apis/series_supported.rst
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ the related section on `Defaulting to pandas`_.
+-----------------------------+---------------------------------+
| ``rmul`` | Y |
+-----------------------------+---------------------------------+
| ``rolling`` | D |
| ``rolling`` | Y |
+-----------------------------+---------------------------------+
| ``round`` | Y |
+-----------------------------+---------------------------------+
Expand Down
130 changes: 130 additions & 0 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,136 @@ def resample_var(self, resample_args, ddof, *args, **kwargs):
def resample_quantile(self, resample_args, q, **kwargs):
    """Forward a resample ``quantile`` request to ``self._resample_func``.

    Parameters
    ----------
    resample_args
        Passed through unchanged as the first positional argument.
    q
        Forwarded under the ``q`` keyword.
    **kwargs
        Forwarded unchanged as additional keyword arguments.

    Returns
    -------
    Whatever ``self._resample_func`` returns.
    """
    operation = "quantile"
    return self._resample_func(resample_args, operation, q=q, **kwargs)

# ``window_*`` reductions registered through ``FoldFunction.register``.
# Each callable receives a frame plus ``rolling_args`` (unpacked positionally
# into ``df.rolling``), applies a single reduction and wraps the result in a
# ``pandas.DataFrame``.
window_mean = FoldFunction.register(
    lambda df, rolling_args, *args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).mean(*args, **kwargs)
    )
)
window_sum = FoldFunction.register(
    lambda df, rolling_args, *args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).sum(*args, **kwargs)
    )
)
# ``ddof`` is passed positionally: with ``ddof=ddof, *args`` the first extra
# positional argument would bind to ``ddof`` as well and raise
# "got multiple values for argument 'ddof'".
window_var = FoldFunction.register(
    lambda df, rolling_args, ddof, *args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).var(ddof, *args, **kwargs)
    )
)
window_std = FoldFunction.register(
    lambda df, rolling_args, ddof, *args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).std(ddof, *args, **kwargs)
    )
)
# ``rolling_*`` reductions registered through ``FoldFunction.register``.
# Each callable receives a frame plus ``rolling_args`` (unpacked positionally
# into ``df.rolling``), applies a single rolling operation and wraps the
# result in a ``pandas.DataFrame``.
rolling_count = FoldFunction.register(
    lambda df, rolling_args: pandas.DataFrame(df.rolling(*rolling_args).count())
)
rolling_sum = FoldFunction.register(
    lambda df, rolling_args, *args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).sum(*args, **kwargs)
    )
)
rolling_mean = FoldFunction.register(
    lambda df, rolling_args, *args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).mean(*args, **kwargs)
    )
)
rolling_median = FoldFunction.register(
    lambda df, rolling_args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).median(**kwargs)
    )
)
# ``ddof`` is passed positionally: with ``ddof=ddof, *args`` the first extra
# positional argument would bind to ``ddof`` as well and raise
# "got multiple values for argument 'ddof'".
rolling_var = FoldFunction.register(
    lambda df, rolling_args, ddof, *args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).var(ddof, *args, **kwargs)
    )
)
rolling_std = FoldFunction.register(
    lambda df, rolling_args, ddof, *args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).std(ddof, *args, **kwargs)
    )
)
rolling_min = FoldFunction.register(
    lambda df, rolling_args, *args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).min(*args, **kwargs)
    )
)
rolling_max = FoldFunction.register(
    lambda df, rolling_args, *args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).max(*args, **kwargs)
    )
)
rolling_skew = FoldFunction.register(
    lambda df, rolling_args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).skew(**kwargs)
    )
)
rolling_kurt = FoldFunction.register(
    lambda df, rolling_args, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).kurt(**kwargs)
    )
)
# ``args`` / ``kwargs`` here are plain parameters (a sequence and a mapping
# meant for ``func``), not star-arguments; they are handed to ``apply`` under
# its ``args=`` / ``kwargs=`` keywords.
rolling_apply = FoldFunction.register(
    lambda df, rolling_args, func, raw, engine, engine_kwargs, args, kwargs: (
        pandas.DataFrame(
            df.rolling(*rolling_args).apply(
                func=func,
                raw=raw,
                engine=engine,
                engine_kwargs=engine_kwargs,
                args=args,
                kwargs=kwargs,
            )
        )
    )
)
rolling_quantile = FoldFunction.register(
    lambda df, rolling_args, quantile, interpolation, **kwargs: pandas.DataFrame(
        df.rolling(*rolling_args).quantile(
            quantile=quantile, interpolation=interpolation, **kwargs
        )
    )
)

def rolling_corr(self, rolling_args, other, pairwise, *args, **kwargs):
    """Compute a rolling correlation.

    Parameters
    ----------
    rolling_args
        Sequence unpacked positionally into ``rolling(...)``.
    other, pairwise
        Forwarded to the pandas rolling ``corr`` call.
    *args, **kwargs
        Extra arguments forwarded to ``corr`` after ``other`` and
        ``pairwise``.

    Returns
    -------
    The result of ``self.default_to_pandas`` when there is more than one
    column, otherwise the result of applying a function registered through
    ``FoldFunction.register`` to ``self``.
    """

    def correlate(roller):
        # ``other`` and ``pairwise`` are passed positionally on purpose:
        # ``corr(other=other, pairwise=pairwise, *args)`` would bind the
        # first extra positional argument to ``other`` as well and raise
        # "got multiple values for argument 'other'".
        return roller.corr(other, pairwise, *args, **kwargs)

    if len(self.columns) > 1:
        return self.default_to_pandas(
            lambda df: correlate(pandas.DataFrame.rolling(df, *rolling_args))
        )
    return FoldFunction.register(
        lambda df: pandas.DataFrame(correlate(df.rolling(*rolling_args)))
    )(self)

def rolling_cov(self, rolling_args, other, pairwise, ddof, **kwargs):
    """Compute a rolling covariance.

    Parameters
    ----------
    rolling_args
        Sequence unpacked positionally into ``rolling(...)``.
    other, pairwise, ddof
        Forwarded by keyword to the pandas rolling ``cov`` call.
    **kwargs
        Extra keyword arguments forwarded to ``cov``.

    Returns
    -------
    The result of ``self.default_to_pandas`` when there is more than one
    column, otherwise the result of applying a function registered through
    ``FoldFunction.register`` to ``self``.
    """

    def covariance(roller):
        return roller.cov(other=other, pairwise=pairwise, ddof=ddof, **kwargs)

    if len(self.columns) <= 1:
        single_column = FoldFunction.register(
            lambda df: pandas.DataFrame(covariance(df.rolling(*rolling_args)))
        )
        return single_column(self)

    return self.default_to_pandas(
        lambda df: covariance(pandas.DataFrame.rolling(df, *rolling_args))
    )

def rolling_aggregate(self, rolling_args, func, *args, **kwargs):
    """Apply a rolling ``aggregate`` across the full row axis.

    Parameters
    ----------
    rolling_args
        Sequence unpacked positionally into ``df.rolling(...)``.
    func
        Aggregation specification handed to the pandas rolling
        ``aggregate`` call.
    *args, **kwargs
        Extra arguments forwarded to ``aggregate`` after ``func``.

    Returns
    -------
    ``self.__constructor__`` applied to the frame produced by
    ``self._modin_frame._apply_full_axis``.
    """

    def aggregate_windows(df):
        # ``func`` is passed positionally: ``aggregate(func=func, *args)``
        # would bind the first extra positional argument to ``func`` as
        # well and raise "got multiple values for argument 'func'".
        return pandas.DataFrame(
            df.rolling(*rolling_args).aggregate(func, *args, **kwargs)
        )

    new_modin_frame = self._modin_frame._apply_full_axis(
        0,
        aggregate_windows,
        new_index=self.index,
    )
    return self.__constructor__(new_modin_frame)

# Map partitions operations
# These operations apply a function to every partition.
abs = MapFunction.register(pandas.DataFrame.abs, dtypes="copy")
Expand Down
Loading

0 comments on commit ee223d6

Please sign in to comment.