FIX: handle dask ValueErrors in apply_ufunc (set ``allow_rechunk=True``) (pydata#4392)

* FIX: catch dask chunk mismatch ValueErrors, warn and set ``allow_rechunk=True``

* FIX: raise ValueError for the core dimension case

* add pull request reference to whats-new.rst (internal changes)

* add test for FutureWarning

* WIP allow_rechunk

* fix allow_rechunk

* use new is_duck_dask_array()-check

* use get instead of pop

* Small change to error message.

Co-authored-by: dcherian <[email protected]>
kmuehlbauer and dcherian authored Sep 9, 2020
1 parent 572a528 commit bb4c7b4
Showing 3 changed files with 23 additions and 4 deletions.
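For illustration only (not part of the commit): a minimal sketch of the user-visible effect, assuming xarray at this commit with dask installed; the array names and the use of numpy.add are invented here. Inputs whose chunks disagree along a non-core dimension no longer fail inside dask.array.apply_gufunc with a chunk-mismatch ValueError, because apply_ufunc now sets ``allow_rechunk=True`` when no core dimension is chunked:

import numpy as np
import xarray as xr

# Two dask-backed arrays whose chunks along "y" do not line up
# (illustrative data, not taken from the commit).
a = xr.DataArray(np.ones(8), dims="y").chunk({"y": 4})
b = xr.DataArray(np.ones(8), dims="y").chunk({"y": 3})

# No core dimension is chunked here, so apply_ufunc passes
# allow_rechunk=True on to dask.array.apply_gufunc by itself.
result = xr.apply_ufunc(
    np.add,
    a,
    b,
    dask="parallelized",
    output_dtypes=[a.dtype],
)
print(result.compute())

Passing ``dask_gufunc_kwargs={"allow_rechunk": True}`` explicitly has the same effect and skips the core-dimension check shown in the diff below.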
2 changes: 1 addition & 1 deletion doc/whats-new.rst
@@ -98,7 +98,7 @@ Documentation
 Internal Changes
 ~~~~~~~~~~~~~~~~
 - Use :py:func:`dask.array.apply_gufunc` instead of :py:func:`dask.array.blockwise` in
-  :py:func:`xarray.apply_ufunc` when using ``dask='parallelized'``. (:pull:`4060`, :pull:`4391`)
+  :py:func:`xarray.apply_ufunc` when using ``dask='parallelized'``. (:pull:`4060`, :pull:`4391`, :pull:`4392`)
 - Fix ``pip install .`` when no ``.git`` directory exists; namely when the xarray source
   directory has been rsync'ed by PyCharm Professional for a remote deployment over SSH.
   By `Guido Imperiale <https://github.com/crusaderky>`_
20 changes: 20 additions & 0 deletions xarray/core/computation.py
@@ -625,6 +625,26 @@ def apply_variable_ufunc(
             if dask_gufunc_kwargs is None:
                 dask_gufunc_kwargs = {}
 
+            allow_rechunk = dask_gufunc_kwargs.get("allow_rechunk", None)
+            if allow_rechunk is None:
+                for n, (data, core_dims) in enumerate(
+                    zip(input_data, signature.input_core_dims)
+                ):
+                    if is_duck_dask_array(data):
+                        # core dimensions cannot span multiple chunks
+                        for axis, dim in enumerate(core_dims, start=-len(core_dims)):
+                            if len(data.chunks[axis]) != 1:
+                                raise ValueError(
+                                    f"dimension {dim} on {n}th function argument to "
+                                    "apply_ufunc with dask='parallelized' consists of "
+                                    "multiple chunks, but is also a core dimension. To "
+                                    "fix, either rechunk into a single dask array chunk along "
+                                    f"this dimension, i.e., ``.chunk({dim}: -1)``, or "
+                                    "pass ``allow_rechunk=True`` in ``dask_gufunc_kwargs`` "
+                                    "but beware that this may significantly increase memory usage."
+                                )
+                dask_gufunc_kwargs["allow_rechunk"] = True
+
             output_sizes = dask_gufunc_kwargs.pop("output_sizes", {})
             if output_sizes:
                 output_sizes_renamed = {}
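Again for illustration only, a hedged sketch of the new error path added above (same assumptions as before; the array, dimension name, and lambda are invented). A core dimension that spans several chunks now raises the ValueError constructed in apply_variable_ufunc, pointing at ``.chunk`` or ``allow_rechunk=True``, rather than failing later inside dask:

import numpy as np
import xarray as xr

# A dask-backed array whose core dimension "x" spans three chunks.
da = xr.DataArray(np.arange(12.0), dims="x").chunk({"x": 4})

try:
    xr.apply_ufunc(
        lambda arr: arr.mean(axis=-1),  # reduces the core dimension
        da,
        input_core_dims=[["x"]],
        dask="parallelized",
        output_dtypes=[float],
    )
except ValueError as err:
    # Message suggests rechunking "x" into one chunk or allow_rechunk=True.
    print(err)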
5 changes: 2 additions & 3 deletions xarray/tests/test_computation.py
@@ -695,8 +695,7 @@ def check(x, y):
     check(data_array, 0 * data_array)
     check(data_array, 0 * data_array[0])
     check(data_array[:, 0], 0 * data_array[0])
-    with raises_regex(ValueError, "with different chunksize present"):
-        check(data_array, 0 * data_array.compute())
+    check(data_array, 0 * data_array.compute())
 
 
 @requires_dask
@@ -710,7 +709,7 @@ def test_apply_dask_parallelized_errors():
     with raises_regex(ValueError, "at least one input is an xarray object"):
         apply_ufunc(identity, array, dask="parallelized")
 
-    # formerly from _apply_blockwise, now from dask.array.apply_gufunc
+    # formerly from _apply_blockwise, now from apply_variable_ufunc
     with raises_regex(ValueError, "consists of multiple chunks"):
         apply_ufunc(
             identity,
