From 13c09dc28ec8ff791c6d87e2d8e80c362c65ffd4 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Sun, 20 Sep 2020 00:21:56 -0500 Subject: [PATCH] Fixed dask.optimize on datasets (#4438) * Fixed dask.optimize on datasets Another attempt to fix #3698. The issue with my fix in is that we hit `Variable._dask_finalize` in both `dask.optimize` and `dask.persist`. We want to do the culling of unnecessary tasks (`test_persist_Dataset`) but only in the persist case, not optimize (`test_optimize`). * Update whats-new.rst * Update doc/whats-new.rst Co-authored-by: Deepak Cherian Co-authored-by: Maximilian Roos <5635139+max-sixty@users.noreply.github.com> --- doc/whats-new.rst | 3 ++- xarray/core/dataset.py | 11 ++++++++++- xarray/core/variable.py | 3 --- xarray/tests/test_dask.py | 8 ++++++++ 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 4c0658f6972..82f51a1beec 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -101,6 +101,8 @@ Bug fixes By `Jens Svensmark `_ - Fix incorrect legend labels for :py:meth:`Dataset.plot.scatter` (:issue:`4126`). By `Peter Hausamann `_. +- Fix ``dask.optimize`` on ``DataArray`` producing an invalid Dask task graph (:issue:`3698`) + By `Tom Augspurger `_ - Fix ``pip install .`` when no ``.git`` directory exists; namely when the xarray source directory has been rsync'ed by PyCharm Professional for a remote deployment over SSH. By `Guido Imperiale `_ @@ -109,7 +111,6 @@ Bug fixes - Avoid relying on :py:class:`set` objects for the ordering of the coordinates (:pull:`4409`) By `Justus Magin `_. - Documentation ~~~~~~~~~~~~~ diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index ce72d4a5886..1777ee356af 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -777,10 +777,19 @@ def _dask_postcompute(results, info, *args): @staticmethod def _dask_postpersist(dsk, info, *args): variables = {} + # postpersist is called in both dask.optimize and dask.persist + # When persisting, we want to filter out unrelated keys for + # each Variable's task graph. + is_persist = len(dsk) == len(info) for is_dask, k, v in info: if is_dask: func, args2 = v - result = func(dsk, *args2) + if is_persist: + name = args2[1][0] + dsk2 = {k: v for k, v in dsk.items() if k[0] == name} + else: + dsk2 = dsk + result = func(dsk2, *args2) else: result = v variables[k] = result diff --git a/xarray/core/variable.py b/xarray/core/variable.py index 6de00ee882a..c55e61cb816 100644 --- a/xarray/core/variable.py +++ b/xarray/core/variable.py @@ -501,9 +501,6 @@ def __dask_postpersist__(self): @staticmethod def _dask_finalize(results, array_func, array_args, dims, attrs, encoding): - if isinstance(results, dict): # persist case - name = array_args[0] - results = {k: v for k, v in results.items() if k[0] == name} data = array_func(results, *array_args) return Variable(dims, data, attrs=attrs, encoding=encoding) diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py index 46685a29a47..7d664aca3e4 100644 --- a/xarray/tests/test_dask.py +++ b/xarray/tests/test_dask.py @@ -1607,3 +1607,11 @@ def test_more_transforms_pass_lazy_array_equiv(map_da, map_ds): assert_equal(map_da._from_temp_dataset(map_da._to_temp_dataset()), map_da) assert_equal(map_da.astype(map_da.dtype), map_da) assert_equal(map_da.transpose("y", "x", transpose_coords=False).cxy, map_da.cxy) + + +def test_optimize(): + # https://github.com/pydata/xarray/issues/3698 + a = dask.array.ones((10, 4), chunks=(5, 2)) + arr = xr.DataArray(a).chunk(5) + (arr2,) = dask.optimize(arr) + arr2.compute()