From 9cc05507e47c39bd3053f81a24be8b7ebd0d50a0 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Mon, 19 Nov 2018 17:00:55 -0500 Subject: [PATCH 01/14] wip: getting started --- xarray/backends/zarr.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index 06fe7f04e4f..83ec8d24eaf 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -224,7 +224,8 @@ class ZarrStore(AbstractWritableDataStore): """ @classmethod - def open_group(cls, store, mode='r', synchronizer=None, group=None): + def open_group(cls, store, mode='r', synchronizer=None, group=None, + consolidated=False): import zarr min_zarr = '2.2' @@ -234,8 +235,13 @@ def open_group(cls, store, mode='r', synchronizer=None, group=None): "installation " "http://zarr.readthedocs.io/en/stable/" "#installation" % min_zarr) - zarr_group = zarr.open_group(store=store, mode=mode, - synchronizer=synchronizer, path=group) + + open_kwargs = dict(mode=mode, synchronizer=synchronizer, path=group) + if consolidated: + # TODO: an option to pass the metadata_key keyword + zarr_group = zarr.open_consolidated(store, **open_kwargs) + else: + zarr_group = zarr.open_group(store, **open_kwargs) return cls(zarr_group) def __init__(self, zarr_group): @@ -337,7 +343,7 @@ def sync(self): def open_zarr(store, group=None, synchronizer=None, auto_chunk=True, decode_cf=True, mask_and_scale=True, decode_times=True, concat_characters=True, decode_coords=True, - drop_variables=None): + drop_variables=None, consolidated=False): """Load and decode a dataset from a Zarr store. .. note:: Experimental @@ -383,10 +389,13 @@ def open_zarr(store, group=None, synchronizer=None, auto_chunk=True, decode_coords : bool, optional If True, decode the 'coordinates' attribute to identify coordinates in the resulting dataset. - drop_variables: string or iterable, optional + drop_variables : string or iterable, optional A variable or list of variables to exclude from being parsed from the dataset. This may be useful to drop variables with problems or inconsistent values. + consolidated : bool, optional + Whether to open the store using zarr's consolidated metadata + capability. Only works for stores that have already been consolidated. 
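As an illustration of the read path this patch adds, here is a minimal sketch. It assumes a hypothetical path ``'example.zarr'`` pointing at a store whose metadata has already been consolidated::

    import xarray as xr

    # With consolidated=True, ZarrStore.open_group calls
    # zarr.open_consolidated instead of zarr.open_group, so all of the
    # store's metadata is read from the single '.zmetadata' key.
    ds = xr.open_zarr('example.zarr', consolidated=True)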
Returns ------- @@ -423,7 +432,7 @@ def maybe_decode_store(store, lock=False): mode = 'r' zarr_store = ZarrStore.open_group(store, mode=mode, synchronizer=synchronizer, - group=group) + group=group, consolidated=consolidated) ds = maybe_decode_store(zarr_store) # auto chunking needs to be here and not in ZarrStore because variable From 7eda4cc113d46bcdf041078aefec3727f6c4b947 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Mon, 19 Nov 2018 23:36:32 -0500 Subject: [PATCH 02/14] preliminary support for zarr consolidated metadata --- xarray/backends/api.py | 17 ++++++++++++----- xarray/core/dataset.py | 20 ++++++++++++++------ xarray/tests/test_backends.py | 8 ++++++++ 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index c1ace7774f9..7e58913bd2c 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -861,7 +861,7 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None, def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, - encoding=None, compute=True): + encoding=None, compute=True, consolidate=False): """This function creates an appropriate datastore for writing a dataset to a zarr store @@ -876,16 +876,23 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, _validate_dataset_names(dataset) _validate_attrs(dataset) - store = backends.ZarrStore.open_group(store=store, mode=mode, + zstore = backends.ZarrStore.open_group(store=store, mode=mode, synchronizer=synchronizer, group=group) writer = ArrayWriter() # TODO: figure out how to properly handle unlimited_dims - dump_to_store(dataset, store, writer, encoding=encoding) + dump_to_store(dataset, zstore, writer, encoding=encoding) writes = writer.sync(compute=compute) if not compute: import dask - return dask.delayed(_finalize_store)(writes, store) - return store + return dask.delayed(_finalize_store)(writes, zstore) + # TODO: handle consolidation for delayed case + + if consolidate: + import zarr + zarr.consolidate_metadata(store) + # do we need to reload zstore now that we have consolidated? + + return zstore diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 4f9c61b3269..d17a9168bfa 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -1222,7 +1222,7 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None, compute=compute) def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, - encoding=None, compute=True): + encoding=None, compute=True, consolidate=False): """Write dataset contents to a zarr group. .. note:: Experimental @@ -1244,9 +1244,16 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, Nested dictionary with variable names as keys and dictionaries of variable specific encodings as values, e.g., ``{'my_variable': {'dtype': 'int16', 'scale_factor': 0.1,}, ...}`` - compute: boolean - If true compute immediately, otherwise return a + compute: bool, optional + If True compute immediately, otherwise return a ``dask.delayed.Delayed`` object that can be computed later. + consolidate: bool, optional + If True, apply zarr's `consolidate_metadata` function to the store + after writing.
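To make the write path concrete, a minimal sketch using the keyword as it is named at this stage of the series (``consolidate``, renamed later in the series), with a hypothetical output path::

    import xarray as xr

    ds = xr.Dataset({'foo': ('x', [1, 2, 3])})
    # After the variables are written, to_zarr calls
    # zarr.consolidate_metadata(store) so that readers can fetch all
    # metadata in a single operation.
    ds.to_zarr('example.zarr', mode='w', consolidate=True)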
+ + References ---------- https://zarr.readthedocs.io/ """ if encoding is None: encoding = {} @@ -1256,7 +1263,8 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, "and 'w-'.") from ..backends.api import to_zarr return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer, - group=group, encoding=encoding, compute=compute) + group=group, encoding=encoding, compute=compute, + consolidate=consolidate) def __unicode__(self): return formatting.dataset_repr(self) @@ -1380,7 +1388,7 @@ def _validate_indexers(self, indexers): """ Here we make sure + indexer has valid keys + indexer is in a valid data type - + string indexers are cast to the appropriate date type if the + + string indexers are cast to the appropriate date type if the associated index is a DatetimeIndex or CFTimeIndex """ from .dataarray import DataArray @@ -1963,7 +1971,7 @@ def _validate_interp_indexer(x, new_x): 'Instead got\n{}'.format(new_x)) else: return (x, new_x) - + variables = OrderedDict() for name, var in iteritems(obj._variables): if name not in indexers: diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index fb9c43c0165..8a40d01a49f 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -1320,6 +1320,14 @@ def roundtrip_append(self, data, save_kwargs={}, open_kwargs={}, allow_cleanup_failure=False): pytest.skip("zarr backend does not support appending") + def test_roundtrip_consolidated(self): + expected = create_test_data() + with self.roundtrip(expected, + save_kwargs={'consolidate': True}, + open_kwargs={'consolidated': True}) as actual: + self.check_dtypes_roundtripped(expected, actual) + assert_identical(expected, actual) + def test_auto_chunk(self): original = create_test_data().chunk() From 0af5abd2f7d6a6e1bbad4a7ac1502a67251cd347 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Mon, 19 Nov 2018 23:44:48 -0500 Subject: [PATCH 03/14] update zarr dev repo --- ci/requirements-py36-zarr-dev.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ci/requirements-py36-zarr-dev.yml b/ci/requirements-py36-zarr-dev.yml index 7fbce63aa81..6ed466ba5cb 100644 --- a/ci/requirements-py36-zarr-dev.yml +++ b/ci/requirements-py36-zarr-dev.yml @@ -18,4 +18,4 @@ dependencies: - pip: - coveralls - pytest-cov - - git+https://github.com/alimanfoo/zarr.git + - git+https://github.com/zarr-developers/zarr.git From bb6f9c23459949bd857fc3f6699514556c0f807d Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Sat, 24 Nov 2018 15:41:05 -0500 Subject: [PATCH 04/14] add consolidate to close --- xarray/backends/api.py | 15 ++++++--------- xarray/backends/zarr.py | 18 +++++++++++++++--- xarray/tests/test_distributed.py | 10 ++++++++++ 3 files changed, 31 insertions(+), 12 deletions(-) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index 7e58913bd2c..ee673bf4801 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -878,21 +878,18 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, zstore = backends.ZarrStore.open_group(store=store, mode=mode, synchronizer=synchronizer, - group=group) + group=group, + consolidate_on_close=consolidate) writer = ArrayWriter() # TODO: figure out how to properly handle unlimited_dims dump_to_store(dataset, zstore, writer, encoding=encoding) writes = writer.sync(compute=compute) - if not compute: + if compute: + _finalize_store(writes, zstore) + else: import dask return dask.delayed(_finalize_store)(writes, zstore) - # TODO: handle
consolidation for delayed case - - if consolidate: - import zarr - zarr.consolidate_metadata(store) - # do we need to reload zstore now that we have consolidated? - - return zstore + return zstore diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index 83ec8d24eaf..e67968f25ec 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -225,7 +225,7 @@ class ZarrStore(AbstractWritableDataStore): @classmethod def open_group(cls, store, mode='r', synchronizer=None, group=None, - consolidated=False): + consolidated=False, consolidate_on_close=False): import zarr min_zarr = '2.2' @@ -236,19 +236,26 @@ def open_group(cls, store, mode='r', synchronizer=None, group=None, "http://zarr.readthedocs.io/en/stable/" "#installation" % min_zarr) + if consolidated or consolidate_on_close: + if LooseVersion(zarr.__version__) <= '2.2': # pragma: no cover + raise NotImplementedError("Zarr version 2.3 or greater is " + "required for consolidated " + "metadata.") + open_kwargs = dict(mode=mode, synchronizer=synchronizer, path=group) if consolidated: # TODO: an option to pass the metadata_key keyword zarr_group = zarr.open_consolidated(store, **open_kwargs) else: zarr_group = zarr.open_group(store, **open_kwargs) - return cls(zarr_group) + return cls(zarr_group, consolidate_on_close) - def __init__(self, zarr_group): + def __init__(self, zarr_group, consolidate_on_close=False): self.ds = zarr_group self._read_only = self.ds.read_only self._synchronizer = self.ds.synchronizer self._group = self.ds.path + self._consolidate_on_close = consolidate_on_close def open_store_variable(self, name, zarr_array): data = indexing.LazilyOuterIndexedArray(ZarrArrayWrapper(name, self)) @@ -339,6 +346,11 @@ def store(self, variables, attributes, *args, **kwargs): def sync(self): pass + def close(self): + if self._consolidate_on_close: + import zarr + zarr.consolidate_metadata(self.ds.store) + def open_zarr(store, group=None, synchronizer=None, auto_chunk=True, decode_cf=True, mask_and_scale=True, decode_times=True, diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 1837a0fe4ef..81a99455093 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -130,6 +130,16 @@ def test_dask_distributed_zarr_integration_test(loop): assert isinstance(restored.var1.data, da.Array) computed = restored.compute() assert_allclose(original, computed) + # repeat with consolidated metadata and delayed compute + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS, + suffix='.zarrc') as filename: + futures = original.to_zarr(filename, consolidate=True, + compute=False) + futures.compute() + with xr.open_zarr(filename, consolidated=True) as restored: + assert isinstance(restored.var1.data, da.Array) + computed = restored.compute() + assert_allclose(original, computed) @requires_rasterio From 95ac3b9df92575b5b806184a8b00ede3705d0369 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Sat, 24 Nov 2018 15:41:05 -0500 Subject: [PATCH 05/14] doc updates --- doc/io.rst | 27 ++++++++++++++++++++++++++- doc/whats-new.rst | 15 +++++++++------ 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/doc/io.rst b/doc/io.rst index e841e665308..44a3c485f76 100644 --- a/doc/io.rst +++ b/doc/io.rst @@ -635,6 +635,31 @@ For example: Not all native zarr compression and filtering options have been tested with xarray. +Consolidated Metadata +~~~~~~~~~~~~~~~~~~~~~ + +Xarray needs to read all of the zarr metadata when it opens a dataset.
+In some storage mediums, such as with cloud object storage (e.g. amazon S3), +this can introduce significant overhead, because two separate HTTP calls to the +object store for each variable in the dataset. +With version 2.3, zarr will support an feature called *consolidated metadata*, +which allows all metadata for the entire dataset to be stored with a single +key (by default called ``.zmetadata``). This can drastically reduce the latency +opening the store. (For more information on this feature, consult the +`zarr docs `_.) + +If you have zarr version 2.3 or greater, xarray can write and read stores +with consolidated metadata. To write consolidated metadata, pass the +``consolidate=True`` option to the +:py:attr:`Dataset.to_zarr ` method:: + + ds.to_zarr('foo.zarr', consolidate=True) + +To read a consolidated store, pass the ``consolidated=True`` option to +:py:func:`~xarray.open_zarr`:: + + ds = xr.open_zarr('foo.zarr', consolidated=True) + .. _io.cfgrib: GRIB format via cfgrib @@ -678,7 +703,7 @@ Formats supported by PseudoNetCDF --------------------------------- xarray can also read CAMx, BPCH, ARL PACKED BIT, and many other file -formats supported by PseudoNetCDF_, if PseudoNetCDF is installed. +formats supported by PseudoNetCDF_, if PseudoNetCDF is installed. PseudoNetCDF can also provide Climate Forecasting Conventions to CMAQ files. In addition, PseudoNetCDF can automatically register custom readers that subclass PseudoNetCDF.PseudoNetCDFFile. PseudoNetCDF can diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 1da1da700e7..b8a63f6c9f7 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -36,6 +36,9 @@ Breaking changes Enhancements ~~~~~~~~~~~~ +- Ability to read and write consolidated metadata in zarr stores. + By `Ryan Abernathey `_. + Bug fixes ~~~~~~~~~ @@ -52,15 +55,15 @@ Breaking changes - ``Dataset.T`` has been removed as a shortcut for :py:meth:`Dataset.transpose`. Call :py:meth:`Dataset.transpose` directly instead. - Iterating over a ``Dataset`` now includes only data variables, not coordinates. - Similarily, calling ``len`` and ``bool`` on a ``Dataset`` now + Similarly, calling ``len`` and ``bool`` on a ``Dataset`` now includes only data variables. - ``DataArray.__contains__`` (used by Python's ``in`` operator) now checks - array data, not coordinates. + array data, not coordinates. - The old resample syntax from before xarray 0.10, e.g., ``data.resample('1D', dim='time', how='mean')``, is no longer supported and will raise an error in most cases. You need to use the new resample syntax instead, e.g., ``data.resample(time='1D').mean()`` or - ``data.resample({'time': '1D'}).mean()``. + ``data.resample({'time': '1D'}).mean()``. - New deprecations (behavior will be changed in xarray 0.12): @@ -97,13 +100,13 @@ Breaking changes than by default trying to coerce them into ``np.datetime64[ns]`` objects. A :py:class:`~xarray.CFTimeIndex` will be used for indexing along time coordinates in these cases. - - A new method :py:meth:`~xarray.CFTimeIndex.to_datetimeindex` has been added + - A new method :py:meth:`~xarray.CFTimeIndex.to_datetimeindex` has been added to aid in converting from a :py:class:`~xarray.CFTimeIndex` to a :py:class:`pandas.DatetimeIndex` for the remaining use-cases where using a :py:class:`~xarray.CFTimeIndex` is still a limitation (e.g. for resample or plotting). - Setting the ``enable_cftimeindex`` option is now a no-op and emits a - ``FutureWarning``.
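Returning to the ``doc/io.rst`` addition above, a sketch of what consolidation does at the zarr level, assuming zarr version 2.3 or greater and a hypothetical pre-existing store::

    import zarr

    # Copy every .zgroup/.zarray/.zattrs entry into a single '.zmetadata'
    # key at the root of the store.
    zarr.consolidate_metadata('example.zarr')

    # Readers can then pull all metadata with one store access.
    group = zarr.open_consolidated('example.zarr', mode='r')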
Enhancements ~~~~~~~~~~~~ @@ -190,7 +193,7 @@ Bug fixes the dates must be encoded using cftime rather than NumPy (:issue:`2272`). By `Spencer Clark `_. -- Chunked datasets can now roundtrip to Zarr storage continually +- Chunked datasets can now roundtrip to Zarr storage continually with `to_zarr` and ``open_zarr`` (:issue:`2300`). By `Lily Wang `_. From cfa0a08fb88e2dab70aa1f7787bd8f64c417b2f9 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Sat, 24 Nov 2018 15:56:29 -0500 Subject: [PATCH 06/14] skip tests based on zarr version --- xarray/tests/test_backends.py | 6 ++++++ xarray/tests/test_distributed.py | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index 8a40d01a49f..6276db7bbf9 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -1,6 +1,7 @@ from __future__ import absolute_import, division, print_function import contextlib +from distutils.version import LooseVersion import itertools import math import os.path @@ -1320,7 +1321,12 @@ def roundtrip_append(self, data, save_kwargs={}, open_kwargs={}, allow_cleanup_failure=False): pytest.skip("zarr backend does not support appending") + def test_roundtrip_consolidated(self): + # my understanding is that pytest.mark.skipif is broken here + import zarr + if LooseVersion(zarr.__version__) <= '2.2': + pytest.skip('zarr version 2.3 or greater needed for consolidated') expected = create_test_data() with self.roundtrip(expected, save_kwargs={'consolidate': True}, diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index 81a99455093..a55e0642d4d 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -1,5 +1,6 @@ """ isort:skip_file """ from __future__ import absolute_import, division, print_function +from distutils.version import LooseVersion import os import sys import pickle @@ -131,6 +132,9 @@ def test_dask_distributed_zarr_integration_test(loop): computed = restored.compute() assert_allclose(original, computed) # repeat with consolidated metadata and delayed compute + import zarr + if LooseVersion(zarr.__version__) <= '2.2': + return with create_tmp_file(allow_cleanup_failure=ON_WINDOWS, suffix='.zarrc') as filename: futures = original.to_zarr(filename, consolidate=True, From 9b4a8aaad098aba966414859a3ee8053cc00a107 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Sat, 24 Nov 2018 16:05:08 -0500 Subject: [PATCH 07/14] fix doc typos --- doc/io.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/io.rst b/doc/io.rst index 44a3c485f76..d76806bfdaf 100644 --- a/doc/io.rst +++ b/doc/io.rst @@ -641,10 +641,10 @@ Consolidated Metadata Xarray needs to read all of the zarr metadata when it opens a dataset. In some storage mediums, such as with cloud object storage (e.g. amazon S3), this can introduce significant overhead, because two separate HTTP calls to the -object store for each variable in the dataset. -With version 2.3, zarr will support an feature called *consolidated metadata*, +object store must be made for each variable in the dataset. +With version 2.3, zarr will support a feature called *consolidated metadata*, which allows all metadata for the entire dataset to be stored with a single -key (by default called ``.zmetadata``). This can drastically reduce the latency +key (by default called ``.zmetadata``). This can drastically speed up opening the store. (For more information on this feature, consult the `zarr docs `_.) 
From c00ef82736d4b47c7923c591e245566b603f644d Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Mon, 26 Nov 2018 12:52:39 -0500 Subject: [PATCH 08/14] fix PEP8 issues --- xarray/backends/api.py | 8 ++++---- xarray/backends/zarr.py | 2 +- xarray/tests/test_backends.py | 1 - 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/xarray/backends/api.py b/xarray/backends/api.py index ee673bf4801..94f2d078553 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -877,9 +877,9 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, _validate_attrs(dataset) zstore = backends.ZarrStore.open_group(store=store, mode=mode, - synchronizer=synchronizer, - group=group, - consolidate_on_close=consolidate) + synchronizer=synchronizer, + group=group, + consolidate_on_close=consolidate) writer = ArrayWriter() # TODO: figure out how to properly handle unlimited_dims @@ -891,5 +891,5 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, else: import dask return dask.delayed(_finalize_store)(writes, zstore) - + return zstore diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index e67968f25ec..cd2832645ad 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -237,7 +237,7 @@ def open_group(cls, store, mode='r', synchronizer=None, group=None, "#installation" % min_zarr) if consolidated or consolidate_on_close: - if LooseVersion(zarr.__version__) <= '2.2': # pragma: no cover + if LooseVersion(zarr.__version__) <= '2.2': # pragma: no cover raise NotImplementedError("Zarr version 2.3 or greater is " "required by for consolidated " "metadata.") diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index 6276db7bbf9..ea11c8dd552 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -1321,7 +1321,6 @@ def roundtrip_append(self, data, save_kwargs={}, open_kwargs={}, allow_cleanup_failure=False): pytest.skip("zarr backend does not support appending") - def test_roundtrip_consolidated(self): # my understanding is that pytest.mark.skipif is broken here import zarr From e3579a8cbb696e4bff899182a03f4af9a5b64b99 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Wed, 28 Nov 2018 11:09:07 -0500 Subject: [PATCH 09/14] fix test skipping --- xarray/tests/test_backends.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index ea11c8dd552..6c7905d06e2 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -1,7 +1,6 @@ from __future__ import absolute_import, division, print_function import contextlib -from distutils.version import LooseVersion import itertools import math import os.path @@ -1322,10 +1321,7 @@ def roundtrip_append(self, data, save_kwargs={}, open_kwargs={}, pytest.skip("zarr backend does not support appending") def test_roundtrip_consolidated(self): - # my understanding is that pytest.mark.skipif is broken here - import zarr - if LooseVersion(zarr.__version__) <= '2.2': - pytest.skip('zarr version 2.3 or greater needed for consolidated') + zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2") expected = create_test_data() with self.roundtrip(expected, save_kwargs={'consolidate': True}, From 6ef6d6369806251efaf697b0bf06cb1c8300c893 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Wed, 28 Nov 2018 11:50:52 -0500 Subject: [PATCH 10/14] fixed integration test --- xarray/tests/test_distributed.py | 30 ++++++++++++++---------------- 1 file changed, 14 
insertions(+), 16 deletions(-) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index a55e0642d4d..a6dcb35ca7b 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -119,28 +119,26 @@ def test_dask_distributed_read_netcdf_integration_test( @requires_zarr -def test_dask_distributed_zarr_integration_test(loop): +@pytest.mark.parametrize('consolidated', [True, False]) +@pytest.mark.parametrize('compute', [True, False]) def test_dask_distributed_zarr_integration_test(loop, consolidated, compute): + if consolidated: + zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2") + write_kwargs = dict(consolidate=True) + read_kwargs = dict(consolidated=True) + else: + write_kwargs = read_kwargs = {} chunks = {'dim1': 4, 'dim2': 3, 'dim3': 5} with cluster() as (s, [a, b]): with Client(s['address'], loop=loop) as c: original = create_test_data().chunk(chunks) - with create_tmp_file(allow_cleanup_failure=ON_WINDOWS, - suffix='.zarr') as filename: - original.to_zarr(filename) - with xr.open_zarr(filename) as restored: - assert isinstance(restored.var1.data, da.Array) - computed = restored.compute() - assert_allclose(original, computed) - # repeat with consolidated metadata and delayed compute - import zarr - if LooseVersion(zarr.__version__) <= '2.2': - return with create_tmp_file(allow_cleanup_failure=ON_WINDOWS, suffix='.zarrc') as filename: - futures = original.to_zarr(filename, consolidate=True, - compute=False) - futures.compute() - with xr.open_zarr(filename, consolidated=True) as restored: + maybe_futures = original.to_zarr(filename, compute=compute, + **write_kwargs) + if not compute: + maybe_futures.compute() + with xr.open_zarr(filename, **read_kwargs) as restored: assert isinstance(restored.var1.data, da.Array) computed = restored.compute() assert_allclose(original, computed) From fa9cc418e85d45e113941c413b79e8d68c778c23 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Wed, 28 Nov 2018 11:58:14 -0500 Subject: [PATCH 11/14] update version check --- xarray/backends/zarr.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py index cd2832645ad..05e445a1e88 100644 --- a/xarray/backends/zarr.py +++ b/xarray/backends/zarr.py @@ -237,9 +237,9 @@ def open_group(cls, store, mode='r', synchronizer=None, group=None, "#installation" % min_zarr) if consolidated or consolidate_on_close: - if LooseVersion(zarr.__version__) <= '2.2': # pragma: no cover - raise NotImplementedError("Zarr version 2.3 or greater is " - "required for consolidated " + if LooseVersion(zarr.__version__) <= '2.2.1.dev2': # pragma: no cover + raise NotImplementedError("Zarr version 2.2.1.dev2 or greater " + "is required for consolidated " "metadata.") open_kwargs = dict(mode=mode, synchronizer=synchronizer, path=group) if consolidated: # TODO: an option to pass the metadata_key keyword zarr_group = zarr.open_consolidated(store, **open_kwargs) else: zarr_group = zarr.open_group(store, **open_kwargs) From 09eee44c7e8507eceb2fc618ac63d2c6d1e75873 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Fri, 30 Nov 2018 10:33:43 -0500 Subject: [PATCH 12/14] rename keyword arg --- doc/io.rst | 4 ++-- xarray/backends/api.py | 4 ++-- xarray/core/dataset.py | 6 +++--- xarray/tests/test_backends.py | 2 +- xarray/tests/test_distributed.py | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/doc/io.rst b/doc/io.rst index d76806bfdaf..59f32247996 100644 --- a/doc/io.rst +++ b/doc/io.rst @@ -650,10 +650,10 @@ opening the store.
(For more information on this feature, consult the `zarr docs `_.) If you have zarr version 2.3 or greater, xarray can write and read stores with consolidated metadata. To write consolidated metadata, pass the -``consolidate=True`` option to the +``consolidated=True`` option to the :py:attr:`Dataset.to_zarr ` method:: - ds.to_zarr('foo.zarr', consolidate=True) + ds.to_zarr('foo.zarr', consolidated=True) To read a consolidated store, pass the ``consolidated=True`` option to :py:func:`~xarray.open_zarr`:: diff --git a/xarray/backends/api.py b/xarray/backends/api.py index 94f2d078553..f2b6bc196a0 100644 --- a/xarray/backends/api.py +++ b/xarray/backends/api.py @@ -861,7 +861,7 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None, def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, - encoding=None, compute=True, consolidate=False): + encoding=None, compute=True, consolidated=False): """This function creates an appropriate datastore for writing a dataset to a zarr store @@ -879,7 +879,7 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None, zstore = backends.ZarrStore.open_group(store=store, mode=mode, synchronizer=synchronizer, group=group, - consolidate_on_close=consolidate) + consolidate_on_close=consolidated) writer = ArrayWriter() # TODO: figure out how to properly handle unlimited_dims diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index d17a9168bfa..c289703875d 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -1222,7 +1222,7 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None, compute=compute) def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, - encoding=None, compute=True, consolidate=False): + encoding=None, compute=True, consolidated=False): """Write dataset contents to a zarr group. .. note:: Experimental @@ -1247,7 +1247,7 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, compute: bool, optional If True compute immediately, otherwise return a ``dask.delayed.Delayed`` object that can be computed later. - consolidate: bool, optional + consolidated: bool, optional If True, apply zarr's `consolidate_metadata` function to the store after writing.
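With the keyword now spelled ``consolidated`` on both the read and write sides, a brief roundtrip sketch of the renamed API, using a hypothetical path and toy data::

    import xarray as xr

    ds = xr.Dataset({'foo': ('x', [1, 2, 3])})
    ds.to_zarr('roundtrip.zarr', mode='w', consolidated=True)

    # The same flag on both ends: consolidated on write, then opened via
    # zarr.open_consolidated on read.
    restored = xr.open_zarr('roundtrip.zarr', consolidated=True)
    assert restored.identical(ds)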
@@ -1264,7 +1264,7 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None, from ..backends.api import to_zarr return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer, group=group, encoding=encoding, compute=compute, - consolidate=consolidate) + consolidated=consolidated) def __unicode__(self): return formatting.dataset_repr(self) diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index 6c7905d06e2..2361b8c2236 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -1324,7 +1324,7 @@ def test_roundtrip_consolidated(self): zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2") expected = create_test_data() with self.roundtrip(expected, - save_kwargs={'consolidate': True}, + save_kwargs={'consolidated': True}, open_kwargs={'consolidated': True}) as actual: self.check_dtypes_roundtripped(expected, actual) assert_identical(expected, actual) diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py index a6dcb35ca7b..bd62b8d906d 100644 --- a/xarray/tests/test_distributed.py +++ b/xarray/tests/test_distributed.py @@ -124,7 +124,7 @@ def test_dask_distributed_read_netcdf_integration_test( def test_dask_distributed_zarr_integration_test(loop, consolidated, compute): if consolidated: zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2") - write_kwargs = dict(consolidate=True) + write_kwargs = dict(consolidated=True) read_kwargs = dict(consolidated=True) else: write_kwargs = read_kwargs = {} From 95829f0183f9501871bf2bb5670a390e3117b756 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Tue, 4 Dec 2018 14:29:48 -0500 Subject: [PATCH 13/14] Update whats-new.rst --- doc/whats-new.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index bf28bb686bb..040f72acb56 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -36,7 +36,7 @@ Breaking changes Enhancements ~~~~~~~~~~~~ -- Ability to read and write consolidated metadata in zarr stores. +- Ability to read and write consolidated metadata in zarr stores (:issue:`2558`). By `Ryan Abernathey `_. - :py:class:`CFTimeIndex` uses slicing for string indexing when possible (like :py:class:`pandas.DatetimeIndex`), which avoids unnecessary copies. From fe4af34732f104f1e5f2b18e25dec1c3b92d6809 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Tue, 4 Dec 2018 14:33:54 -0500 Subject: [PATCH 14/14] instructions for consolidating existing stores --- doc/io.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/doc/io.rst b/doc/io.rst index 59f32247996..682fbf5202e 100644 --- a/doc/io.rst +++ b/doc/io.rst @@ -659,6 +659,10 @@ To read a consolidated store, pass the ``consolidated=True`` option to :py:func:`~xarray.open_zarr`:: ds = xr.open_zarr('foo.zarr', consolidated=True) + +Xarray can't perform consolidation on pre-existing zarr datasets. This should +be done directly from zarr, as described in the +`zarr docs `_. .. _io.cfgrib:
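To complement the note above: since xarray itself does not consolidate pre-existing stores, a sketch of doing it directly with zarr, using a hypothetical path for a store first written without consolidation::

    import xarray as xr
    import zarr

    ds = xr.Dataset({'foo': ('x', [1, 2, 3])})
    ds.to_zarr('legacy.zarr', mode='w')  # no consolidation at write time

    # Consolidate after the fact, straight from zarr ...
    zarr.consolidate_metadata('legacy.zarr')

    # ... and the store can now be opened with consolidated=True.
    ds2 = xr.open_zarr('legacy.zarr', consolidated=True)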