Add template xarray object kwarg to map_blocks (#3816)
* Allow providing template dataset to map_blocks.

* Update dimension shape check.

This accounts for dimension sizes being changed by the applied function.

* Allow user function to add new unindexed dimension.

* Add docstring for template.

* renaming

* Raise nice error if adding a new chunked dimension.

* Raise nice error message when expected dimension is missing on returned object

* Revert "Allow user function to add new unindexed dimension."

This reverts commit 045ae2b.

* Add test + fix output_chunks for dataarray template

* typing

* fix test

* Add nice error messages when result doesn't match template.

* blacken

* Add template kwarg to DataArray.map_blocks & Dataset.map_blocks

* minor error message fixes.

* docstring updates.

* bugfix for expected shapes when template is not specified

* Add map_blocks docs.

* Update doc/dask.rst

Co-Authored-By: Joe Hamman <[email protected]>

* refactor out slicer for chunks

* Check expected index values.

* Raise nice error when template object does not have required number of chunks

* doc updates.

* more review comments.

* Mention that attrs are taken from template.

* Add test and explicitly point out that attrs are copied from template

Co-authored-by: Joe Hamman <[email protected]>
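
In short, the new ``template`` kwarg lets the caller describe the expected result when ``map_blocks`` cannot infer it automatically. A minimal sketch of the new usage (the names and data here are illustrative, not taken from the commit):

    import numpy as np
    import xarray as xr

    da = xr.DataArray(np.arange(30), dims="time").chunk({"time": 10})

    def take_first(block):
        # cannot be inferred by running on 0-sized inputs
        return block.isel(time=[0])

    # expected result: the first element of each of the three blocks
    template = da.isel(time=[0, 10, 20])
    result = xr.map_blocks(take_first, da, template=template)
    result.compute()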
dcherian and Joe Hamman authored May 6, 2020
1 parent fe7962a commit 0b6e22f
Showing 7 changed files with 359 additions and 74 deletions.
5 changes: 3 additions & 2 deletions doc/api.rst
@@ -173,6 +173,7 @@ Computation
Dataset.quantile
Dataset.differentiate
Dataset.integrate
Dataset.map_blocks
Dataset.polyfit

**Aggregation**:
@@ -358,6 +359,8 @@ Computation
DataArray.integrate
DataArray.polyfit
DataArray.str
DataArray.map_blocks


**Aggregation**:
:py:attr:`~DataArray.all`
@@ -518,7 +521,6 @@ Dataset methods
Dataset.load
Dataset.chunk
Dataset.unify_chunks
Dataset.map_blocks
Dataset.filter_by_attrs
Dataset.info

@@ -550,7 +552,6 @@ DataArray methods
DataArray.load
DataArray.chunk
DataArray.unify_chunks
DataArray.map_blocks

Coordinates objects
===================
114 changes: 110 additions & 4 deletions doc/dask.rst
@@ -284,12 +284,21 @@ loaded into Dask or not:

.. _dask.automatic-parallelization:

Automatic parallelization with ``apply_ufunc`` and ``map_blocks``
-----------------------------------------------------------------

Almost all of xarray's built-in operations work on Dask arrays. If you want to
use a function that isn't wrapped by xarray, and have it applied in parallel on
each block of your xarray object, you have three options:

1. Extract Dask arrays from xarray objects (``.data``) and use Dask directly (see the sketch below).
2. Use :py:func:`~xarray.apply_ufunc` to apply functions that consume and return NumPy arrays.
3. Use :py:func:`~xarray.map_blocks`, :py:meth:`Dataset.map_blocks` or :py:meth:`DataArray.map_blocks`
to apply functions that consume and return xarray objects.
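
As a minimal sketch of option 1 (assuming the chunked ``ds`` used in the examples below), you can pull out the underlying Dask array, apply :py:meth:`dask.array.Array.map_blocks`, and re-wrap the result by hand:

.. ipython:: python

    # Operate on the raw dask array; xarray metadata must be reattached manually.
    doubled = ds.temperature.data.map_blocks(lambda block: 2 * block)
    doubled_da = xr.DataArray(
        doubled, dims=ds.temperature.dims, coords=ds.temperature.coords
    )
    doubled_da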


``apply_ufunc``
~~~~~~~~~~~~~~~

Another option is to use xarray's :py:func:`~xarray.apply_ufunc`, which can
automate `embarrassingly parallel
@@ -400,6 +409,103 @@ application.
structure of a problem, unlike the generic speedups offered by
``dask='parallelized'``.


``map_blocks``
~~~~~~~~~~~~~~

Functions that consume and return xarray objects can be easily applied in parallel using :py:func:`map_blocks`.
Your function will receive an xarray Dataset or DataArray subset to one chunk
along each chunked dimension.

.. ipython:: python

    ds.temperature

This DataArray has 3 chunks each with length 10 along the time dimension.
At compute time, a function applied with :py:func:`map_blocks` will receive a DataArray corresponding to a single block of shape 10x180x180
(time x latitude x longitude) with values loaded. The following snippet illustrates how to check the shape of the object
received by the applied function.

.. ipython:: python

    def func(da):
        print(da.sizes)
        return da.time


    mapped = xr.map_blocks(func, ds.temperature)
    mapped

Notice that the :py:func:`map_blocks` call printed
``Frozen({'time': 0, 'latitude': 0, 'longitude': 0})`` to screen.
``func`` received 0-sized blocks! :py:func:`map_blocks` needs to know what the final result
looks like in terms of dimensions, shapes, etc. It does so by running the provided function on 0-shaped
inputs (*automated inference*). This works in many cases, but not all. If automatic inference does not
work for your function, provide the ``template`` kwarg (see below).

In this case, automatic inference has worked, so let's check that the result is as expected.

.. ipython:: python

    mapped.load(scheduler="single-threaded")
    mapped.identical(ds.time)

Note that we use ``.load(scheduler="single-threaded")`` to execute the computation.
This executes the Dask graph in serial using a for loop, but allows for printing to screen and other
debugging techniques. We can easily see that our function is receiving blocks of shape 10x180x180 and
that the returned result is identical to ``ds.time``, as expected.


Here is a common example where automated inference will not work.

.. ipython:: python
:okexcept:
def func(da):
print(da.sizes)
return da.isel(time=[1])
mapped = xr.map_blocks(func, ds.temperature)
``func`` cannot be run on 0-shaped inputs because it is not possible to extract element 1 along a
dimension of size 0. In this case, we need to tell :py:func:`map_blocks` what the returned result looks
like using the ``template`` kwarg. ``template`` must be an xarray Dataset or DataArray (depending on
what the function returns) with dimensions, shapes, chunk sizes, attributes, coordinate variables *and* data
variables that look exactly like the expected result. The variables should be dask-backed and hence
incur little memory cost.

.. note::

    When ``template`` is provided, ``attrs`` from ``template`` are copied over to the result. Any
    ``attrs`` set in ``func`` will be ignored.


.. ipython:: python

    template = ds.temperature.isel(time=[1, 11, 21])
    mapped = xr.map_blocks(func, ds.temperature, template=template)

Notice that the 0-shaped sizes were not printed to screen. Since ``template`` has been provided,
:py:func:`map_blocks` does not need to infer the result by running ``func`` on 0-shaped inputs.

.. ipython:: python

    mapped.identical(template)

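To make the note about ``attrs`` concrete, here is a small sketch (``func_with_attrs`` is an illustrative name, not part of the library) showing that ``attrs`` set inside the function are discarded in favour of ``template``'s ``attrs``:

.. ipython:: python

    def func_with_attrs(da):
        result = da.isel(time=[1])
        result.attrs = {"created_by": "func_with_attrs"}  # discarded by map_blocks
        return result


    mapped = xr.map_blocks(func_with_attrs, ds.temperature, template=template)
    mapped.attrs  # taken from ``template``, not from the function's return value
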
:py:func:`map_blocks` also allows passing ``args`` and ``kwargs`` down to the user function ``func``.
``func`` will be executed as ``func(block_xarray, *args, **kwargs)``, so ``args`` must be a list and ``kwargs`` must be a dictionary.

.. ipython:: python

    def func(obj, a, b=0):
        return obj + a + b


    mapped = ds.map_blocks(func, args=[10], kwargs={"b": 10})
    expected = ds + 10 + 10
    mapped.identical(expected)

Chunking and performance
------------------------

5 changes: 5 additions & 0 deletions doc/whats-new.rst
@@ -67,6 +67,9 @@ New Features
the :py:class:`~core.accessor_dt.DatetimeAccessor` (:pull:`3935`). This
feature requires cftime version 1.1.0 or greater. By
`Spencer Clark <https://github.com/spencerkclark>`_.
- :py:meth:`map_blocks` now accepts a ``template`` kwarg. This allows use cases
where the result of a computation cannot be inferred automatically.
By `Deepak Cherian <https://github.com/dcherian>`_.

Bug fixes
~~~~~~~~~
@@ -123,6 +126,8 @@ Documentation
By `Matthias Riße <https://github.com/risebell>`_.
- Apply ``black`` to all the code in the documentation (:pull:`4012`)
By `Justus Magin <https://github.com/keewis>`_.
- Narrative documentation now describes :py:meth:`map_blocks`; see :ref:`dask.automatic-parallelization`.
By `Deepak Cherian <https://github.com/dcherian>`_.

Internal Changes
~~~~~~~~~~~~~~~~
30 changes: 17 additions & 13 deletions xarray/core/dataarray.py
@@ -3250,34 +3250,38 @@ def map_blocks(
func: "Callable[..., T_DSorDA]",
args: Sequence[Any] = (),
kwargs: Mapping[str, Any] = None,
template: Union["DataArray", "Dataset"] = None,
) -> "T_DSorDA":
"""
Apply a function to each chunk of this DataArray. This method is experimental
and its signature may change.
Apply a function to each block of this DataArray.
.. warning::
This method is experimental and its signature may change.
Parameters
----------
func: callable
User-provided function that accepts a DataArray as its first
parameter. The function will receive a subset, i.e. one block, of this DataArray
(see below), corresponding to one chunk along each chunked dimension. ``func`` will be
executed as ``func(block_subset, *args, **kwargs)``.
This function must return either a single DataArray or a single Dataset.
This function cannot add a new chunked dimension.
args: Sequence
Passed verbatim to func after unpacking, after the sliced DataArray. xarray
objects, if any, will not be split by chunks. Passing dask collections is
not allowed.
kwargs: Mapping
Passed verbatim to func after unpacking. xarray objects, if any, will not be
split by chunks. Passing dask collections is not allowed.
template: (optional) DataArray, Dataset
xarray object representing the final result after compute is called. If not provided,
the function will first be run on mocked-up data that looks like 'obj' but
has sizes 0, to determine properties of the returned object such as dtype,
variable names, new dimensions and new indexes (if any).
'template' must be provided if the function changes the size of existing dimensions.
Returns
-------
@@ -3300,7 +3304,7 @@ def map_blocks(
"""
from .parallel import map_blocks

return map_blocks(func, self, args, kwargs, template)

def polyfit(
self,
30 changes: 17 additions & 13 deletions xarray/core/dataset.py
@@ -5709,34 +5709,38 @@ def map_blocks(
func: "Callable[..., T_DSorDA]",
args: Sequence[Any] = (),
kwargs: Mapping[str, Any] = None,
template: Union["DataArray", "Dataset"] = None,
) -> "T_DSorDA":
"""
Apply a function to each chunk of this Dataset. This method is experimental and
its signature may change.
Apply a function to each block of this Dataset.
.. warning::
This method is experimental and its signature may change.
Parameters
----------
func: callable
User-provided function that accepts a Dataset as its first
parameter. The function will receive a subset, i.e. one block, of this Dataset
(see below), corresponding to one chunk along each chunked dimension. ``func`` will be
executed as ``func(block_subset, *args, **kwargs)``.
This function must return either a single DataArray or a single Dataset.
This function cannot add a new chunked dimension.
args: Sequence
Passed verbatim to func after unpacking, after the sliced Dataset. xarray
objects, if any, will not be split by chunks. Passing dask collections is
not allowed.
kwargs: Mapping
Passed verbatim to func after unpacking. xarray objects, if any, will not be
split by chunks. Passing dask collections is not allowed.
template: (optional) DataArray, Dataset
xarray object representing the final result after compute is called. If not provided,
the function will first be run on mocked-up data that looks like 'obj' but
has sizes 0, to determine properties of the returned object such as dtype,
variable names, new dimensions and new indexes (if any).
'template' must be provided if the function changes the size of existing dimensions.
Returns
-------
@@ -5759,7 +5763,7 @@ def map_blocks(
"""
from .parallel import map_blocks

return map_blocks(func, self, args, kwargs, template)

def polyfit(
self,