diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index a6567193951..00701c718af 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -25,6 +25,10 @@ Breaking changes
   merges will now succeed in cases that previously raised
   ``xarray.MergeError``. Set ``compat='broadcast_equals'`` to restore the
   previous default.
+- Pickling an xarray object based on the dask backend, or reading its
+  :py:meth:`values` property, won't automatically convert the array from dask
+  to numpy in the original object anymore.
+  By `Guido Imperiale `_.
 
 Deprecations
 ~~~~~~~~~~~~
@@ -45,33 +49,33 @@ Deprecations
 
 Enhancements
 ~~~~~~~~~~~~
 - Add checking of ``attr`` names and values when saving to netCDF, raising useful
-error messages if they are invalid. (:issue:`911`).
-By `Robin Wilson `_.
-
+  error messages if they are invalid. (:issue:`911`).
+  By `Robin Wilson `_.
 - Added ability to save ``DataArray`` objects directly to netCDF files using
   :py:meth:`~xarray.DataArray.to_netcdf`, and to load directly from netCDF files
   using :py:func:`~xarray.open_dataarray` (:issue:`915`). These remove the need
   to convert a ``DataArray`` to a ``Dataset`` before saving as a netCDF file,
   and deals with names to ensure a perfect 'roundtrip' capability.
   By `Robin Wilson `_.
-
 - Added the ``compat`` option ``'no_conflicts'`` to ``merge``, allowing the
   combination of xarray objects with disjoint (:issue:`742`) or overlapping
   (:issue:`835`) coordinates as long as all present data agrees.
   By `Johnnie Gray `_. See
   :ref:`combining.no_conflicts` for more details.
-
 - It is now possible to set ``concat_dim=None`` explicitly in
   :py:func:`~xarray.open_mfdataset` to disable inferring a dimension along
   which to concatenate.
   By `Stephan Hoyer `_.
+- Added methods :py:meth:`DataArray.compute`, :py:meth:`Dataset.compute`, and
+  :py:meth:`Variable.compute` as a non-mutating alternative to
+  :py:meth:`~DataArray.load`.
+  By `Guido Imperiale `_.
 
 Bug fixes
 ~~~~~~~~~
diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py
index 44d5865ad37..79fdd7c1100 100644
--- a/xarray/core/dataarray.py
+++ b/xarray/core/dataarray.py
@@ -561,6 +561,19 @@ def load(self):
         self._coords = new._coords
         return self
 
+    def compute(self):
+        """Manually trigger loading of this array's data from disk or a
+        remote source into memory and return a new array. The original is
+        left unaltered.
+
+        Normally, it should not be necessary to call this method in user code,
+        because all xarray functions should either work on deferred data or
+        load data automatically. However, this method can be necessary when
+        working with many file objects on disk.
+        """
+        new = self.copy(deep=False)
+        return new.load()
+
     def copy(self, deep=True):
         """Returns a copy of this array.
 
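Note: the new ``compute()`` methods introduced above are non-mutating counterparts to ``load()``: they return a new object with the data loaded into memory and leave the original untouched. A minimal usage sketch (illustrative names only; dask is assumed to be installed)::

    import numpy as np
    import xarray as xr

    # a dask-backed DataArray; .chunk() wraps the underlying data in dask
    arr = xr.DataArray(np.arange(6).reshape(2, 3), dims=('x', 'y')).chunk()

    computed = arr.compute()  # new DataArray with the data loaded into memory
    # 'arr' itself still wraps a dask array; only 'computed' holds numpy data

    arr.load()  # by contrast, load() loads in place and returns 'arr' itself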
diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py
index 45650d21501..30eaf167f6e 100644
--- a/xarray/core/dataset.py
+++ b/xarray/core/dataset.py
@@ -255,8 +255,11 @@ def load_store(cls, store, decoder=None):
         return obj
 
     def __getstate__(self):
-        """Always load data in-memory before pickling"""
-        self.load()
+        """Load data in-memory before pickling (except for Dask data)"""
+        for v in self.variables.values():
+            if not isinstance(v.data, dask_array_type):
+                v.load()
+
         # self.__dict__ is the default pickle object, we don't need to
         # implement our own __setstate__ method to make pickle work
         state = self.__dict__.copy()
@@ -319,6 +322,19 @@ def load(self):
 
         return self
 
+    def compute(self):
+        """Manually trigger loading of this dataset's data from disk or a
+        remote source into memory and return a new dataset. The original is
+        left unaltered.
+
+        Normally, it should not be necessary to call this method in user code,
+        because all xarray functions should either work on deferred data or
+        load data automatically. However, this method can be necessary when
+        working with many file objects on disk.
+        """
+        new = self.copy(deep=False)
+        return new.load()
+
     @classmethod
     def _construct_direct(cls, variables, coord_names, dims=None, attrs=None,
                           file_obj=None):
@@ -401,14 +417,12 @@ def copy(self, deep=False):
         """Returns a copy of this dataset.
 
         If `deep=True`, a deep copy is made of each of the component variables.
-        Otherwise, a shallow copy is made, so each variable in the new dataset
-        is also a variable in the original dataset.
+        Otherwise, a shallow copy of each of the component variables is made, so
+        that the underlying memory region of the new dataset is the same as in
+        the original dataset.
         """
-        if deep:
-            variables = OrderedDict((k, v.copy(deep=True))
-                                    for k, v in iteritems(self._variables))
-        else:
-            variables = self._variables.copy()
+        variables = OrderedDict((k, v.copy(deep=deep))
+                                for k, v in iteritems(self._variables))
         # skip __init__ to avoid costly validation
         return self._construct_direct(variables, self._coord_names.copy(),
                                       self._dims.copy(), self._attrs_copy())
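Note: with the ``copy()`` change above, a shallow copy now creates new ``Variable`` wrappers instead of reusing the original ``Variable`` objects, while still sharing the underlying memory with the original dataset (this is what the updated ``test_copy`` below asserts). A small sketch of the resulting behaviour (illustrative names only)::

    import numpy as np
    import xarray as xr

    ds = xr.Dataset({'a': ('x', np.arange(3))})
    shallow = ds.copy(deep=False)

    # the Variable wrappers are new objects ...
    print(ds.variables['a'] is shallow.variables['a'])  # False
    # ... but they still share the same underlying numpy memory
    shallow['a'].values[0] = 99
    print(ds['a'].values[0])                             # 99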
diff --git a/xarray/core/variable.py b/xarray/core/variable.py
index b524b95f87f..9cbc21ac010 100644
--- a/xarray/core/variable.py
+++ b/xarray/core/variable.py
@@ -274,10 +274,21 @@ def data(self, data):
                 "replacement data must match the Variable's shape")
         self._data = data
 
+    def _data_cast(self):
+        if isinstance(self._data, (np.ndarray, PandasIndexAdapter)):
+            return self._data
+        else:
+            return np.asarray(self._data)
+
     def _data_cached(self):
-        if not isinstance(self._data, (np.ndarray, PandasIndexAdapter)):
-            self._data = np.asarray(self._data)
-        return self._data
+        """Load data into memory and return it.
+        Do not cache dask arrays automatically; that should
+        require an explicit load() call.
+        """
+        new_data = self._data_cast()
+        if not isinstance(self._data, dask_array_type):
+            self._data = new_data
+        return new_data
 
     @property
     def _indexable_data(self):
@@ -290,12 +301,25 @@ def load(self):
         Normally, it should not be necessary to call this method in user code,
         because all xarray functions should either work on deferred data or
         load data automatically.
-        """
-        self._data_cached()
+        """
+        self._data = self._data_cast()
         return self
 
+    def compute(self):
+        """Manually trigger loading of this variable's data from disk or a
+        remote source into memory and return a new variable. The original is
+        left unaltered.
+
+        Normally, it should not be necessary to call this method in user code,
+        because all xarray functions should either work on deferred data or
+        load data automatically.
+        """
+        new = self.copy(deep=False)
+        return new.load()
+
     def __getstate__(self):
-        """Always cache data as an in-memory array before pickling"""
+        """Always cache data as an in-memory array before pickling
+        (with the exception of the dask backend)"""
         self._data_cached()
         # self.__dict__ is the default pickle object, we don't need to
         # implement our own __setstate__ method to make pickle work
@@ -1093,10 +1117,11 @@ def __init__(self, dims, data, attrs=None, encoding=None, fastpath=False):
             raise ValueError('%s objects must be 1-dimensional' %
                              type(self).__name__)
 
-    def _data_cached(self):
-        if not isinstance(self._data, PandasIndexAdapter):
-            self._data = PandasIndexAdapter(self._data)
-        return self._data
+    def _data_cast(self):
+        if isinstance(self._data, PandasIndexAdapter):
+            return self._data
+        else:
+            return PandasIndexAdapter(self._data)
 
     def __getitem__(self, key):
         key = self._item_key_to_tuple(key)
diff --git a/xarray/test/test_backends.py b/xarray/test/test_backends.py
index eeb5561579b..0ff5cffec26 100644
--- a/xarray/test/test_backends.py
+++ b/xarray/test/test_backends.py
@@ -149,6 +149,24 @@ def assert_loads(vars=None):
             actual = ds.load()
         self.assertDatasetAllClose(expected, actual)
 
+    def test_dataset_compute(self):
+        expected = create_test_data()
+
+        with self.roundtrip(expected) as actual:
+            # Test Dataset.compute()
+            for v in actual.variables.values():
+                self.assertFalse(v._in_memory)
+
+            computed = actual.compute()
+
+            for v in actual.data_vars.values():
+                self.assertFalse(v._in_memory)
+            for v in computed.variables.values():
+                self.assertTrue(v._in_memory)
+
+            self.assertDatasetAllClose(expected, actual)
+            self.assertDatasetAllClose(expected, computed)
+
     def test_roundtrip_None_variable(self):
         expected = Dataset({None: (('x', 'y'), [[0, 1], [2, 3]])})
         with self.roundtrip(expected) as actual:
@@ -230,18 +248,6 @@ def test_roundtrip_coordinates(self):
         with self.roundtrip(expected) as actual:
             self.assertDatasetIdentical(expected, actual)
 
-        expected = original.copy()
-        expected.attrs['coordinates'] = 'something random'
-        with self.assertRaisesRegexp(ValueError, 'cannot serialize'):
-            with self.roundtrip(expected):
-                pass
-
-        expected = original.copy(deep=True)
-        expected['foo'].attrs['coordinates'] = 'something random'
-        with self.assertRaisesRegexp(ValueError, 'cannot serialize'):
-            with self.roundtrip(expected):
-                pass
-
     def test_roundtrip_boolean_dtype(self):
         original = create_boolean_data()
         self.assertEqual(original['x'].dtype, 'bool')
@@ -872,7 +878,26 @@ def test_read_byte_attrs_as_unicode(self):
 @requires_dask
 @requires_scipy
 @requires_netCDF4
-class DaskTest(TestCase):
+class DaskTest(TestCase, DatasetIOTestCases):
+    @contextlib.contextmanager
+    def create_store(self):
+        yield Dataset()
+
+    @contextlib.contextmanager
+    def roundtrip(self, data, save_kwargs={}, open_kwargs={}):
+        yield data.chunk()
+
+    def test_roundtrip_datetime_data(self):
+        # Override method in DatasetIOTestCases - remove inapplicable save_kwds
+        times = pd.to_datetime(['2000-01-01', '2000-01-02', 'NaT'])
+        expected = Dataset({'t': ('t', times), 't0': times[0]})
+        with self.roundtrip(expected) as actual:
+            self.assertDatasetIdentical(expected, actual)
+
+    def test_write_store(self):
+        # Override method in DatasetIOTestCases - not applicable to dask
+        pass
+
     def test_open_mfdataset(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         with create_tmp_file() as tmp1:
@@ -992,7 +1017,16 @@ def test_deterministic_names(self):
             self.assertIn(tmp, dask_name)
         self.assertEqual(original_names, repeat_names)
 
-
+    def test_dataarray_compute(self):
+        # Test DataArray.compute() on dask backend.
+        # The test for Dataset.compute() is already in DatasetIOTestCases;
+        # however dask is the only tested backend which supports DataArrays
+        actual = DataArray([1, 2]).chunk()
+        computed = actual.compute()
+        self.assertFalse(actual._in_memory)
+        self.assertTrue(computed._in_memory)
+        self.assertDataArrayAllClose(actual, computed)
+
 @requires_scipy_or_netCDF4
 @requires_pydap
 class PydapTest(TestCase):
diff --git a/xarray/test/test_dask.py b/xarray/test/test_dask.py
index d3d602f5285..7f93ab18a50 100644
--- a/xarray/test/test_dask.py
+++ b/xarray/test/test_dask.py
@@ -1,3 +1,4 @@
+import pickle
 import numpy as np
 import pandas as pd
 
@@ -12,44 +13,20 @@
 import dask.array as da
 
 
-def _copy_at_variable_level(arg):
-    """We need to copy the argument at the level of xarray.Variable objects, so
-    that viewing its values does not trigger lazy loading.
-    """
-    if isinstance(arg, Variable):
-        return arg.copy(deep=False)
-    elif isinstance(arg, DataArray):
-        ds = arg.to_dataset(name='__copied__')
-        return _copy_at_variable_level(ds)['__copied__']
-    elif isinstance(arg, Dataset):
-        ds = arg.copy()
-        for k in list(ds):
-            ds._variables[k] = ds._variables[k].copy(deep=False)
-        return ds
-    else:
-        assert False
-
-
 class DaskTestCase(TestCase):
     def assertLazyAnd(self, expected, actual, test):
-        expected_copy = _copy_at_variable_level(expected)
-        actual_copy = _copy_at_variable_level(actual)
         with dask.set_options(get=dask.get):
-            test(actual_copy, expected_copy)
-        var = getattr(actual, 'variable', actual)
-        self.assertIsInstance(var.data, da.Array)
+            test(actual, expected)
+        if isinstance(actual, Dataset):
+            for var in actual.variables.values():
+                self.assertIsInstance(var.data, da.Array)
+        else:
+            var = getattr(actual, 'variable', actual)
+            self.assertIsInstance(var.data, da.Array)
 
 
 @requires_dask
 class TestVariable(DaskTestCase):
-    def assertLazyAnd(self, expected, actual, test):
-        expected_copy = expected.copy(deep=False)
-        actual_copy = actual.copy(deep=False)
-        with dask.set_options(get=dask.get):
-            test(actual_copy, expected_copy)
-        var = getattr(actual, 'variable', actual)
-        self.assertIsInstance(var.data, da.Array)
-
     def assertLazyAndIdentical(self, expected, actual):
         self.assertLazyAnd(expected, actual, self.assertVariableIdentical)
 
@@ -199,6 +176,9 @@ def assertLazyAndIdentical(self, expected, actual):
     def assertLazyAndAllClose(self, expected, actual):
         self.assertLazyAnd(expected, actual, self.assertDataArrayAllClose)
 
+    def assertLazyAndEqual(self, expected, actual):
+        self.assertLazyAnd(expected, actual, self.assertDataArrayEqual)
+
     def setUp(self):
         self.values = np.random.randn(4, 6)
         self.data = da.from_array(self.values, chunks=(2, 2))
@@ -266,7 +246,7 @@ def test_to_dataset_roundtrip(self):
         v = self.lazy_array
 
         expected = u.assign_coords(x=u['x'])
-        self.assertLazyAndIdentical(expected, v.to_dataset('x').to_array('x'))
+        self.assertLazyAndEqual(expected, v.to_dataset('x').to_array('x'))
 
     def test_merge(self):
 
@@ -275,7 +255,7 @@ def duplicate_and_merge(array):
 
         expected = duplicate_and_merge(self.eager_array)
         actual = duplicate_and_merge(self.lazy_array)
-        self.assertLazyAndIdentical(expected, actual)
+        self.assertLazyAndEqual(expected, actual)
 
     def test_ufuncs(self):
         u = self.eager_array
@@ -288,9 +268,9 @@ def test_where_dispatching(self):
         x = da.from_array(a, 5)
         y = da.from_array(b, 5)
         expected = DataArray(a).where(b)
-        self.assertLazyAndIdentical(expected, DataArray(a).where(y))
-        self.assertLazyAndIdentical(expected, DataArray(x).where(b))
-        self.assertLazyAndIdentical(expected, DataArray(x).where(y))
+        self.assertLazyAndEqual(expected, DataArray(a).where(y))
+        self.assertLazyAndEqual(expected, DataArray(x).where(b))
+        self.assertLazyAndEqual(expected, DataArray(x).where(y))
 
     def test_simultaneous_compute(self):
         ds = Dataset({'foo': ('x', range(5)),
@@ -315,13 +295,39 @@ def test_stack(self):
         expected = DataArray(data.reshape(2, -1), {'w': [0, 1], 'z': z},
                              dims=['w', 'z'])
         assert stacked.data.chunks == expected.data.chunks
-        self.assertLazyAndIdentical(expected, stacked)
+        self.assertLazyAndEqual(expected, stacked)
 
     def test_dot(self):
         eager = self.eager_array.dot(self.eager_array[0])
         lazy = self.lazy_array.dot(self.lazy_array[0])
         self.assertLazyAndAllClose(eager, lazy)
 
+    def test_dataarray_pickle(self):
+        # Test that pickling/unpickling does not convert the dask
+        # backend to numpy
+        a1 = DataArray([1, 2]).chunk()
+        self.assertFalse(a1._in_memory)
+        a2 = pickle.loads(pickle.dumps(a1))
+        self.assertDataArrayIdentical(a1, a2)
+        self.assertFalse(a1._in_memory)
+        self.assertFalse(a2._in_memory)
+
+    def test_dataset_pickle(self):
+        ds1 = Dataset({'a': [1, 2]}).chunk()
+        self.assertFalse(ds1['a']._in_memory)
+        ds2 = pickle.loads(pickle.dumps(ds1))
+        self.assertDatasetIdentical(ds1, ds2)
+        self.assertFalse(ds1['a']._in_memory)
+        self.assertFalse(ds2['a']._in_memory)
+
+    def test_values(self):
+        # Test that invoking the values property does not convert the dask
+        # backend to numpy
+        a = DataArray([1, 2]).chunk()
+        self.assertFalse(a._in_memory)
+        self.assertEqual(a.values.tolist(), [1, 2])
+        self.assertFalse(a._in_memory)
+
     def test_from_dask_variable(self):
         # Test array creation from Variable with dask backend.
         # This is used e.g. in broadcast()
diff --git a/xarray/test/test_dataset.py b/xarray/test/test_dataset.py
index b9798dbd611..bac07d68d0f 100644
--- a/xarray/test/test_dataset.py
+++ b/xarray/test/test_dataset.py
@@ -1287,10 +1287,13 @@ def test_copy(self):
 
         for copied in [data.copy(deep=False), copy(data)]:
             self.assertDatasetIdentical(data, copied)
-            for k in data:
+            # Note: IndexVariable objects with string dtype are always
+            # copied because of xarray.core.utils.safe_cast_to_index.
+            # Limiting the test to data variables.
+            for k in data.data_vars:
                 v0 = data.variables[k]
                 v1 = copied.variables[k]
-                self.assertIs(v0, v1)
+                assert source_ndarray(v0.data) is source_ndarray(v1.data)
             copied['foo'] = ('z', np.arange(5))
             self.assertNotIn('foo', data)
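Note: taken together, the changes above mean that pickling a dask-backed object, or reading its ``values``, no longer caches the computed data on the original object. A short sketch mirroring the tests above (``_in_memory`` is the private flag used by the test suite; dask is assumed to be installed)::

    import pickle
    import xarray as xr

    ds = xr.Dataset({'a': ('x', [1, 2, 3])}).chunk()  # dask-backed Dataset

    restored = pickle.loads(pickle.dumps(ds))  # round-trip through pickle
    print(ds['a']._in_memory)        # False: the original still wraps dask
    print(restored['a']._in_memory)  # False: the unpickled copy stays lazy

    print(ds['a'].values)            # computes the values for the caller ...
    print(ds['a']._in_memory)        # ... but still False: nothing was cached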