Skip to content

Commit

Permalink
Consolidate Metadata (#142)
Browse files Browse the repository at this point in the history
* Consolidate Metadata

* checkpoint

* checkpoint

* abstract

* checkpoint

* open update

* flake

* static and some test

* unfinished tests

* working basic tests

* assert

* cleaned

* Update backend.py

* Update backend.py

* Update CHANGELOG.md

* Update storage.rst

* bool and updated tests

* storage rst

* tutorial

* remove

* review

* Update docs/source/storage.rst

* Update src/hdmf_zarr/backend.py

* Update src/hdmf_zarr/backend.py

* Update src/hdmf_zarr/backend.py

---------

Co-authored-by: Oliver Ruebel <[email protected]>
  • Loading branch information
mavaylon1 and oruebel authored Dec 5, 2023
1 parent 7555512 commit 9c3b4be
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 0.5.0 (Upcoming)

### Enhancements
* Added a new default to consolidate metadata in order to more efficiently traverse storage contents. @mavaylon1 [#142](https://github.com/hdmf-dev/hdmf-zarr/pull/142)
* Fix linking for FSSpec and support passing of `storage_options` required for reading data from S3 #138. @alejoe91 [#120](https://github.com/hdmf-dev/hdmf-zarr/pull/138)

## 0.4.0 (October 3, 2023)
Expand Down
13 changes: 13 additions & 0 deletions docs/gallery/plot_nwb_zarrio.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from datetime import datetime
from dateutil.tz import tzlocal
import zarr

import numpy as np
from pynwb import NWBFile
Expand Down Expand Up @@ -142,3 +143,15 @@
# relative ``path`` here instead is fine.
with NWBZarrIO(path=absolute_path, mode="r") as io:
infile = io.read()

###############################################################################
# Consolidating Metadata
# ----------------------
# When writing to Zarr, the metadata within the file will be consolidated into a single
# file within the root group, ``.zmetadata``. Users who do not wish to consolidate the
# metadata can set the boolean parameter ``consolidate_metadata`` to ``False`` within ``write``.
# Even when the metadata is consolidated, the metadata natively within the file can be altered.
# Any such alteration requires the user to call ``zarr.convenience.consolidate_metadata()``
# to sync the consolidated metadata with the changes. Please refer to the Zarr documentation for more details:
# https://zarr.readthedocs.io/en/stable/tutorial.html#storage-alternatives
zarr.consolidate_metadata(path)
14 changes: 14 additions & 0 deletions docs/source/storage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,17 @@ data type. The specification of the namespace is stored in
``/specifications/<namespace-name>/<version>/<source-filename>``. Here ``<source-filename>`` refers to the main name
of the source-file without file extension (e.g., the core namespace defines ``nwb.ecephys.yaml`` as source which would
be stored in ``/specifications/core/2.0.1/nwb.ecephys``).

Consolidating Metadata
======================

Zarr allows users to consolidate all metadata for groups and arrays within the given store. By default, every file
will have all of its metadata consolidated into a single ``.zmetadata`` file, stored in the root group. This reduces the
number of read operations when retrieving certain metadata in read mode.

.. note::

  When updating a file, the consolidated metadata will also need to be updated via
  ``zarr.consolidate_metadata(path)`` to ensure the consolidated metadata is consistent
  with the file.

97 changes: 84 additions & 13 deletions src/hdmf_zarr/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,18 @@ def object_codec_class(self):
def open(self):
    """
    Open the Zarr file if it is not already open.

    In the modes 'r' and 'r+' the file is opened through ``__open_file_consolidated``.
    In all other modes the file is opened with ``zarr.open``.
    """
    if self.__file is not None:
        # Already open; keep the existing handle
        return

    open_kwargs = dict(store=self.path,
                       mode=self.__mode,
                       synchronizer=self.__synchronizer,
                       storage_options=self.__storage_options)
    # Within zarr, open_consolidated only allows the mode to be 'r' or 'r+'.
    # As a result, when in other modes, the file will not use consolidated metadata.
    # The file is opened exactly once: opening it unconditionally first and then again
    # per mode would discard the first handle.
    if self.__mode in ('r', 'r+'):
        self.__file = self.__open_file_consolidated(**open_kwargs)
    else:
        self.__file = zarr.open(**open_kwargs)

def close(self):
"""Close the Zarr file"""
Expand Down Expand Up @@ -238,6 +246,14 @@ def load_namespaces(cls, namespace_catalog, path, namespaces=None):
),
"default": None,
},
{
"name": "consolidate_metadata",
"type": bool,
"doc": (
"Consolidate metadata into a single .zmetadata file in the root group to accelerate read."
),
"default": True,
}
)
def write(self, **kwargs):
"""Overwrite the write method to add support for caching the specification and parallelization."""
Expand Down Expand Up @@ -398,11 +414,19 @@ def get_builder_disk_path(self, **kwargs):
'doc': 'The source of the builders when exporting',
'default': None,
},
{
"name": "consolidate_metadata",
"type": bool,
"doc": (
"Consolidate metadata into a single .zmetadata file in the root group to accelerate read."
),
"default": True,
}
)
def write_builder(self, **kwargs):
"""Write a builder to disk."""
f_builder, link_data, exhaust_dci, export_source = getargs(
'builder', 'link_data', 'exhaust_dci', 'export_source', kwargs
f_builder, link_data, exhaust_dci, export_source, consolidate_metadata = getargs(
'builder', 'link_data', 'exhaust_dci', 'export_source', 'consolidate_metadata', kwargs
)
for name, gbldr in f_builder.groups.items():
self.write_group(
Expand All @@ -426,6 +450,50 @@ def write_builder(self, **kwargs):
self.logger.debug("Done writing %s '%s' to path '%s'" %
(f_builder.__class__.__qualname__, f_builder.name, self.source))

# Consolidate metadata for the entire file after everything has been written
if consolidate_metadata:
zarr.consolidate_metadata(store=self.path)

@staticmethod
def __get_store_path(store):
    """
    Return the ``path`` of the given Zarr store.

    A ConsolidatedMetadataStore wraps around another Zarr store object, so for
    that type the path is read from the wrapped store (``store.store``) instead
    of from the store itself.
    """
    wraps_other_store = isinstance(store, zarr.storage.ConsolidatedMetadataStore)
    base_store = store.store if wraps_other_store else store
    return base_store.path

def __open_file_consolidated(self,
                             store,
                             mode,
                             synchronizer=None,
                             storage_options=None):
    """
    Open a Zarr file, using consolidated metadata if it is available.

    The method checks whether a ``.zmetadata`` file exists at the root of ``store``.
    If so, the file is opened with ``zarr.open_consolidated``; otherwise it is opened
    with ``zarr.open``. In both cases the arguments given to this method are used.

    :param store: the path (str) or the Zarr store object (with a ``path`` attribute) of the file to open
    :param mode: the mode to open the file with; the callers in this file pass 'r' or 'r+'
    :param synchronizer: passed through to zarr
    :param storage_options: passed through to zarr
    :return: the object returned by ``zarr.open_consolidated`` or ``zarr.open``
    """
    # ``store`` can be both a string or one of the `SUPPORTED_ZARR_STORES`.
    # The path is derived from the ``store`` argument rather than from ``self.path`` so that
    # files other than the one managed by this object (e.g., the target file of a
    # reference) are inspected and opened correctly.
    path = store if isinstance(store, str) else store.path

    open_kwargs = dict(store=store,
                       mode=mode,
                       synchronizer=synchronizer,
                       storage_options=storage_options)
    if os.path.isfile(os.path.join(path, '.zmetadata')):
        return zarr.open_consolidated(**open_kwargs)
    # No consolidated metadata found on disk; open the same store regularly
    return zarr.open(**open_kwargs)

@docval({'name': 'parent', 'type': Group, 'doc': 'the parent Zarr object'},
{'name': 'builder', 'type': GroupBuilder, 'doc': 'the GroupBuilder to write'},
{'name': 'link_data', 'type': bool,
Expand Down Expand Up @@ -575,7 +643,8 @@ def get_zarr_paths(zarr_object):
# In Zarr the path is a combination of the path of the store and the path of the object. So we first need to
# merge those two paths, then remove the path of the file, add the missing leading "/" and then compute the
# directory name to get the path of the parent
fullpath = os.path.normpath(os.path.join(zarr_object.store.path, zarr_object.path)).replace("\\", "/")
fpath = ZarrIO._ZarrIO__get_store_path(zarr_object.store)
fullpath = os.path.normpath(os.path.join(fpath, zarr_object.path)).replace("\\", "/")
# To determine the filepath we now iterate over the path and check if the .zgroup object exists at
# a level, indicating that we are still within the Zarr file. The first level we hit where the parent
# directory does not have a .zgroup means we have found the main file
Expand Down Expand Up @@ -653,7 +722,7 @@ def resolve_ref(self, zarr_ref):
else:
target_name = ROOT_NAME

target_zarr_obj = zarr.open(source_file, mode='r', storage_options=self.__storage_options)
target_zarr_obj = self.__open_file_consolidated(source_file, mode='r', storage_options=self.__storage_options)
if object_path is not None:
try:
target_zarr_obj = target_zarr_obj[object_path]
Expand Down Expand Up @@ -886,7 +955,8 @@ def write_dataset(self, **kwargs): # noqa: C901
if isinstance(data, Array):
# copy the dataset
if link_data:
self.__add_link__(parent, data.store.path, data.name, name)
path = self.__get_store_path(data.store)
self.__add_link__(parent, path, data.name, name)
linked = True
dset = None
else:
Expand Down Expand Up @@ -1202,7 +1272,7 @@ def read_builder(self):
return f_builder

def __set_built(self, zarr_obj, builder):
fpath = zarr_obj.store.path
fpath = self.__get_store_path(zarr_obj.store)
path = zarr_obj.path
path = os.path.join(fpath, path)
self.__built.setdefault(path, builder)
Expand Down Expand Up @@ -1242,7 +1312,8 @@ def __get_built(self, zarr_obj):
:type zarr_obj: Zarr Group or Dataset
:return: Builder in the self.__built cache or None
"""
fpath = zarr_obj.store.path

fpath = self.__get_store_path(zarr_obj.store)
path = zarr_obj.path
path = os.path.join(fpath, path)
return self.__built.get(path, None)
Expand All @@ -1258,7 +1329,7 @@ def __read_group(self, zarr_obj, name=None):
# Create the GroupBuilder
attributes = self.__read_attrs(zarr_obj)
ret = GroupBuilder(name=name, source=self.source, attributes=attributes)
ret.location = self.get_zarr_parent_path(zarr_obj)
ret.location = ZarrIO.get_zarr_parent_path(zarr_obj)

# read sub groups
for sub_name, sub_group in zarr_obj.groups():
Expand Down Expand Up @@ -1353,7 +1424,7 @@ def __read_dataset(self, zarr_obj, name):
if name is None:
name = str(os.path.basename(zarr_obj.name))
ret = DatasetBuilder(name, **kwargs) # create builder object for dataset
ret.location = self.get_zarr_parent_path(zarr_obj)
ret.location = ZarrIO.get_zarr_parent_path(zarr_obj)
self._written_builders.set_written(ret) # record that the builder has been written
self.__set_built(zarr_obj, ret)
return ret
Expand Down
36 changes: 36 additions & 0 deletions tests/unit/base_tests_zarrio.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,42 @@ def tearDown(self):
warnings.warn("Could not remove: %s" % path)


class ZarrStoreTestCase(TestCase):
    """
    Class that creates a zarr file containing groups, datasets, and references for
    general purpose testing.
    """
    def setUp(self):
        # Location of the Zarr file written by create_zarr
        self.store = "tests/unit/test_io.zarr"

    def tearDown(self):
        # Remove the Zarr directory written during the test
        shutil.rmtree(self.store)

    def createReferenceBuilder(self):
        """
        Build a root GroupBuilder with two numeric datasets and one dataset of
        references pointing to those two datasets.
        """
        data_1 = np.arange(100, 200, 10).reshape(2, 5)
        data_2 = np.arange(0, 200, 10).reshape(4, 5)
        dataset_1 = DatasetBuilder('dataset_1', data_1)
        dataset_2 = DatasetBuilder('dataset_2', data_2)

        ref_dataset_1 = ReferenceBuilder(dataset_1)
        ref_dataset_2 = ReferenceBuilder(dataset_2)
        ref_data = [ref_dataset_1, ref_dataset_2]
        dataset_ref = DatasetBuilder('ref_dataset', ref_data, dtype='object')

        builder = GroupBuilder('root',
                               source=self.store,
                               datasets={'dataset_1': dataset_1,
                                         'dataset_2': dataset_2,
                                         'ref_dataset': dataset_ref})
        return builder

    def create_zarr(self, consolidate_metadata=True):
        """
        Write the builder from createReferenceBuilder to ``self.store``.

        :param consolidate_metadata: forwarded to ``ZarrIO.write_builder``
        """
        builder = self.createReferenceBuilder()
        writer = ZarrIO(self.store, mode='a')
        # Pass by keyword: consolidate_metadata is not the second parameter of
        # write_builder, so a positional value would be bound to a different argument.
        writer.write_builder(builder, consolidate_metadata=consolidate_metadata)
        writer.close()


class BaseTestZarrWriter(BaseZarrWriterTestCase):
"""
Test writing of builder with ZarrIO
Expand Down
30 changes: 30 additions & 0 deletions tests/unit/test_zarrio.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,18 @@
need to implement the tests separately for the different backends.
"""
from tests.unit.base_tests_zarrio import (BaseTestZarrWriter,
ZarrStoreTestCase,
BaseTestZarrWriteUnit,
BaseTestExportZarrToZarr)
from zarr.storage import (DirectoryStore,
TempStore,
NestedDirectoryStore)
import zarr
from hdmf_zarr.backend import ZarrIO
import os


CUR_DIR = os.path.dirname(os.path.realpath(__file__))


######################################################
Expand Down Expand Up @@ -122,3 +129,26 @@ class TestExportZarrToZarrNestedDirectoryStore(BaseTestExportZarrToZarr):
def setUp(self):
super().setUp()
self.store = [NestedDirectoryStore(p) for p in self.store_path]


#########################################
# Consolidate Metadata tests
#########################################
class TestConsolidateMetadata(ZarrStoreTestCase):
    """
    Tests for consolidated metadata and corresponding helper methods.
    """
    def test_get_store_path_shallow(self):
        # Written without consolidated metadata; the path is read from a plain DirectoryStore
        self.create_zarr(consolidate_metadata=False)
        expected_path = os.path.normpath(os.path.join(CUR_DIR, 'test_io.zarr'))
        path = ZarrIO._ZarrIO__get_store_path(DirectoryStore(self.store))
        self.assertEqual(path, expected_path)

    def test_get_store_path_deep(self):
        # Written with consolidated metadata; the path is read from the store of the
        # object returned by zarr.open_consolidated
        self.create_zarr()
        expected_path = os.path.normpath(os.path.join(CUR_DIR, 'test_io.zarr'))
        consolidated_store = zarr.open_consolidated(self.store, mode='r').store
        path = ZarrIO._ZarrIO__get_store_path(consolidated_store)
        self.assertEqual(path, expected_path)

0 comments on commit 9c3b4be

Please sign in to comment.