Support Dask interface #1674
Conversation
I've only done Variable so far. Hopefully what's here seems straightforward. I'll do DataArray and Dataset next and then look at what legacy code I can clean up within XArray. I'll be working on this while on a long flight and so may not respond quickly.
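For context, a minimal sketch (illustrative only, not the PR's exact code) of what implementing the dask collection interface on Variable looks like; the toy constructor and the isinstance check against dask.array.Array are assumptions standing in for xarray's internals:

import dask.array as da

class Variable:
    # Toy stand-in for xarray.core.variable.Variable, not its real constructor.
    def __init__(self, data):
        self._data = data

    # dask.compute()/dask.persist()/dask.visualize() dispatch to these
    # dunder methods when deciding how to handle the object.
    def __dask_graph__(self):
        if isinstance(self._data, da.Array):
            return self._data.__dask_graph__()
        return None  # None tells dask this object is not a dask collection

    def __dask_keys__(self):
        return self._data.__dask_keys__()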
xarray/core/variable.py
Outdated
def visualize(self, **kwargs):
    import dask
    return dask.visualize(self, **kwargs)
My inclination would be to leave this out and require using dask.visualize(). My concern is that it could be easily confused with .plot().
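As a usage sketch of the suggested alternative (the dataset below is a placeholder, and rendering the graph needs the optional graphviz dependency):

import dask
import dask.array as da
import xarray as xr

# With the collection interface from this PR, the top-level dask.visualize()
# accepts xarray objects directly, so no .visualize() method is needed.
ds = xr.Dataset({'foo': ('x', da.arange(6, chunks=3))})
dask.visualize(ds, filename='graph.png')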
Removed
Also test distributed computing
For the distributed work this now also uses dask/distributed#1513
I've updated this to cover DataArray and Dataset as well
@mrocklin - thanks for getting this started. Curious, does the test suite pass when you combine this with dask/dask#2847?
Generally yes, things work. There are a few xfailed failures in tests that used mock on functions that are no longer being used. Also
OK, this is now backwards compatible. Tests should pass.
Generally this looks great, thanks for putting this together!
xarray/core/dataset.py
Outdated
    if dask.is_dask_collection(v) else
    (False, k, v) for k, v in self._variables.items()]
return self._dask_postcompute, (info, self._coord_names, self._dims,
    self._attrs, self._file_obj, self._encoding)
nit: please indent to match the opening ( on the previous line
Fixed. Is it possible to add this style concern to the flake8 tests?
    return None

def __dask_keys__(self):
    return self._data.__dask_keys__()
Is it OK if these methods error (with AttributeError) when self._data is not a dask array?
Yes, we always check if the object is a dask collection first by calling __dask_graph__
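To illustrate the check being referred to, dask.base.is_dask_collection behaves roughly like this simplified sketch:

def is_dask_collection(x):
    # Simplified sketch: an object counts as a dask collection when it
    # implements __dask_graph__ and that call returns a non-None graph.
    # Numpy-backed xarray objects return None (or lack the method) and
    # are therefore skipped.
    try:
        return x.__dask_graph__() is not None
    except (AttributeError, TypeError):
        return False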
xarray/core/dataarray.py
Outdated
@@ -576,6 +576,33 @@ def reset_coords(self, names=None, drop=False, inplace=False):
        dataset[self.name] = self.variable
        return dataset

    def __dask_graph__(self):
        return self._variable.__dask_graph__()
It's actually possible to have multiple dask arrays in an xarray.DataArray, if there are dask arrays in the coordinates. So it would be better to handle DataArray by converting to a Dataset than to a Variable. We use the _to_temp_dataset/_from_temp_dataset methods as a shortcut for these types of cases, e.g., see the current implementation of DataArray.persist().
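A minimal sketch of that delegation, assuming the private _to_temp_dataset helper mentioned above (the PR's final code may differ):

class DataArray:
    # Route the dask collection protocol through a temporary single-variable
    # Dataset so that dask arrays stored in the coordinates are part of the
    # graph too; _to_temp_dataset() is the private helper mentioned above.
    def __dask_graph__(self):
        return self._to_temp_dataset().__dask_graph__()

    def __dask_keys__(self):
        return self._to_temp_dataset().__dask_keys__()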
OK, that will be a bit tricky. My guess is that it might be simpler to just account for all of the possible dask things explicitly, as we do in dataset. Otherwise we're converting to and from datasets in each of the __dask_foo__ methods, and I would not be surprised to run into oddness there. I'm not sure though.
Can you recommend a test case that includes dask arrays in the coordinates?
For some reason this fails when I use dask arrays for coordinates:
coord = da.arange(8, chunks=(4,))
data = da.random.random((8, 8), chunks=(4, 4)) + 1
array = DataArray(data,
                  coords={'x': coord, 'y': coord},
                  dims=['x', 'y'])
Replacing coords with a numpy array works fine.
@mrocklin - what is the failure you are referring to?
In [1]: import xarray
In [2]: import dask.array as da
In [3]: coord = da.arange(8, chunks=(4,))
...: data = da.random.random((8, 8), chunks=(4, 4)) + 1
...: array = xarray.DataArray(data,
...: coords={'x': coord, 'y': coord},
...: dims=['x', 'y'])
...:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-3-b90a33ebf436> in <module>()
3 array = xarray.DataArray(data,
4 coords={'x': coord, 'y': coord},
----> 5 dims=['x', 'y'])
/home/mrocklin/workspace/xarray/xarray/core/dataarray.py in __init__(self, data, coords, dims, name, attrs, encoding, fastpath)
227
228 data = as_compatible_data(data)
--> 229 coords, dims = _infer_coords_and_dims(data.shape, coords, dims)
230 variable = Variable(dims, data, attrs, encoding, fastpath=True)
231
/home/mrocklin/workspace/xarray/xarray/core/dataarray.py in _infer_coords_and_dims(shape, coords, dims)
68 if utils.is_dict_like(coords):
69 for k, v in coords.items():
---> 70 new_coords[k] = as_variable(v, name=k)
71 elif coords is not None:
72 for dim, coord in zip(dims, coords):
/home/mrocklin/workspace/xarray/xarray/core/variable.py in as_variable(obj, name)
94 '{}'.format(obj))
95 elif utils.is_scalar(obj):
---> 96 obj = Variable([], obj)
97 elif getattr(obj, 'name', None) is not None:
98 obj = Variable(obj.name, obj)
/home/mrocklin/workspace/xarray/xarray/core/variable.py in __init__(self, dims, data, attrs, encoding, fastpath)
275 """
276 self._data = as_compatible_data(data, fastpath=fastpath)
--> 277 self._dims = self._parse_dimensions(dims)
278 self._attrs = None
279 self._encoding = None
/home/mrocklin/workspace/xarray/xarray/core/variable.py in _parse_dimensions(self, dims)
439 raise ValueError('dimensions %s must have the same length as the '
440 'number of data dimensions, ndim=%s'
--> 441 % (dims, self.ndim))
442 return dims
443
ValueError: dimensions () must have the same length as the number of data dimensions, ndim=1
My objective here is to produce a case where a data array has dask arrays in its coordinates so that I can write code to handle such cases.
@mrocklin - something funny is going on here. I'm going to open a separate issue.
In the short term, this may help you move forward:
In [21]: x = xr.Variable('x', da.arange(8, chunks=(4,)))
...: y = xr.Variable('y', da.arange(8, chunks=(4,)) * 2)
...: data = da.random.random((8, 8), chunks=(4, 4)) + 1
...: array = xr.DataArray(data,
...: coords={'xx': x, 'yy': y},
...: dims=['x', 'y'])
...:
In [22]: array
Out[22]:
<xarray.DataArray 'add-a034ba104341d3cca6b28ad7bf059b14' (x: 8, y: 8)>
dask.array<shape=(8, 8), dtype=float64, chunksize=(4, 4)>
Coordinates:
xx (x) int64 dask.array<shape=(8,), chunksize=(4,)>
yy (y) int64 dask.array<shape=(8,), chunksize=(4,)>
Dimensions without coordinates: x, y
xarray/core/dataset.py
Outdated
    if dask.is_dask_collection(v) else
    (False, k, v) for k, v in self._variables.items()]
return self._dask_postpersist, (info, self._coord_names, self._dims,
    self._attrs, self._file_obj, self._encoding)
nit: please indent
xarray/tests/test_dask.py
Outdated
lambda x: x.persist(),
pytest.mark.skipif(LooseVersion(dask.__version__) < '0.16',
lambda x: dask.persist(x)[0],
reason='Need Dask 0.16+')
This is pretty confusing at first glance, unless you already deeply understand how pytest marks work.
I don't really have a suggestion for how to make this better, but maybe a comment is in order?
Added a comment on the first such use of this parametrization
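For illustration, a hypothetical sketch of such a commented parametrization, written with the modern pytest.param API rather than whatever exact style the PR uses; the test name and body are placeholders:

import dask
import dask.array as da
import pytest
import xarray as xr
from distutils.version import LooseVersion

@pytest.mark.parametrize('persist', [
    # The method form works on any dask version xarray supports.
    lambda x: x.persist(),
    # The top-level dask.persist() needs the collection interface that
    # landed in dask 0.16, so skip this variant on older dask releases.
    pytest.param(lambda x: dask.persist(x)[0],
                 marks=pytest.mark.skipif(
                     LooseVersion(dask.__version__) < '0.16',
                     reason='Need Dask 0.16+')),
])
def test_persist(persist):
    ds = xr.Dataset({'foo': ('x', da.arange(6, chunks=3))})
    assert dask.is_dask_collection(persist(ds))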
xarray/tests/test_distributed.py
Outdated
assert dask.is_dask_collection(y)
assert dask.is_dask_collection(y.var1)
assert dask.is_dask_collection(y.var2)
# assert not dask.is_dask_collection(y.var3)  # TODO: avoid chunking unnecessarily in dataset.py::maybe_chunk
We could probably argue about whether .chunk() should chunk variables that don't use the supplied dimension. But the default behavior is to chunk everything when given an empty argument, so I think it's actually correct to do it this way (it certainly makes the return value easier to understand).
Probably a better way to do this would be to construct the dataset by hand from dask arrays, e.g.,
ds = Dataset({'foo': ('x', da.arange(3, chunks=(3,))), 'bar': ('x', np.arange(3))})
assert dask.is_dask_collection(ds)
assert dask.is_dask_collection(ds.foo)
assert not dask.is_dask_collection(ds.bar)
In a distributed context there is more cost to this behavior than with the threaded scheduler because we communicate the array around the network, rather than do things immediately/locally.
Looks good to me, though it still needs the note in "What's new". This is pretty low-risk in its current state (just adding new methods), so I would be OK including it in v0.10.
xarray/tests/test_distributed.py
Outdated
assert dask.is_dask_collection(z)
assert dask.is_dask_collection(z.var1)
assert dask.is_dask_collection(z.var2)
# assert not dask.is_dask_collection(z.var3)
remove?
Removed
So, we're currently labeling the Dask collection interface (what we've implemented here) as experimental and subject to change without a deprecation cycle. I don't foresee much changing, but it might be wise to let people experiment with this in master for a while without putting it in the release. I would not be surprised if we find issues with it after moderate use.
I just tried things and persisting datasets seems to work well for me in practice.
Thank you for stepping in @shoyer. From my perspective this is good to go. However I'm also not in any rush.
Thanks Matt. I was just waiting for CI to pass. I've indicated that this is experimental in the release notes, so as long as we keep the messaging consistent on the Dask side I think we have room to change this up if needed.
Great. I'm glad to see this in. Thanks for the help!
This integrates the new dask interface methods into XArray. This will place XArray as a first-class dask collection and help in particular with newer dask.distributed features.
- Passes git diff upstream/master **/*py | flake8 --diff
- Fully documented, including whats-new.rst for all changes and api.rst for new API

Builds on work from @jcrist here: dask/dask#2748
Depends on dask/dask#2847
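For reference, a condensed sketch of the dask collection protocol that this PR wires into Variable, DataArray and Dataset; method names follow dask's documentation, and the bodies are placeholders rather than xarray's implementation:

import dask.threaded

class MyCollection:
    def __dask_graph__(self):
        ...  # the task graph (a mapping), or None if the object is not dask-backed

    def __dask_keys__(self):
        ...  # the output keys dask should compute from the graph

    @staticmethod
    def __dask_optimize__(dsk, keys, **kwargs):
        return dsk  # optionally prune/rewrite the graph before execution

    # default scheduler used for this type's .compute()/.persist()
    __dask_scheduler__ = staticmethod(dask.threaded.get)

    def __dask_postcompute__(self):
        ...  # (finalize_function, extra_args) used to rebuild the concrete object

    def __dask_postpersist__(self):
        ...  # like __dask_postcompute__, but rebuilds a lazy, still-chunked object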