Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add template xarray object kwarg to map_blocks #3816

Merged
merged 29 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
064b583
Allow providing template dataset to map_blocks.
dcherian Feb 25, 2020
1f14b11
Update dimension shape check.
dcherian Feb 26, 2020
045ae2b
Allow user function to add new unindexed dimension.
dcherian Feb 26, 2020
0e37b63
Add docstring for template.
dcherian Feb 26, 2020
5704e84
renaming
dcherian Feb 26, 2020
2c458e1
Raise nice error if adding a new chunked dimension,
dcherian Feb 26, 2020
dced076
Raise nice error message when expected dimension is missing on return…
dcherian Feb 26, 2020
717d900
Revert "Allow user function to add new unindexed dimension."
dcherian Mar 2, 2020
42a9070
Add test + fix output_chunks for dataarray template
dcherian Mar 3, 2020
64ba31f
typing
dcherian Mar 3, 2020
a68cb41
fix test
dcherian Mar 3, 2020
0bc3754
Add nice error messages when result doesn't match template.
dcherian Mar 6, 2020
ee66c88
blacken
dcherian Mar 6, 2020
d52dfd6
Add template kwarg to DataArray.map_blocks & Dataset.map_blocks
dcherian Mar 19, 2020
8ef47f6
minor error message fixes.
dcherian Mar 19, 2020
376f242
docstring updates.
dcherian Mar 19, 2020
d9029eb
bugfix for expected shapes when template is not specified
dcherian Mar 19, 2020
6f69955
Add map_blocks docs.
dcherian Mar 19, 2020
da62535
Update doc/dask.rst
dcherian Mar 19, 2020
4a355e6
Merge remote-tracking branch 'upstream/master' into map-blocks-schema
dcherian Mar 28, 2020
66fe4c4
Merge branch 'map-blocks-schema' of github.com:dcherian/xarray into m…
dcherian Mar 28, 2020
085ce9a
Merge remote-tracking branch 'upstream/master' into map-blocks-schema
dcherian Apr 30, 2020
0f741e2
refactor out slicer for chunks
dcherian May 1, 2020
869e62d
Check expected index values.
dcherian May 1, 2020
334e8d3
Raise nice error when template object does not have required number o…
dcherian May 1, 2020
4dd699e
doc updates.
dcherian May 1, 2020
27eb873
more review comments.
dcherian May 1, 2020
04a2c3c
Mention that attrs are taken from template.
dcherian May 1, 2020
4b92168
Add test and explicit point out that attrs is copied from template
dcherian May 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions doc/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ Computation
Dataset.quantile
Dataset.differentiate
Dataset.integrate
Dataset.map_blocks
Dataset.polyfit

**Aggregation**:
Expand Down Expand Up @@ -358,6 +359,8 @@ Computation
DataArray.integrate
DataArray.polyfit
DataArray.str
DataArray.map_blocks


**Aggregation**:
:py:attr:`~DataArray.all`
Expand Down Expand Up @@ -518,7 +521,6 @@ Dataset methods
Dataset.load
Dataset.chunk
Dataset.unify_chunks
Dataset.map_blocks
Dataset.filter_by_attrs
Dataset.info

Expand Down Expand Up @@ -550,7 +552,6 @@ DataArray methods
DataArray.load
DataArray.chunk
DataArray.unify_chunks
DataArray.map_blocks

Coordinates objects
===================
Expand Down
114 changes: 110 additions & 4 deletions doc/dask.rst
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,21 @@ loaded into Dask or not:

.. _dask.automatic-parallelization:

Automatic parallelization
-------------------------
Automatic parallelization with ``apply_ufunc`` and ``map_blocks``
-----------------------------------------------------------------

Almost all of xarray's built-in operations work on Dask arrays. If you want to
use a function that isn't wrapped by xarray, one option is to extract Dask
arrays from xarray objects (``.data``) and use Dask directly.
use a function that isn't wrapped by xarray, and have it applied in parallel on
each block of your xarray object, you have three options:

1. Extract Dask arrays from xarray objects (``.data``) and use Dask directly.
2. Use :py:func:`~xarray.apply_ufunc` to apply functions that consume and return NumPy arrays.
3. Use :py:func:`~xarray.map_blocks`, :py:meth:`Dataset.map_blocks` or :py:meth:`DataArray.map_blocks`
to apply functions that consume and return xarray objects.


``apply_ufunc``
~~~~~~~~~~~~~~~

Another option is to use xarray's :py:func:`~xarray.apply_ufunc`, which can
automate `embarrassingly parallel
Expand Down Expand Up @@ -400,6 +409,103 @@ application.
structure of a problem, unlike the generic speedups offered by
``dask='parallelized'``.


dcherian marked this conversation as resolved.
Show resolved Hide resolved
``map_blocks``
~~~~~~~~~~~~~~

Functions that consume and return xarray objects can be easily applied in parallel using :py:func:`map_blocks`.
Your function will receive an xarray Dataset or DataArray subset to one chunk
along each chunked dimension.

.. ipython:: python

ds.temperature

This DataArray has 3 chunks each with length 10 along the time dimension.
At compute time, a function applied with :py:func:`map_blocks` will receive a DataArray corresponding to a single block of shape 10x180x180
(time x latitude x longitude) with values loaded. The following snippet illustrates how to check the shape of the object
received by the applied function.

.. ipython:: python

def func(da):
print(da.sizes)
return da.time

mapped = xr.map_blocks(func, ds.temperature)
mapped

Notice that the :py:meth:`map_blocks` call printed
``Frozen({'time': 0, 'latitude': 0, 'longitude': 0})`` to screen.
``func`` is received 0-sized blocks! :py:meth:`map_blocks` needs to know what the final result
looks like in terms of dimensions, shapes etc. It does so by running the provided function on 0-shaped
inputs (*automated inference*). This works in many cases, but not all. If automatic inference does not
work for your function, provide the ``template`` kwarg (see below).

In this case, automatic inference has worked so let's check that the result is as expected.

.. ipython:: python

mapped.load(scheduler="single-threaded")
mapped.identical(ds.time)

Note that we use ``.load(scheduler="single-threaded")`` to execute the computation.
This executes the Dask graph in `serial` using a for loop, but allows for printing to screen and other
debugging techniques. We can easily see that our function is receiving blocks of shape 10x180x180 and
the returned result is identical to ``ds.time`` as expected.


Here is a common example where automated inference will not work.

.. ipython:: python
:okexcept:

def func(da):
print(da.sizes)
dcherian marked this conversation as resolved.
Show resolved Hide resolved
return da.isel(time=[1])

mapped = xr.map_blocks(func, ds.temperature)

``func`` cannot be run on 0-shaped inputs because it is not possible to extract element 1 along a
dimension of size 0. In this case we need to tell :py:func:`map_blocks` what the returned result looks
like using the ``template`` kwarg. ``template`` must be an xarray Dataset or DataArray (depending on
what the function returns) with dimensions, shapes, chunk sizes, attributes, coordinate variables *and* data
variables that look exactly like the expected result. The variables should be dask-backed and hence not
incur much memory cost.

.. note::

Note that when ``template`` is provided, ``attrs`` from ``template`` are copied over to the result. Any
``attrs`` set in ``func`` will be ignored.


.. ipython:: python

template = ds.temperature.isel(time=[1, 11, 21])
mapped = xr.map_blocks(func, ds.temperature, template=template)


Notice that the 0-shaped sizes were not printed to screen. Since ``template`` has been provided
:py:func:`map_blocks` does not need to infer it by running ``func`` on 0-shaped inputs.

.. ipython:: python

mapped.identical(template)


:py:func:`map_blocks` also allows passing ``args`` and ``kwargs`` down to the user function ``func``.
``func`` will be executed as ``func(block_xarray, *args, **kwargs)`` so ``args`` must be a list and ``kwargs`` must be a dictionary.

.. ipython:: python

def func(obj, a, b=0):
return obj + a + b

mapped = ds.map_blocks(func, args=[10], kwargs={"b": 10})
expected = ds + 10 + 10
mapped.identical(expected)


Chunking and performance
------------------------

Expand Down
5 changes: 5 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ New Features
the :py:class:`~core.accessor_dt.DatetimeAccessor` (:pull:`3935`). This
feature requires cftime version 1.1.0 or greater. By
`Spencer Clark <https://github.com/spencerkclark>`_.
- :py:meth:`map_blocks` now accepts a ``template`` kwarg. This allows use cases
where the result of a computation could not be inferred automatically.
By `Deepak Cherian <https://github.com/dcherian>`_

Bug fixes
~~~~~~~~~
Expand Down Expand Up @@ -115,6 +118,8 @@ Documentation
By `Matthias Riße <https://github.com/risebell>`_.
- Apply ``black`` to all the code in the documentation (:pull:`4012`)
By `Justus Magin <https://github.com/keewis>`_.
- Narrative documentation now describes :py:meth:`map_blocks`. :ref:`dask.automatic-parallelization`.
By `Deepak Cherian <https://github.com/dcherian>`_.

Internal Changes
~~~~~~~~~~~~~~~~
Expand Down
30 changes: 17 additions & 13 deletions xarray/core/dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -3260,34 +3260,38 @@ def map_blocks(
func: "Callable[..., T_DSorDA]",
args: Sequence[Any] = (),
kwargs: Mapping[str, Any] = None,
template: Union["DataArray", "Dataset"] = None,
) -> "T_DSorDA":
"""
Apply a function to each chunk of this DataArray. This method is experimental
and its signature may change.
Apply a function to each block of this DataArray.

.. warning::
This method is experimental and its signature may change.

Parameters
----------
func: callable
User-provided function that accepts a DataArray as its first parameter. The
function will receive a subset of this DataArray, corresponding to one chunk
along each chunked dimension. ``func`` will be executed as
``func(obj_subset, *args, **kwargs)``.

The function will be first run on mocked-up data, that looks like this array
but has sizes 0, to determine properties of the returned object such as
dtype, variable names, new dimensions and new indexes (if any).
User-provided function that accepts a DataArray as its first
parameter. The function will receive a subset, i.e. one block, of this DataArray
(see below), corresponding to one chunk along each chunked dimension. ``func`` will be
executed as ``func(block_subset, *args, **kwargs)``.

This function must return either a single DataArray or a single Dataset.

This function cannot change size of existing dimensions, or add new chunked
dimensions.
This function cannot add a new chunked dimension.
args: Sequence
Passed verbatim to func after unpacking, after the sliced DataArray. xarray
objects, if any, will not be split by chunks. Passing dask collections is
not allowed.
kwargs: Mapping
Passed verbatim to func after unpacking. xarray objects, if any, will not be
split by chunks. Passing dask collections is not allowed.
template: (optional) DataArray, Dataset
xarray object representing the final result after compute is called. If not provided,
the function will be first run on mocked-up data, that looks like 'obj' but
has sizes 0, to determine properties of the returned object such as dtype,
variable names, new dimensions and new indexes (if any).
'template' must be provided if the function changes the size of existing dimensions.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume attrs are also copied from the template and ignored from the computed chunks?

Copy link
Contributor Author

@dcherian dcherian May 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. But I don't know that this is right.

With auto-inferred templates, attrs are set by the user function. I think it would be nice to preserve that behaviour. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this some more... there's no way to update attrs after computation is there?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, attrs needs to be set when the resulting Dataset is created, before we do the the computation.


Returns
-------
Expand All @@ -3310,7 +3314,7 @@ def map_blocks(
"""
from .parallel import map_blocks

return map_blocks(func, self, args, kwargs)
return map_blocks(func, self, args, kwargs, template)

def polyfit(
self,
Expand Down
32 changes: 18 additions & 14 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5708,34 +5708,38 @@ def map_blocks(
func: "Callable[..., T_DSorDA]",
args: Sequence[Any] = (),
kwargs: Mapping[str, Any] = None,
template: Union["DataArray", "Dataset"] = None,
) -> "T_DSorDA":
"""
Apply a function to each chunk of this Dataset. This method is experimental and
its signature may change.
Apply a function to each block of this Dataset.

.. warning::
This method is experimental and its signature may change.

Parameters
----------
func: callable
User-provided function that accepts a Dataset as its first parameter. The
function will receive a subset of this Dataset, corresponding to one chunk
along each chunked dimension. ``func`` will be executed as
``func(obj_subset, *args, **kwargs)``.

The function will be first run on mocked-up data, that looks like this
Dataset but has sizes 0, to determine properties of the returned object such
as dtype, variable names, new dimensions and new indexes (if any).
User-provided function that accepts a Dataset as its first
parameter. The function will receive a subset, i.e. one block, of this Dataset
(see below), corresponding to one chunk along each chunked dimension. ``func`` will be
executed as ``func(block_subset, *args, **kwargs)``.

This function must return either a single DataArray or a single Dataset.

This function cannot change size of existing dimensions, or add new chunked
dimensions.
This function cannot add a new chunked dimension.
args: Sequence
Passed verbatim to func after unpacking, after the sliced DataArray. xarray
objects, if any, will not be split by chunks. Passing dask collections is
not allowed.
kwargs: Mapping
Passed verbatim to func after unpacking. xarray objects, if any, will not be
split by chunks. Passing dask collections is not allowed.
template: (optional) DataArray, Dataset
xarray object representing the final result after compute is called. If not provided,
the function will be first run on mocked-up data, that looks like 'obj' but
has sizes 0, to determine properties of the returned object such as dtype,
variable names, new dimensions and new indexes (if any).
'template' must be provided if the function changes the size of existing dimensions.

Returns
-------
Expand All @@ -5758,7 +5762,7 @@ def map_blocks(
"""
from .parallel import map_blocks

return map_blocks(func, self, args, kwargs)
return map_blocks(func, self, args, kwargs, template)

def polyfit(
self,
Expand Down Expand Up @@ -5933,7 +5937,7 @@ def polyfit(
"The number of data points must exceed order to scale the covariance matrix."
)
fac = residuals / (x.shape[0] - order)
covariance = xr.DataArray(Vbase, dims=("cov_i", "cov_j"),) * fac
covariance = xr.DataArray(Vbase, dims=("cov_i", "cov_j")) * fac
variables[name + "polyfit_covariance"] = covariance

return Dataset(data_vars=variables, attrs=self.attrs.copy())
Expand Down
Loading