From bb4c7b4368ee416ff92f6115c6b7003b68efa3ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Wed, 9 Sep 2020 21:00:16 +0200 Subject: [PATCH] FIX: handle dask ValueErrors in `apply_ufunc` (set ``allow_rechunk=True``) (#4392) * FIX: catch dask chunk mismatch ValueErrors, warn and set ``allow_rechunk=True`` * FIX: raise ValueError for the core dimension case * add pull request reference to whats-new.rst (internal changes) * add test for FutureWarning * WIP allow_rechunk * fix allow_rechunk * use new is_duck_dask_array()-check * use get instead pop * Small change to error message. Co-authored-by: dcherian --- doc/whats-new.rst | 2 +- xarray/core/computation.py | 20 ++++++++++++++++++++ xarray/tests/test_computation.py | 5 ++--- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index b948679b637..f367286a244 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -98,7 +98,7 @@ Documentation Internal Changes ~~~~~~~~~~~~~~~~ - Use :py:func:`dask.array.apply_gufunc` instead of :py:func:`dask.array.blockwise` in - :py:func:`xarray.apply_ufunc` when using ``dask='parallelized'``. (:pull:`4060`, :pull:`4391`) + :py:func:`xarray.apply_ufunc` when using ``dask='parallelized'``. (:pull:`4060`, :pull:`4391`, :pull:`4392`) - Fix ``pip install .`` when no ``.git`` directory exists; namely when the xarray source directory has been rsync'ed by PyCharm Professional for a remote deployment over SSH. By `Guido Imperiale `_ diff --git a/xarray/core/computation.py b/xarray/core/computation.py index 507f12fe55e..db4a9b46e98 100644 --- a/xarray/core/computation.py +++ b/xarray/core/computation.py @@ -625,6 +625,26 @@ def apply_variable_ufunc( if dask_gufunc_kwargs is None: dask_gufunc_kwargs = {} + allow_rechunk = dask_gufunc_kwargs.get("allow_rechunk", None) + if allow_rechunk is None: + for n, (data, core_dims) in enumerate( + zip(input_data, signature.input_core_dims) + ): + if is_duck_dask_array(data): + # core dimensions cannot span multiple chunks + for axis, dim in enumerate(core_dims, start=-len(core_dims)): + if len(data.chunks[axis]) != 1: + raise ValueError( + f"dimension {dim} on {n}th function argument to " + "apply_ufunc with dask='parallelized' consists of " + "multiple chunks, but is also a core dimension. To " + "fix, either rechunk into a single dask array chunk along " + f"this dimension, i.e., ``.chunk({dim}: -1)``, or " + "pass ``allow_rechunk=True`` in ``dask_gufunc_kwargs`` " + "but beware that this may significantly increase memory usage." + ) + dask_gufunc_kwargs["allow_rechunk"] = True + output_sizes = dask_gufunc_kwargs.pop("output_sizes", {}) if output_sizes: output_sizes_renamed = {} diff --git a/xarray/tests/test_computation.py b/xarray/tests/test_computation.py index 7cb755b6dac..63bedfaf280 100644 --- a/xarray/tests/test_computation.py +++ b/xarray/tests/test_computation.py @@ -695,8 +695,7 @@ def check(x, y): check(data_array, 0 * data_array) check(data_array, 0 * data_array[0]) check(data_array[:, 0], 0 * data_array[0]) - with raises_regex(ValueError, "with different chunksize present"): - check(data_array, 0 * data_array.compute()) + check(data_array, 0 * data_array.compute()) @requires_dask @@ -710,7 +709,7 @@ def test_apply_dask_parallelized_errors(): with raises_regex(ValueError, "at least one input is an xarray object"): apply_ufunc(identity, array, dask="parallelized") - # formerly from _apply_blockwise, now from dask.array.apply_gufunc + # formerly from _apply_blockwise, now from apply_variable_ufunc with raises_regex(ValueError, "consists of multiple chunks"): apply_ufunc( identity,