Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rolling window with as_strided #1837

Merged
merged 82 commits into from
Mar 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
789134c
Rolling_window for np.ndarray
fujiisoup Jan 16, 2018
fa4e857
Add pad method to Variable
fujiisoup Jan 17, 2018
52915f3
Added rolling_window to DataArray and Dataset
fujiisoup Jan 17, 2018
b622007
remove pad_value option. Support dask.rolling_window
fujiisoup Jan 18, 2018
36a1fe9
Refactor rolling.reduce
fujiisoup Jan 18, 2018
71fed0f
add as_strided to npcompat. Tests added for reduce(np.nanmean)
fujiisoup Jan 18, 2018
3960134
Support boolean in maybe_promote
fujiisoup Jan 18, 2018
4bd38f3
move rolling_window into duck_array_op. Make DataArray.rolling_window…
fujiisoup Jan 19, 2018
af8362e
Added to_dataarray and to_dataset to rolling object.
fujiisoup Jan 19, 2018
76db6b5
Use pad in rolling to make compatible to pandas. Expose pad_with_fill…
fujiisoup Jan 20, 2018
87f53af
Refactor rolling
fujiisoup Jan 20, 2018
c23cedb
flake8
fujiisoup Jan 20, 2018
9547c57
Added a comment for dask's pad.
fujiisoup Jan 20, 2018
1f71cff
Use fastpath in rolling.to_dataarray
fujiisoup Jan 20, 2018
724776f
Merge branch 'master' into rolling_window
fujiisoup Jan 20, 2018
73862eb
Doc added.
fujiisoup Jan 20, 2018
859bb5c
Revert not to use fastpath
fujiisoup Jan 20, 2018
d5fc24e
Merge branch 'master' into rolling_window
fujiisoup Jan 21, 2018
05c72f0
Remove maybe_prompt for Boolean. Some improvements based on @shoyer's…
fujiisoup Jan 21, 2018
d55e498
Update test.
fujiisoup Jan 21, 2018
9393eb2
Bug fix in test_rolling_count_correct
fujiisoup Jan 21, 2018
9c71a50
fill_value for boolean array
fujiisoup Jan 21, 2018
54975b4
rolling_window(array, axis, window) -> rolling_window(array, window, …
fujiisoup Jan 21, 2018
e907fdf
support stride in rolling.to_dataarray
fujiisoup Jan 21, 2018
6482536
flake8
fujiisoup Jan 21, 2018
b8def4f
Improve doc. Add DataArrayRolling to api.rst
fujiisoup Jan 21, 2018
ff31589
Improve docs in common.rolling.
fujiisoup Jan 21, 2018
6c011cb
Expose groupby docs to public
fujiisoup Jan 21, 2018
684145a
Default fill_value=dtypes.NA, stride=1. Add comment for DataArrayRollig.
fujiisoup Jan 21, 2018
3a7526e
Default fill_value=dtypes.NA, stride=1. Add comment for DataArrayRollig.
fujiisoup Jan 21, 2018
a0968d6
Add fill_value option to rolling.to_dataarray
fujiisoup Jan 22, 2018
ac4f00e
Convert non-numeric array in reduce.
fujiisoup Jan 22, 2018
fbfc262
Fill_value = False for boolean array in rolling.reduce
fujiisoup Jan 22, 2018
c757986
Support old numpy plus bottleneck combination. Suppress warning for a…
fujiisoup Jan 22, 2018
8fd5fa3
flake8
fujiisoup Jan 22, 2018
ade5ba2
Add benchmark
fujiisoup Jan 22, 2018
2d6897f
Dataset.count. Benchmark
fujiisoup Jan 23, 2018
6461f84
Classize benchmark
fujiisoup Jan 23, 2018
aece1c4
Decoratorize for asv benchmark
fujiisoup Jan 24, 2018
d5ad4a0
Merge branch 'master' into rolling_window
fujiisoup Jan 24, 2018
4189d71
Classize benchmarks/indexing.py
fujiisoup Jan 24, 2018
081c928
Working with nanreduce
fujiisoup Jan 27, 2018
75c1d7d
Support .sum for object dtype.
fujiisoup Jan 30, 2018
452b219
Remove unused if-statements.
fujiisoup Jan 30, 2018
c5490c4
Default skipna for rolling.reduce
fujiisoup Jan 30, 2018
ab91394
Pass tests. Test added to make sure the consistency to pandas' behavior.
fujiisoup Jan 30, 2018
9fa0812
Delete duplicate file. flake8
fujiisoup Jan 30, 2018
0c1d49a
flake8 again
fujiisoup Jan 30, 2018
9463937
Working with numpy<1.13
fujiisoup Jan 30, 2018
dce4e37
Revert "Classize benchmarks/indexing.py"
fujiisoup Feb 10, 2018
b3050cb
rolling_window with dask.ghost
fujiisoup Feb 10, 2018
22f6d4a
Merge branch 'rolling_window_dask' into rolling_window
fujiisoup Feb 10, 2018
19e0fca
Merge branch 'master' into rolling_window
fujiisoup Feb 15, 2018
d3b1e2b
Optimize rolling.count.
fujiisoup Feb 15, 2018
2d06ec9
Merge branch 'master' into rolling_window
fujiisoup Feb 15, 2018
734da93
Fixing style errors.
stickler-ci Feb 15, 2018
1a000b8
Remove unused npcompat.nansum etc
fujiisoup Feb 15, 2018
27ff67c
flake8
fujiisoup Feb 16, 2018
a2c7141
require_dask -> has_dask
fujiisoup Feb 16, 2018
35dee9d
npcompat -> np
fujiisoup Feb 16, 2018
137709f
flake8
fujiisoup Feb 16, 2018
cc82cdc
Skip tests for old numpy.
fujiisoup Feb 16, 2018
b246411
Improve doc. Optmize missing._get_valid_fill_mask
fujiisoup Feb 17, 2018
b3a2105
to_dataarray -> construct
fujiisoup Feb 18, 2018
b80fbfd
remove assert_allclose_with_nan
fujiisoup Feb 18, 2018
3c010ae
Fixing style errors.
stickler-ci Feb 18, 2018
ab82f75
typo
fujiisoup Feb 18, 2018
b9f10cd
`to_dataset` -> `construct`
fujiisoup Feb 18, 2018
cc9c3d6
Update doc
fujiisoup Feb 18, 2018
52cc48d
Merge branch 'master' into rolling_window
fujiisoup Feb 18, 2018
2954cdf
Change boundary and add comments for dask_rolling_window.
fujiisoup Feb 18, 2018
f19e531
Refactor dask_array_ops.rolling_window and np_utils.rolling_window
fujiisoup Feb 24, 2018
a074df3
flake8
fujiisoup Feb 24, 2018
f6f78a5
Simplify tests
fujiisoup Feb 24, 2018
0ec8aba
flake8 again.
fujiisoup Feb 25, 2018
0261cfe
cleanup roling_window for dask.
fujiisoup Feb 25, 2018
a91c27f
Merge branch 'master' into rolling_window
fujiisoup Feb 26, 2018
c83d588
remove duplicates
fujiisoup Feb 26, 2018
3bb4668
remvove duplicate
fujiisoup Feb 26, 2018
d0d89ce
flake8
fujiisoup Feb 26, 2018
eaba563
delete unnecessary file.
fujiisoup Feb 26, 2018
aeabdf5
Merge branch 'master' into rolling_window
fujiisoup Feb 28, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions asv_bench/benchmarks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@
_counter = itertools.count()


def parameterized(names, params):
def decorator(func):
func.param_names = names
func.params = params
return func
return decorator


def requires_dask():
try:
import dask # noqa
Expand Down
50 changes: 50 additions & 0 deletions asv_bench/benchmarks/rolling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import pandas as pd
import xarray as xr

from . import parameterized, randn, requires_dask

nx = 3000
ny = 2000
nt = 1000
window = 20


class Rolling(object):
def setup(self, *args, **kwargs):
self.ds = xr.Dataset(
{'var1': (('x', 'y'), randn((nx, ny), frac_nan=0.1)),
'var2': (('x', 't'), randn((nx, nt))),
'var3': (('t', ), randn(nt))},
coords={'x': np.arange(nx),
'y': np.linspace(0, 1, ny),
't': pd.date_range('1970-01-01', periods=nt, freq='D'),
'x_coords': ('x', np.linspace(1.1, 2.1, nx))})

@parameterized(['func', 'center'],
(['mean', 'count'], [True, False]))
def time_rolling(self, func, center):
getattr(self.ds.rolling(x=window, center=center), func)()

@parameterized(['window_', 'min_periods'],
([20, 40], [5, None]))
def time_rolling_np(self, window_, min_periods):
self.ds.rolling(x=window_, center=False,
min_periods=min_periods).reduce(getattr(np, 'nanmean'))

@parameterized(['center', 'stride'],
([True, False], [1, 200]))
def time_rolling_construct(self, center, stride):
self.ds.rolling(x=window, center=center).construct(
'window_dim', stride=stride).mean(dim='window_dim')


class RollingDask(Rolling):
def setup(self, *args, **kwargs):
requires_dask()
super(RollingDask, self).setup(**kwargs)
self.ds = self.ds.chunk({'x': 100, 'y': 50, 't': 50})
26 changes: 26 additions & 0 deletions doc/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,32 @@ DataArray methods
DataArray.load
DataArray.chunk

Rolling objects
===============

.. autosummary::
:toctree: generated/

core.rolling.DataArrayRolling
core.rolling.DataArrayRolling.construct
core.rolling.DataArrayRolling.reduce
core.rolling.DatasetRolling
core.rolling.DatasetRolling.construct
core.rolling.DatasetRolling.reduce

GroupByObjects
==============

.. autosummary::
:toctree: generated/

core.groupby.DataArrayGroupBy
core.groupby.DataArrayGroupBy.apply
core.groupby.DataArrayGroupBy.reduce
core.groupby.DatasetGroupBy
core.groupby.DatasetGroupBy.apply
core.groupby.DatasetGroupBy.reduce

Plotting
========

Expand Down
30 changes: 26 additions & 4 deletions doc/computation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,42 @@ Aggregation and summary methods can be applied directly to the ``Rolling`` objec
r.mean()
r.reduce(np.std)

Note that rolling window aggregations are much faster (both asymptotically and
because they avoid a loop in Python) when bottleneck_ is installed. Otherwise,
we fall back to a slower, pure Python implementation.
Note that rolling window aggregations are faster when bottleneck_ is installed.

.. _bottleneck: https://github.com/kwgoodman/bottleneck/

Finally, we can manually iterate through ``Rolling`` objects:
We can also manually iterate through ``Rolling`` objects:

.. ipython:: python

@verbatim
for label, arr_window in r:
# arr_window is a view of x

Finally, the rolling object has ``construct`` method, which gives a
view of the original ``DataArray`` with the windowed dimension attached to
the last position.
You can use this for more advanced rolling operations, such as strided rolling,
windowed rolling, convolution, short-time FFT, etc.

.. ipython:: python

# rolling with 2-point stride
rolling_da = r.construct('window_dim', stride=2)
rolling_da
rolling_da.mean('window_dim', skipna=False)

Because the ``DataArray`` given by ``r.construct('window_dim')`` is a view
of the original array, it is memory efficient.

.. note::
numpy's Nan-aggregation functions such as ``nansum`` copy the original array.
In xarray, we internally use these functions in our aggregation methods
(such as ``.sum()``) if ``skipna`` argument is not specified or set to True.
This means ``rolling_da.mean('window_dim')`` is memory inefficient.
To avoid this, use ``skipna=False`` as the above example.


.. _compute.broadcasting:

Broadcasting by dimension name
Expand Down
14 changes: 13 additions & 1 deletion doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ Documentation
Enhancements
~~~~~~~~~~~~

- Improve :py:func:`~xarray.DataArray.rolling` logic.
:py:func:`~xarray.DataArrayRolling` object now supports
:py:func:`~xarray.DataArrayRolling.construct` method that returns a view
of the DataArray / Dataset object with the rolling-window dimension added
to the last axis. This enables more flexible operation, such as strided
rolling, windowed rolling, ND-rolling, short-time FFT and convolution.
(:issue:`1831`, :issue:`1142`, :issue:`819`)
By `Keisuke Fujii <https://github.com/fujiisoup>`_.

Bug fixes
~~~~~~~~~

Expand All @@ -64,7 +73,6 @@ Documentation

Enhancements
~~~~~~~~~~~~

**New functions and methods**:

- Added :py:meth:`DataArray.to_iris` and
Expand Down Expand Up @@ -140,6 +148,10 @@ Enhancements

Bug fixes
~~~~~~~~~
- Rolling aggregation with ``center=True`` option now gives the same result
with pandas including the last element (:issue:`1046`).
By `Keisuke Fujii <https://github.com/fujiisoup>`_.

- Support indexing with a 0d-np.ndarray (:issue:`1921`).
By `Keisuke Fujii <https://github.com/fujiisoup>`_.
- Added warning in api.py of a netCDF4 bug that occurs when
Expand Down
16 changes: 12 additions & 4 deletions xarray/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,11 @@ def groupby(self, group, squeeze=True):
grouped : GroupBy
A `GroupBy` object patterned after `pandas.GroupBy` that can be
iterated over in the form of `(unique_value, grouped_array)` pairs.

See Also
--------
core.groupby.DataArrayGroupBy
core.groupby.DatasetGroupBy
"""
return self._groupby_cls(self, group, squeeze=squeeze)

Expand Down Expand Up @@ -483,9 +488,6 @@ def rolling(self, min_periods=None, center=False, **windows):
"""
Rolling window object.

Rolling window aggregations are much faster when bottleneck is
installed.

Parameters
----------
min_periods : int, default None
Expand All @@ -503,7 +505,8 @@ def rolling(self, min_periods=None, center=False, **windows):

Returns
-------
rolling : type of input argument
Rolling object (core.rolling.DataArrayRolling for DataArray,
core.rolling.DatasetRolling for Dataset.)

Examples
--------
Expand Down Expand Up @@ -531,6 +534,11 @@ def rolling(self, min_periods=None, center=False, **windows):
array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10.])
Coordinates:
* time (time) datetime64[ns] 2000-02-15 2000-03-15 2000-04-15 ...

See Also
--------
core.rolling.DataArrayRolling
core.rolling.DatasetRolling
"""

return self._rolling_cls(self, min_periods=min_periods,
Expand Down
74 changes: 72 additions & 2 deletions xarray/core/dask_array_ops.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Define core operations for xarray objects.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
from . import nputils

try:
import dask.array as da
Expand All @@ -24,3 +27,70 @@ def dask_rolling_wrapper(moving_func, a, window, min_count=None, axis=-1):
# trim array
result = da.ghost.trim_internal(out, depth)
return result


def rolling_window(a, axis, window, center, fill_value):
""" Dask's equivalence to np.utils.rolling_window """
orig_shape = a.shape
# inputs for ghost
if axis < 0:
axis = a.ndim + axis
depth = {d: 0 for d in range(a.ndim)}
depth[axis] = int(window / 2)
# For evenly sized window, we need to crop the first point of each block.
offset = 1 if window % 2 == 0 else 0

if depth[axis] > min(a.chunks[axis]):
raise ValueError(
"For window size %d, every chunk should be larger than %d, "
"but the smallest chunk size is %d. Rechunk your array\n"
"with a larger chunk size or a chunk size that\n"
"more evenly divides the shape of your array." %
(window, depth[axis], min(a.chunks[axis])))

# Although dask.ghost pads values to boundaries of the array,
# the size of the generated array is smaller than what we want
# if center == False.
if center:
start = int(window / 2) # 10 -> 5, 9 -> 4
end = window - 1 - start
else:
start, end = window - 1, 0
pad_size = max(start, end) + offset - depth[axis]
drop_size = 0
# pad_size becomes more than 0 when the ghosted array is smaller than
# needed. In this case, we need to enlarge the original array by padding
# before ghosting.
if pad_size > 0:
if pad_size < depth[axis]:
# Ghosting requires each chunk larger than depth. If pad_size is
# smaller than the depth, we enlarge this and truncate it later.
drop_size = depth[axis] - pad_size
pad_size = depth[axis]
shape = list(a.shape)
shape[axis] = pad_size
chunks = list(a.chunks)
chunks[axis] = (pad_size, )
fill_array = da.full(shape, fill_value, dtype=a.dtype, chunks=chunks)
a = da.concatenate([fill_array, a], axis=axis)

boundary = {d: fill_value for d in range(a.ndim)}

# create ghosted arrays
ag = da.ghost.ghost(a, depth=depth, boundary=boundary)

# apply rolling func
def func(x, window, axis=-1):
x = np.asarray(x)
rolling = nputils._rolling_window(x, window, axis)
return rolling[(slice(None), ) * axis + (slice(offset, None), )]

chunks = list(a.chunks)
chunks.append(window)
out = ag.map_blocks(func, dtype=a.dtype, new_axis=a.ndim, chunks=chunks,
window=window, axis=axis)

# crop boundary.
index = (slice(None),) * axis + (slice(drop_size,
drop_size + orig_shape[axis]), )
return out[index]
19 changes: 18 additions & 1 deletion xarray/core/duck_array_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import numpy as np
import pandas as pd

from . import dtypes, npcompat
from . import dask_array_ops, dtypes, npcompat, nputils
from .nputils import nanfirst, nanlast
from .pycompat import dask_array_type

Expand Down Expand Up @@ -278,6 +278,10 @@ def f(values, axis=None, skipna=None, **kwargs):
dtype = kwargs.get('dtype', None)
values = asarray(values)

# dask requires dtype argument for object dtype
if (values.dtype == 'object' and name in ['sum', ]):
kwargs['dtype'] = values.dtype if dtype is None else dtype

if coerce_strings and values.dtype.kind in 'SU':
values = values.astype(object)

Expand Down Expand Up @@ -369,3 +373,16 @@ def last(values, axis, skipna=None):
_fail_on_dask_array_input_skipna(values)
return nanlast(values, axis)
return take(values, -1, axis=axis)


def rolling_window(array, axis, window, center, fill_value):
"""
Make an ndarray with a rolling window of axis-th dimension.
The rolling dimension will be placed at the last dimension.
"""
if isinstance(array, dask_array_type):
return dask_array_ops.rolling_window(
array, axis, window, center, fill_value)
else: # np.ndarray
return nputils.rolling_window(
array, axis, window, center, fill_value)
7 changes: 6 additions & 1 deletion xarray/core/missing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import numpy as np
import pandas as pd

from . import rolling
from .computation import apply_ufunc
from .npcompat import flip
from .pycompat import iteritems
Expand Down Expand Up @@ -326,4 +327,8 @@ def _get_valid_fill_mask(arr, dim, limit):
'''helper function to determine values that can be filled when limit is not
None'''
kw = {dim: limit + 1}
return arr.isnull().rolling(min_periods=1, **kw).sum() <= limit
# we explicitly use construct method to avoid copy.
new_dim = rolling._get_new_dimname(arr.dims, '_window')
return (arr.isnull().rolling(min_periods=1, **kw)
.construct(new_dim, fill_value=False)
.sum(new_dim, skipna=False)) <= limit
11 changes: 11 additions & 0 deletions xarray/core/npcompat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
from __future__ import absolute_import, division, print_function

import numpy as np
from distutils.version import LooseVersion


if LooseVersion(np.__version__) >= LooseVersion('1.12'):
as_strided = np.lib.stride_tricks.as_strided
else:
def as_strided(x, shape=None, strides=None, subok=False, writeable=True):
array = np.lib.stride_tricks.as_strided(x, shape, strides, subok)
array.setflags(write=writeable)
return array


try:
from numpy import nancumsum, nancumprod, flip
Expand Down
Loading