Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zarr consolidated #2559

Merged
merged 16 commits into from
Dec 4, 2018
17 changes: 12 additions & 5 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,


def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
encoding=None, compute=True):
encoding=None, compute=True, consolidate=False):
"""This function creates an appropriate datastore for writing a dataset to
a zarr ztore

Expand All @@ -876,16 +876,23 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
_validate_dataset_names(dataset)
_validate_attrs(dataset)

store = backends.ZarrStore.open_group(store=store, mode=mode,
zstore = backends.ZarrStore.open_group(store=store, mode=mode,
synchronizer=synchronizer,
group=group)

writer = ArrayWriter()
# TODO: figure out how to properly handle unlimited_dims
dump_to_store(dataset, store, writer, encoding=encoding)
dump_to_store(dataset, zstore, writer, encoding=encoding)
writes = writer.sync(compute=compute)

if not compute:
import dask
return dask.delayed(_finalize_store)(writes, store)
return store
return dask.delayed(_finalize_store)(writes, zstore)
# TODO: handle consolidation for delayed case
jhamman marked this conversation as resolved.
Show resolved Hide resolved

if consolidate:
import zarr
zarr.consolidate_metadata(store)
# do we need to reload ztore now that we have consolidated?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense for zarr to handle this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant reloading the zarr store automatically.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would be hard to achieve. And I'm not sure it's necessary. Frankly I don't know why we return a store object from to_zarr at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zarr.consolidate_metadata returns the output of open_consolidated on the same store, so this is already happening


return zstore
21 changes: 15 additions & 6 deletions xarray/backends/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ class ZarrStore(AbstractWritableDataStore):
"""

@classmethod
def open_group(cls, store, mode='r', synchronizer=None, group=None):
def open_group(cls, store, mode='r', synchronizer=None, group=None,
consolidated=False):
import zarr
min_zarr = '2.2'

Expand All @@ -234,8 +235,13 @@ def open_group(cls, store, mode='r', synchronizer=None, group=None):
"installation "
"http://zarr.readthedocs.io/en/stable/"
"#installation" % min_zarr)
zarr_group = zarr.open_group(store=store, mode=mode,
synchronizer=synchronizer, path=group)

open_kwargs = dict(mode=mode, synchronizer=synchronizer, path=group)
if consolidated:
# TODO: an option to pass the metadata_key keyword
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to consider this TODO here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything to do here now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we feel that it's important to expose this functionality from within xarray? I don't.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't.
I think it's ok for xarray to have an opinion on what the special key is called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose we just leave these TODO's here as is. If anyone ever needs this feature from the xarray side, this will help guide them on how to implement it.

zarr_group = zarr.open_consolidated(store, **open_kwargs)
else:
zarr_group = zarr.open_group(store, **open_kwargs)
return cls(zarr_group)

def __init__(self, zarr_group):
Expand Down Expand Up @@ -337,7 +343,7 @@ def sync(self):
def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
decode_cf=True, mask_and_scale=True, decode_times=True,
concat_characters=True, decode_coords=True,
drop_variables=None):
drop_variables=None, consolidated=False):
"""Load and decode a dataset from a Zarr store.

.. note:: Experimental
Expand Down Expand Up @@ -383,10 +389,13 @@ def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
decode_coords : bool, optional
If True, decode the 'coordinates' attribute to identify coordinates in
the resulting dataset.
drop_variables: string or iterable, optional
drop_variables : string or iterable, optional
A variable or list of variables to exclude from being parsed from the
dataset. This may be useful to drop variables with problems or
inconsistent values.
consolidated : bool, optional
Whether to open the store using zarr's consolidated metadata
capability. Only works for stores that have already been consolidated.

Returns
-------
Expand Down Expand Up @@ -423,7 +432,7 @@ def maybe_decode_store(store, lock=False):
mode = 'r'
zarr_store = ZarrStore.open_group(store, mode=mode,
synchronizer=synchronizer,
group=group)
group=group, consolidated=consolidated)
ds = maybe_decode_store(zarr_store)

# auto chunking needs to be here and not in ZarrStore because variable
Expand Down
20 changes: 14 additions & 6 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None,
compute=compute)

def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
encoding=None, compute=True):
encoding=None, compute=True, consolidate=False):
"""Write dataset contents to a zarr group.

.. note:: Experimental
Expand All @@ -1244,9 +1244,16 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
Nested dictionary with variable names as keys and dictionaries of
variable specific encodings as values, e.g.,
``{'my_variable': {'dtype': 'int16', 'scale_factor': 0.1,}, ...}``
compute: boolean
If true compute immediately, otherwise return a
compute: bool, optional
If True compute immediately, otherwise return a
``dask.delayed.Delayed`` object that can be computed later.
consolidate: bool, optional
If True, apply zarr's `consolidate_metadata` function to the store
after writing.

References
----------
https://zarr.readthedocs.io/
"""
if encoding is None:
encoding = {}
Expand All @@ -1256,7 +1263,8 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
"and 'w-'.")
from ..backends.api import to_zarr
return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
group=group, encoding=encoding, compute=compute)
group=group, encoding=encoding, compute=compute,
consolidate=consolidate)

def __unicode__(self):
return formatting.dataset_repr(self)
Expand Down Expand Up @@ -1380,7 +1388,7 @@ def _validate_indexers(self, indexers):
""" Here we make sure
+ indexer has a valid keys
+ indexer is in a valid data type
+ string indexers are cast to the appropriate date type if the
+ string indexers are cast to the appropriate date type if the
associated index is a DatetimeIndex or CFTimeIndex
"""
from .dataarray import DataArray
Expand Down Expand Up @@ -1963,7 +1971,7 @@ def _validate_interp_indexer(x, new_x):
'Instead got\n{}'.format(new_x))
else:
return (x, new_x)

variables = OrderedDict()
for name, var in iteritems(obj._variables):
if name not in indexers:
Expand Down
8 changes: 8 additions & 0 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,14 @@ def roundtrip_append(self, data, save_kwargs={}, open_kwargs={},
allow_cleanup_failure=False):
pytest.skip("zarr backend does not support appending")

def test_roundtrip_consolidated(self):
expected = create_test_data()
with self.roundtrip(expected,
save_kwargs={'consolidate': True},
open_kwargs={'consolidated': True}) as actual:
self.check_dtypes_roundtripped(expected, actual)
assert_identical(expected, actual)

def test_auto_chunk(self):
original = create_test_data().chunk()

Expand Down