Set allow_rechunk=True in apply_ufunc #4372

Closed · dcherian opened this issue Aug 24, 2020 · 10 comments · Fixed by #4392

@dcherian
Contributor

What happened:

blockwise calls unify_chunks by default, but apply_gufunc does not, so we have a regression in apply_ufunc now that we've switched from blockwise to apply_gufunc.

Minimal Complete Verifiable Example:

import operator

import numpy as np
import xarray as xr

a = xr.DataArray(np.arange(10), dims=("a")).chunk({"a": 2})
b = xr.DataArray(np.arange(10), dims=("a")).chunk({"a": 4})

xr.apply_ufunc(operator.add, a, b, dask="parallelized", output_dtypes=[a.dtype]).compute()

raises

ValueError: Dimension `'__loopdim0__'` with different chunksize present

on master but works with 0.16.0

I think we need to do ``dask_gufunc_kwargs.setdefault("allow_rechunk", True)``

If we want to avoid that, we'll need to go through a deprecation cycle.
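
In the meantime, a possible workaround (a minimal sketch, assuming the dask_gufunc_kwargs parameter introduced in #4060 is available on master) is to opt in to rechunking explicitly:

import operator

import numpy as np
import xarray as xr

a = xr.DataArray(np.arange(10), dims="a").chunk({"a": 2})
b = xr.DataArray(np.arange(10), dims="a").chunk({"a": 4})

# Explicitly allow dask to rechunk the mismatched (loop) dimension;
# per the dask docs this may increase memory usage significantly.
xr.apply_ufunc(
    operator.add,
    a,
    b,
    dask="parallelized",
    output_dtypes=[a.dtype],
    dask_gufunc_kwargs={"allow_rechunk": True},
).compute()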

dcherian added the bug label Aug 24, 2020
@dcherian
Contributor Author

(this is causing downstream test failures: NCAR/pop-tools#59; thanks @mnlevy1981)

@kmuehlbauer
Contributor

The behaviour changed in #4060 (commit a7fb5a9). Please see the discussion with regard to allow_rechunk over there. The reason for not handling/setting allow_rechunk=True was @shoyer's comment.

@dcherian
Contributor Author

Copying over that comment...

I am pretty confident that existing behavior of xarray.apply_ufunc with un-equal chunks along non-core dimensions is entirely broken. I am OK with just erroring for now.

@shoyer could you please clarify what you meant?

For example, this works with v0.16.0 but fails on master:

import operator

import numpy as np
import xarray as xr

a = xr.DataArray(np.ones((10, 10)), dims=("a", "b")).chunk({"a": 2, "b": 1})
b = xr.DataArray(np.ones((10, 10)), dims=("a", "b")).chunk({"a": -1, "b": 4})

xr.apply_ufunc(
    operator.add, a, b, dask="parallelized", output_dtypes=[a.dtype],
).compute().equals(a.compute() + b.compute())

@kmuehlbauer
Contributor

One solution would be to catch this ValueError, issue a FutureWarning and add allow_rechunk=True to dask_gufunc_kwargs here:

def func(*arrays):
    import dask.array as da

    res = da.apply_gufunc(
        numpy_func,
        signature.to_gufunc_string(exclude_dims),
        *arrays,
        vectorize=vectorize,
        output_dtypes=output_dtypes,
        **dask_gufunc_kwargs,
    )

and replace it with something like:

def func(*arrays):
    import dask.array as da

    gufunc = functools.partial(
        da.apply_gufunc,
        numpy_func,
        signature.to_gufunc_string(exclude_dims),
        *arrays,
        vectorize=vectorize,
        output_dtypes=output_dtypes,
    )

    try:
        res = gufunc(**dask_gufunc_kwargs)
    except ValueError as exc:
        if "with different chunksize present" in str(exc):
            warnings.warn(
                f"``allow_rechunk=True`` needs to be explicitly set in the "
                f"``dask_gufunc_kwargs`` parameter. Not setting it will raise the dask "
                f"ValueError ``{str(exc)}`` in a future version.",
                FutureWarning,
                stacklevel=2,
            )
            dask_gufunc_kwargs["allow_rechunk"] = True
            res = gufunc(**dask_gufunc_kwargs)
        else:
            raise

I could make a PR out of this. The message wording can surely be improved. WDYT @dcherian and @shoyer?
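
As a usage sketch (hypothetical, assuming the fallback above is in place), the failing example from the top of this issue would then emit the FutureWarning and still compute:

import operator
import warnings

import numpy as np
import xarray as xr

a = xr.DataArray(np.arange(10), dims="a").chunk({"a": 2})
b = xr.DataArray(np.arange(10), dims="a").chunk({"a": 4})

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = xr.apply_ufunc(
        operator.add, a, b, dask="parallelized", output_dtypes=[a.dtype]
    ).compute()

# With the proposed fallback, `caught` would contain the FutureWarning and
# `result` would equal (a + b).compute(); on current master this still raises.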

@shoyer
Member

shoyer commented Aug 28, 2020

Maybe we do want to set allow_rechunk=True? It seems that I was just mistaken about the current behavior.

@kmuehlbauer
Contributor

@shoyer In that case, should we warn the user that data might be loaded into memory?

Another question is: why does this kwarg exist in dask, and why doesn't dask rechunk by default?

@kmuehlbauer
Contributor

From the dask apply_gufunc docstring:

"""
allow_rechunk: Optional, bool, keyword only

    Allows rechunking, otherwise chunk sizes need to match and core dimensions are to consist only of one chunk. 
    Warning: enabling this can increase memory usage significantly. Defaults to False
"""

Current code handling in dask:

https://github.com/dask/dask/blob/42873f27ce11ce35652dda344dae5c47b742bef2/dask/array/gufunc.py#L398-L417

        if not allow_rechunk:
            chunksizes = chunksizess[dim]
            #### Check if core dimensions consist of only one chunk
            if (dim in core_shapes) and (chunksizes[0][0] < core_shapes[dim]):
                raise ValueError(
                    "Core dimension `'{}'` consists of multiple chunks. To fix, rechunk into a single \
chunk along this dimension or set `allow_rechunk=True`, but beware that this may increase memory usage \
significantly.".format(
                        dim
                    )
                )
            #### Check if loop dimensions consist of same chunksizes, when they have sizes > 1
            relevant_chunksizes = list(
                unique(c for s, c in zip(sizes, chunksizes) if s > 1)
            )
            if len(relevant_chunksizes) > 1:
                raise ValueError(
                    "Dimension `'{}'` with different chunksize present".format(dim)
                )

IIUTC, this not only rechunks non-core dimensions but also fixes core dimensions with more than one chunk. Would this be intended from the xarray side? Before #4060, core dimensions with more than one chunk were caught and an error was raised:

            # core dimensions cannot span multiple chunks
            for axis, dim in enumerate(core_dims, start=-len(core_dims)):
                if len(data.chunks[axis]) != 1:
                    raise ValueError(
                        "dimension {!r} on {}th function argument to "
                        "apply_ufunc with dask='parallelized' consists of "
                        "multiple chunks, but is also a core dimension. To "
                        "fix, rechunk into a single dask array chunk along "
                        "this dimension, i.e., ``.chunk({})``, but beware "
                        "that this may significantly increase memory usage.".format(
                            dim, n, {dim: -1}
                        )
                    )

An explicit rechunk was recommended to the user, though.

That means setting allow_rechunk=True by default alone will not give us the same behaviour as before #4060. I'm unsure how to proceed.

@kmuehlbauer
Contributor

Another question is: why does this kwarg exist in dask, and why doesn't dask rechunk by default?

Trying to answer this by looking at the dask code:

  • allow_rechunk=False: catches chunking problems in core and non-core dimensions and raises an error.
    This helps prevent users from unknowingly loading huge dask arrays into memory.
  • allow_rechunk=True: blockwise is called with align_arrays=True by default, which means automatic rechunking for all arrays (core and non-core dimensions). Users can use this if they are sure their system can handle possibly large amounts of data (see the sketch below).
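
A small sketch of what that alignment does, assuming blockwise delegates to dask's unify_chunks when align_arrays=True:

import dask.array as da

x = da.ones(10, chunks=2)
y = da.ones(10, chunks=4)

# unify_chunks rechunks all arrays to a common chunking along shared dimensions;
# blockwise(..., align_arrays=True) uses this under the hood.
chunkss, (x2, y2) = da.core.unify_chunks(x, "i", y, "i")

print(chunkss)               # {'i': (2, 2, 2, 2, 2)}
print(x2.chunks, y2.chunks)  # both ((2, 2, 2, 2, 2),)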

@dcherian
Contributor Author

So to maintain backward compatibility, we should add that same check

            # core dimensions cannot span multiple chunks
            for axis, dim in enumerate(core_dims, start=-len(core_dims)):
                if len(data.chunks[axis]) != 1:
                    raise ValueError(
                        "dimension {!r} on {}th function argument to "
                        "apply_ufunc with dask='parallelized' consists of "
                        "multiple chunks, but is also a core dimension. To "
                        "fix, rechunk into a single dask array chunk along "
                        "this dimension, i.e., ``.chunk({})``, but beware "
                        "that this may significantly increase memory usage.".format(
                            dim, n, {dim: -1}
                        )
                    )

and set allow_rechunk=True.

We could deprecate and remove this check in a couple of versions but I don't know if it's worth the effort...
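
A rough sketch of how those two pieces could fit together in the dask="parallelized" branch (only an illustration; the names input_data, signature, dask_array_type and dask_gufunc_kwargs are assumed from the surrounding apply_variable_ufunc code):

# Sketch only: restore the pre-#4060 error for chunked core dimensions,
# then let dask rechunk mismatched loop dimensions as it did before.
for n, (data, core_dims) in enumerate(zip(input_data, signature.input_core_dims)):
    if isinstance(data, dask_array_type):
        # core dimensions cannot span multiple chunks
        for axis, dim in enumerate(core_dims, start=-len(core_dims)):
            if len(data.chunks[axis]) != 1:
                raise ValueError(
                    "dimension {!r} on {}th function argument to "
                    "apply_ufunc with dask='parallelized' consists of "
                    "multiple chunks, but is also a core dimension. To "
                    "fix, rechunk into a single dask array chunk along "
                    "this dimension, i.e., ``.chunk({})``, but beware "
                    "that this may significantly increase memory usage.".format(
                        dim, n, {dim: -1}
                    )
                )

# opt back in to dask's rechunking of loop dimensions unless the user said otherwise
dask_gufunc_kwargs.setdefault("allow_rechunk", True)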

@kmuehlbauer
Contributor

@dcherian @shoyer

In #4392 I've tried to work around this bug. I found it easier to just catch the dask ValueErrors rather than add more code checks. I'll add more information in that PR.
