From 647d1759ffa06742ce7c7aa177aa7ea9b073b3ce Mon Sep 17 00:00:00 2001 From: Sean Harkins Date: Mon, 18 Nov 2024 19:59:56 -0500 Subject: [PATCH] Non-kerchunk backend for HDF5/netcdf4 files. (#87) * Generate chunk manifest backed variable from HDF5 dataset. * Transfer dataset attrs to variable. * Get virtual variables dict from HDF5 file. * Update virtual_vars_from_hdf to use fsspec and drop_variables arg. * mypy fix to use ChunkKey and empty dimensions list. * Extract attributes from hdf5 root group. * Use hdf reader for netcdf4 files. * [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci * Fix ruff complaints. * First steps for handling HDF5 filters. * Initial step for hdf5plugin supported codecs. * Small commit to check compression support in CI environment. * [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci * Fix mypy complaints for hdf_filters. * Local pre-commit fix for hdf_filters. * Use fsspec reader_options introduced in #37. * Fix incorrect zarr_v3 if block position from merge commit ef0d7a8. * Fix early return from hdf _extract_attrs. * Test that _extract_attrs correctly handles multiple attributes. * Initial attempt at scale and offset via numcodecs. * Tests for cfcodec_from_dataset. * Temporarily relax integration tests to assert_allclose. * Add blosc_lz4 fixture parameterization to confirm libnetcdf environment. * Check for compatibility with netcdf4 engine. * Use separate fixtures for h5netcdf and netcdf4 compression styles. * Print libhdf5 and libnetcdf versions to confirm compiled environment. * Skip netcdf4 style compression tests when libhdf5 < 1.14. * Include imagecodecs.numcodecs to support HDF5 lzf filters. * Remove test that verifies call to read_kerchunk_references_from_file. * Add additional codec support structures for imagecodecs and numcodecs. * Add codec config test for Zstd. * Include initial cf decoding tests. * Revert typo for scale_factor retrieval. * Update reader to use new numpy manifest representation. * Temporarily skip test until blosc netcdf4 issue is solved. * Fix Pydantic 2 migration warnings. * Include hdf5plugin and imagecodecs-numcodecs in mamba test environment. * Mamba attempt with imagecodecs rather than imagecodecs-numcodecs. * Mamba attempt with latest imagecodecs release. * Use correct iter_chunks callback function signature. * Include pip-based imagecodecs-numcodecs until conda-forge availability. * Handle non-coordinate dims which are serialized to hdf as empty dataset. * Use reader_options for filetype check and update failing kerchunk call. * Fix chunkmanifest shaping for chunked datasets. * Handle scale_factor attribute serialization for compressed files. * Include chunked roundtrip fixture. * Standardize xarray integration tests for hdf filters. * Update reader selection logic for new filetype determination. * Use decode_times for integration test. * Standardize fixture names for hdf5 vs netcdf4 file types. * Handle array add_offset property for compressed data. * Include h5py shuffle filter. * Make ScaleAndOffset codec last in filters list. * Apply ScaleAndOffset codec to _FillValue since its value is now downstream. * Coerce scale and add_offset values to native float for JSON serialization. * Temporarily xfail integration tests for main. * Remove pydantic dependency as per pull/210. * Update test for new kerchunk reader module location. * Fix branch typing errors. * Re-include automatic file type determination. 
* Handle various hdf flavors of _FillValue storage. * Include loadable variables in drop variables list. * Mock readers.hdf.virtual_vars_from_hdf to verify option passing. * Convert numpy _FillValue to native Python for serialization support. * Support groups with HDF5 reader. * Handle empty variables with a shape. * Import top-level version of xarray classes. * Add option to explicitly specify use of an experimental hdf backend. * Include imagecodecs and hdf5plugin in all CI environments. * Add test_hdf_integration tests to be skipped for non-kerchunk env. * Include imagecodecs in dependencies. * Diagnose imagecodecs-numcodecs installation failures in CI. * Ignore mypy complaints for VirtualBackend. * Remove checksum assert which varies across different zstd versions. * Temporarily xfail integration tests with coordinate inconsistency. * Remove backend arg for non-hdf network file tests. * Fix mypy comment moved by ruff formatting. * Make HDF reader dependencies optional. * Handle optional imagecodecs and hdf5plugin dependency imports for tests. * Prevent conflicts with explicit filetype and backend args. * Correctly convert root coordinate attributes to a list. * Clarify that method extracts attrs from any specified group. * Restructure hdf reader and codec filters into a module namespace. * Improve docstrings for hdf and filter modules. * Explicitly specify HDF5VirtualBackend for test parameter. * Include issue references for xfailed tests. * Use soft import strategy for optional dependencies, see xarray/issues/9554. * Handle mypy for soft imports. * Attempt at nested optional dependency usage. * Handle use of soft import submodules for typing. --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- ci/environment.yml | 6 + ci/min-deps.yml | 1 - ci/upstream.yml | 4 + pyproject.toml | 11 +- virtualizarr/backend.py | 16 +- virtualizarr/readers/__init__.py | 2 + virtualizarr/readers/hdf/__init__.py | 11 + virtualizarr/readers/hdf/filters.py | 195 ++++++++++ virtualizarr/readers/hdf/hdf.py | 364 ++++++++++++++++++ virtualizarr/tests/__init__.py | 2 + virtualizarr/tests/test_backend.py | 93 +++-- virtualizarr/tests/test_integration.py | 58 ++- virtualizarr/tests/test_readers/conftest.py | 324 ++++++++++++++++ virtualizarr/tests/test_readers/test_hdf.py | 180 +++++++++ .../tests/test_readers/test_hdf_filters.py | 134 +++++++ .../test_readers/test_hdf_integration.py | 52 +++ virtualizarr/tests/test_xarray.py | 36 +- virtualizarr/utils.py | 14 + 18 files changed, 1439 insertions(+), 64 deletions(-) create mode 100644 virtualizarr/readers/hdf/__init__.py create mode 100644 virtualizarr/readers/hdf/filters.py create mode 100644 virtualizarr/readers/hdf/hdf.py create mode 100644 virtualizarr/tests/test_readers/conftest.py create mode 100644 virtualizarr/tests/test_readers/test_hdf.py create mode 100644 virtualizarr/tests/test_readers/test_hdf_filters.py create mode 100644 virtualizarr/tests/test_readers/test_hdf_integration.py diff --git a/ci/environment.yml b/ci/environment.yml index 0bb5b366..b26bb440 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -13,6 +13,8 @@ dependencies: - ujson - packaging - universal_pathlib + - hdf5plugin + - numcodecs # Testing - codecov - pre-commit @@ -27,7 +29,11 @@ dependencies: - fsspec - s3fs - fastparquet + - imagecodecs>=2024.6.1 # for opening tiff files - tifffile # for opening FITS files - astropy + - pip + - pip: + - imagecodecs-numcodecs==2024.6.1 diff --git a/ci/min-deps.yml b/ci/min-deps.yml 
index 05778382..344a4595 100644 --- a/ci/min-deps.yml +++ b/ci/min-deps.yml @@ -3,7 +3,6 @@ channels: - conda-forge - nodefaults dependencies: - - h5netcdf - h5py - hdf5 - netcdf4 diff --git a/ci/upstream.yml b/ci/upstream.yml index 51b5b8dc..91272fab 100644 --- a/ci/upstream.yml +++ b/ci/upstream.yml @@ -12,6 +12,9 @@ dependencies: - packaging - ujson - universal_pathlib + - hdf5plugin + - numcodecs + - imagecodecs>=2024.6.1 # Testing - codecov - pre-commit @@ -27,3 +30,4 @@ dependencies: - pip: - icechunk # Installs zarr v3 as dependency # - git+https://github.com/fsspec/kerchunk@main # kerchunk is currently incompatible with zarr-python v3 (https://github.com/fsspec/kerchunk/pull/516) + - imagecodecs-numcodecs==2024.6.1 diff --git a/pyproject.toml b/pyproject.toml index 749afb94..77998076 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,15 +30,23 @@ dependencies = [ ] [project.optional-dependencies] +hdf_reader = [ + "fsspec", + "h5py", + "hdf5plugin", + "imagecodecs", + "imagecodecs-numcodecs==2024.6.1", + "numcodecs" +] test = [ "codecov", "fastparquet", "fsspec", - "h5netcdf", "h5py", "kerchunk>=0.2.5", "mypy", "netcdf4", + "numcodecs", "pandas-stubs", "pooch", "pre-commit", @@ -48,6 +56,7 @@ test = [ "ruff", "s3fs", "scipy", + "virtualizarr[hdf_reader]" ] diff --git a/virtualizarr/backend.py b/virtualizarr/backend.py index 3b7195cb..fed2b756 100644 --- a/virtualizarr/backend.py +++ b/virtualizarr/backend.py @@ -16,8 +16,10 @@ HDF5VirtualBackend, KerchunkVirtualBackend, NetCDF3VirtualBackend, + TIFFVirtualBackend, ZarrV3VirtualBackend, ) +from virtualizarr.readers.common import VirtualBackend from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions # TODO add entrypoint to allow external libraries to add to this mapping @@ -26,10 +28,10 @@ "zarr_v3": ZarrV3VirtualBackend, "dmrpp": DMRPPVirtualBackend, # all the below call one of the kerchunk backends internally (https://fsspec.github.io/kerchunk/reference.html#file-format-backends) - "netcdf3": NetCDF3VirtualBackend, "hdf5": HDF5VirtualBackend, "netcdf4": HDF5VirtualBackend, # note this is the same as for hdf5 - # "tiff": TIFFVirtualBackend, + "netcdf3": NetCDF3VirtualBackend, + "tiff": TIFFVirtualBackend, "fits": FITSVirtualBackend, } @@ -112,6 +114,7 @@ def open_virtual_dataset( indexes: Mapping[str, Index] | None = None, virtual_array_class=ManifestArray, reader_options: Optional[dict] = None, + backend: Optional[VirtualBackend] = None, ) -> Dataset: """ Open a file or store as an xarray Dataset wrapping virtualized zarr arrays. 
@@ -173,6 +176,9 @@ def open_virtual_dataset( if reader_options is None: reader_options = {} + if backend and filetype: + raise ValueError("Cannot pass both a filetype and an explicit VirtualBackend") + if filetype is not None: # if filetype is user defined, convert to FileType filetype = FileType(filetype) @@ -180,8 +186,10 @@ def open_virtual_dataset( filetype = automatically_determine_filetype( filepath=filepath, reader_options=reader_options ) - - backend_cls = VIRTUAL_BACKENDS.get(filetype.name.lower()) + if backend: + backend_cls = backend + else: + backend_cls = VIRTUAL_BACKENDS.get(filetype.name.lower()) # type: ignore if backend_cls is None: raise NotImplementedError(f"Unsupported file type: {filetype.name}") diff --git a/virtualizarr/readers/__init__.py b/virtualizarr/readers/__init__.py index 0f83ba39..c776a9ae 100644 --- a/virtualizarr/readers/__init__.py +++ b/virtualizarr/readers/__init__.py @@ -1,5 +1,6 @@ from virtualizarr.readers.dmrpp import DMRPPVirtualBackend from virtualizarr.readers.fits import FITSVirtualBackend +from virtualizarr.readers.hdf import HDFVirtualBackend from virtualizarr.readers.hdf5 import HDF5VirtualBackend from virtualizarr.readers.kerchunk import KerchunkVirtualBackend from virtualizarr.readers.netcdf3 import NetCDF3VirtualBackend @@ -9,6 +10,7 @@ __all__ = [ "DMRPPVirtualBackend", "FITSVirtualBackend", + "HDFVirtualBackend", "HDF5VirtualBackend", "KerchunkVirtualBackend", "NetCDF3VirtualBackend", diff --git a/virtualizarr/readers/hdf/__init__.py b/virtualizarr/readers/hdf/__init__.py new file mode 100644 index 00000000..d1519383 --- /dev/null +++ b/virtualizarr/readers/hdf/__init__.py @@ -0,0 +1,11 @@ +from .hdf import ( + HDFVirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) + +__all__ = [ + "HDFVirtualBackend", + "construct_virtual_dataset", + "open_loadable_vars_and_indexes", +] diff --git a/virtualizarr/readers/hdf/filters.py b/virtualizarr/readers/hdf/filters.py new file mode 100644 index 00000000..f0d1e8eb --- /dev/null +++ b/virtualizarr/readers/hdf/filters.py @@ -0,0 +1,195 @@ +import dataclasses +from typing import TYPE_CHECKING, List, Tuple, TypedDict, Union + +import numcodecs.registry as registry +import numpy as np +from numcodecs.abc import Codec +from numcodecs.fixedscaleoffset import FixedScaleOffset +from xarray.coding.variables import _choose_float_dtype + +from virtualizarr.utils import soft_import + +if TYPE_CHECKING: + import h5py # type: ignore + from h5py import Dataset, Group # type: ignore + +h5py = soft_import("h5py", "For reading hdf files", strict=False) +if h5py: + Dataset = h5py.Dataset + Group = h5py.Group +else: + Dataset = dict() + Group = dict() + +hdf5plugin = soft_import( + "hdf5plugin", "For reading hdf files with filters", strict=False +) +imagecodecs = soft_import( + "imagecodecs", "For reading hdf files with filters", strict=False +) + + +_non_standard_filters = { + "gzip": "zlib", + "lzf": "imagecodecs_lzf", +} + +_hdf5plugin_imagecodecs = {"lz4": "imagecodecs_lz4h5", "bzip2": "imagecodecs_bz2"} + + +@dataclasses.dataclass +class BloscProperties: + blocksize: int + clevel: int + shuffle: int + cname: str + + def __post_init__(self): + blosc_compressor_codes = { + value: key + for key, value in hdf5plugin._filters.Blosc._Blosc__COMPRESSIONS.items() + } + self.cname = blosc_compressor_codes[self.cname] + + +@dataclasses.dataclass +class ZstdProperties: + level: int + + +@dataclasses.dataclass +class ShuffleProperties: + elementsize: int + + +@dataclasses.dataclass +class 
ZlibProperties: + level: int + + +class CFCodec(TypedDict): + target_dtype: np.dtype + codec: Codec + + +def _filter_to_codec( + filter_id: str, filter_properties: Union[int, None, Tuple] = None +) -> Codec: + """ + Convert an h5py filter to an equivalent numcodec + + Parameters + ---------- + filter_id: str + An h5py filter id code. + filter_properties : int or None or Tuple + A single or Tuple of h5py filter configuration codes. + + Returns + ------- + A numcodec codec + """ + id_int = None + id_str = None + try: + id_int = int(filter_id) + except ValueError: + id_str = filter_id + conf = {} + if id_str: + if id_str in _non_standard_filters.keys(): + id = _non_standard_filters[id_str] + else: + id = id_str + if id == "zlib": + zlib_props = ZlibProperties(level=filter_properties) # type: ignore + conf = dataclasses.asdict(zlib_props) + if id == "shuffle" and isinstance(filter_properties, tuple): + shuffle_props = ShuffleProperties(elementsize=filter_properties[0]) + conf = dataclasses.asdict(shuffle_props) + conf["id"] = id # type: ignore[assignment] + if id_int: + filter = hdf5plugin.get_filters(id_int)[0] + id = filter.filter_name + if id in _hdf5plugin_imagecodecs.keys(): + id = _hdf5plugin_imagecodecs[id] + if id == "blosc" and isinstance(filter_properties, tuple): + blosc_fields = [field.name for field in dataclasses.fields(BloscProperties)] + blosc_props = BloscProperties( + **{k: v for k, v in zip(blosc_fields, filter_properties[-4:])} + ) + conf = dataclasses.asdict(blosc_props) + if id == "zstd" and isinstance(filter_properties, tuple): + zstd_props = ZstdProperties(level=filter_properties[0]) + conf = dataclasses.asdict(zstd_props) + conf["id"] = id + codec = registry.get_codec(conf) + return codec + + +def cfcodec_from_dataset(dataset: Dataset) -> Codec | None: + """ + Converts select h5py dataset CF convention attrs to CFCodec + + Parameters + ---------- + dataset: h5py.Dataset + An h5py dataset. + + Returns + ------- + CFCodec + A CFCodec. + """ + attributes = {attr: dataset.attrs[attr] for attr in dataset.attrs} + mapping = {} + if "scale_factor" in attributes: + try: + scale_factor = attributes["scale_factor"][0] + except IndexError: + scale_factor = attributes["scale_factor"] + mapping["scale_factor"] = float(1 / scale_factor) + else: + mapping["scale_factor"] = 1 + if "add_offset" in attributes: + try: + offset = attributes["add_offset"][0] + except IndexError: + offset = attributes["add_offset"] + mapping["add_offset"] = float(offset) + else: + mapping["add_offset"] = 0 + if mapping["scale_factor"] != 1 or mapping["add_offset"] != 0: + float_dtype = _choose_float_dtype(dtype=dataset.dtype, mapping=mapping) + target_dtype = np.dtype(float_dtype) + codec = FixedScaleOffset( + offset=mapping["add_offset"], + scale=mapping["scale_factor"], + dtype=target_dtype, + astype=dataset.dtype, + ) + cfcodec = CFCodec(target_dtype=target_dtype, codec=codec) + return cfcodec + else: + return None + + +def codecs_from_dataset(dataset: Dataset) -> List[Codec]: + """ + Extracts a list of numcodecs from an h5py dataset + + Parameters + ---------- + dataset: h5py.Dataset + An h5py dataset. + + Returns + ------- + list + A list of numcodecs codecs. 
+ """ + codecs = [] + for filter_id, filter_properties in dataset._filters.items(): + codec = _filter_to_codec(filter_id, filter_properties) + codecs.append(codec) + return codecs diff --git a/virtualizarr/readers/hdf/hdf.py b/virtualizarr/readers/hdf/hdf.py new file mode 100644 index 00000000..0eee63d5 --- /dev/null +++ b/virtualizarr/readers/hdf/hdf.py @@ -0,0 +1,364 @@ +import math +from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Union + +import numpy as np +import xarray as xr +from xarray import Index, Variable + +from virtualizarr.manifests import ChunkEntry, ChunkManifest, ManifestArray +from virtualizarr.readers.common import ( + VirtualBackend, + construct_virtual_dataset, + open_loadable_vars_and_indexes, +) +from virtualizarr.readers.hdf.filters import cfcodec_from_dataset, codecs_from_dataset +from virtualizarr.types import ChunkKey +from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions, soft_import +from virtualizarr.zarr import ZArray + +if TYPE_CHECKING: + import h5py # type: ignore + from h5py import Dataset, Group # type: ignore + +h5py = soft_import("h5py", "For reading hdf files", strict=False) +if h5py: + Dataset = h5py.Dataset + Group = h5py.Group +else: + Dataset = dict() + Group = dict() + + +class HDFVirtualBackend(VirtualBackend): + @staticmethod + def open_virtual_dataset( + filepath: str, + group: str | None = None, + drop_variables: Iterable[str] | None = None, + loadable_variables: Iterable[str] | None = None, + decode_times: bool | None = None, + indexes: Mapping[str, Index] | None = None, + reader_options: Optional[dict] = None, + ) -> xr.Dataset: + drop_variables, loadable_variables = check_for_collisions( + drop_variables, + loadable_variables, + ) + + virtual_vars = HDFVirtualBackend._virtual_vars_from_hdf( + path=filepath, + group=group, + drop_variables=drop_variables + loadable_variables, + reader_options=reader_options, + ) + + loadable_vars, indexes = open_loadable_vars_and_indexes( + filepath, + loadable_variables=loadable_variables, + reader_options=reader_options, + drop_variables=drop_variables, + indexes=indexes, + group=group, + decode_times=decode_times, + ) + + attrs = HDFVirtualBackend._get_group_attrs( + path=filepath, reader_options=reader_options, group=group + ) + coordinates_attr = attrs.pop("coordinates", "") + coord_names = coordinates_attr.split() + return construct_virtual_dataset( + virtual_vars=virtual_vars, + loadable_vars=loadable_vars, + indexes=indexes, + coord_names=coord_names, + attrs=attrs, + ) + + @staticmethod + def _dataset_chunk_manifest(path: str, dataset: Dataset) -> Optional[ChunkManifest]: + """ + Generate ChunkManifest for HDF5 dataset. 
+ + Parameters + ---------- + path: str + The path of the HDF5 file + dataset : h5py.Dataset + h5py dataset for which to create a ChunkManifest + + Returns + ------- + ChunkManifest + A Virtualizarr ChunkManifest + """ + dsid = dataset.id + if dataset.chunks is None: + if dsid.get_offset() is None: + return None + else: + key_list = [0] * (len(dataset.shape) or 1) + key = ".".join(map(str, key_list)) + chunk_entry = ChunkEntry( + path=path, offset=dsid.get_offset(), length=dsid.get_storage_size() + ) + chunk_key = ChunkKey(key) + chunk_entries = {chunk_key: chunk_entry.dict()} + chunk_manifest = ChunkManifest(entries=chunk_entries) + return chunk_manifest + else: + num_chunks = dsid.get_num_chunks() + if num_chunks == 0: + raise ValueError("The dataset is chunked but contains no chunks") + shape = tuple( + math.ceil(a / b) for a, b in zip(dataset.shape, dataset.chunks) + ) + paths = np.empty(shape, dtype=np.dtypes.StringDType) # type: ignore + offsets = np.empty(shape, dtype=np.uint64) + lengths = np.empty(shape, dtype=np.uint64) + + def get_key(blob): + return tuple( + [a // b for a, b in zip(blob.chunk_offset, dataset.chunks)] + ) + + def add_chunk_info(blob): + key = get_key(blob) + paths[key] = path + offsets[key] = blob.byte_offset + lengths[key] = blob.size + + has_chunk_iter = callable(getattr(dsid, "chunk_iter", None)) + if has_chunk_iter: + dsid.chunk_iter(add_chunk_info) + else: + for index in range(num_chunks): + add_chunk_info(dsid.get_chunk_info(index)) + + chunk_manifest = ChunkManifest.from_arrays( + paths=paths, # type: ignore + offsets=offsets, + lengths=lengths, + ) + return chunk_manifest + + @staticmethod + def _dataset_dims(dataset: Dataset) -> Union[List[str], List[None]]: + """ + Get a list of dimension scale names attached to input HDF5 dataset. + + This is required by the xarray package to work with Zarr arrays. Only + one dimension scale per dataset dimension is allowed. If dataset is + dimension scale, it will be considered as the dimension to itself. + + Parameters + ---------- + dataset : h5py.Dataset + An h5py dataset. + + Returns + ------- + list + List with HDF5 path names of dimension scales attached to input + dataset. + """ + dims = list() + rank = len(dataset.shape) + if rank: + for n in range(rank): + num_scales = len(dataset.dims[n]) + if num_scales == 1: + dims.append(dataset.dims[n][0].name[1:]) + elif h5py.h5ds.is_scale(dataset.id): + dims.append(dataset.name[1:]) + elif num_scales > 1: + raise ValueError( + f"{dataset.name}: {len(dataset.dims[n])} " + f"dimension scales attached to dimension #{n}" + ) + elif num_scales == 0: + # Some HDF5 files do not have dimension scales. + # If this is the case, `num_scales` will be 0. + # In this case, we mimic netCDF4 and assign phony dimension names. + # See https://github.com/fsspec/kerchunk/issues/41 + dims.append(f"phony_dim_{n}") + return dims + + @staticmethod + def _extract_attrs(h5obj: Union[Dataset, Group]): + """ + Extract attributes from an HDF5 group or dataset. + + Parameters + ---------- + h5obj : h5py.Group or h5py.Dataset + An h5py group or dataset. + """ + _HIDDEN_ATTRS = { + "REFERENCE_LIST", + "CLASS", + "DIMENSION_LIST", + "NAME", + "_Netcdf4Dimid", + "_Netcdf4Coordinates", + "_nc3_strict", + "_NCProperties", + } + attrs = {} + for n, v in h5obj.attrs.items(): + if n in _HIDDEN_ATTRS: + continue + # Fix some attribute values to avoid JSON encoding exceptions... 
+ if isinstance(v, bytes): + v = v.decode("utf-8") or " " + elif isinstance(v, (np.ndarray, np.number, np.bool_)): + if v.dtype.kind == "S": + v = v.astype(str) + if n == "_FillValue": + continue + elif v.size == 1: + v = v.flatten()[0] + if isinstance(v, (np.ndarray, np.number, np.bool_)): + v = v.tolist() + else: + v = v.tolist() + elif isinstance(v, h5py._hl.base.Empty): + v = "" + if v == "DIMENSION_SCALE": + continue + + attrs[n] = v + return attrs + + @staticmethod + def _dataset_to_variable(path: str, dataset: Dataset) -> Optional[Variable]: + """ + Extract an xarray Variable with ManifestArray data from an h5py dataset + + Parameters + ---------- + dataset : h5py.Dataset + An h5py dataset. + + Returns + ------- + list: xarray.Variable + A list of xarray variables. + + """ + # This chunk determination logic mirrors zarr-python's create + # https://github.com/zarr-developers/zarr-python/blob/main/zarr/creation.py#L62-L66 + + chunks = dataset.chunks if dataset.chunks else dataset.shape + codecs = codecs_from_dataset(dataset) + cfcodec = cfcodec_from_dataset(dataset) + attrs = HDFVirtualBackend._extract_attrs(dataset) + if cfcodec: + codecs.insert(0, cfcodec["codec"]) + dtype = cfcodec["target_dtype"] + attrs.pop("scale_factor", None) + attrs.pop("add_offset", None) + fill_value = cfcodec["codec"].decode(dataset.fillvalue) + else: + dtype = dataset.dtype + fill_value = dataset.fillvalue + if isinstance(fill_value, np.ndarray): + fill_value = fill_value[0] + if np.isnan(fill_value): + fill_value = float("nan") + if isinstance(fill_value, np.generic): + fill_value = fill_value.item() + filters = [codec.get_config() for codec in codecs] + zarray = ZArray( + chunks=chunks, + compressor=None, + dtype=dtype, + fill_value=fill_value, + filters=filters, + order="C", + shape=dataset.shape, + zarr_format=2, + ) + dims = HDFVirtualBackend._dataset_dims(dataset) + manifest = HDFVirtualBackend._dataset_chunk_manifest(path, dataset) + if manifest: + marray = ManifestArray(zarray=zarray, chunkmanifest=manifest) + variable = Variable(data=marray, dims=dims, attrs=attrs) + else: + variable = Variable(data=np.empty(dataset.shape), dims=dims, attrs=attrs) + return variable + + @staticmethod + def _virtual_vars_from_hdf( + path: str, + group: Optional[str] = None, + drop_variables: Optional[List[str]] = None, + reader_options: Optional[dict] = { + "storage_options": {"key": "", "secret": "", "anon": True} + }, + ) -> Dict[str, Variable]: + """ + Extract xarray Variables with ManifestArray data from an HDF file or group + + Parameters + ---------- + path: str + The path of the hdf5 file. + group: str + The name of the group for which to extract variables. + drop_variables: list of str + A list of variable names to skip extracting. + reader_options: dict + A dictionary of reader options passed to fsspec when opening the + file. + + Returns + ------- + dict + A dictionary of Xarray Variables with the variable names as keys. 
+ + """ + if drop_variables is None: + drop_variables = [] + open_file = _FsspecFSFromFilepath( + filepath=path, reader_options=reader_options + ).open_file() + f = h5py.File(open_file, mode="r") + if group: + g = f[group] + if not isinstance(g, h5py.Group): + raise ValueError("The provided group is not an HDF group") + else: + g = f + variables = {} + for key in g.keys(): + if key not in drop_variables: + if isinstance(g[key], Dataset): + variable = HDFVirtualBackend._dataset_to_variable(path, g[key]) + if variable is not None: + variables[key] = variable + else: + raise NotImplementedError("Nested groups are not yet supported") + + return variables + + @staticmethod + def _get_group_attrs( + path: str, + group: Optional[str] = None, + reader_options: Optional[dict] = { + "storage_options": {"key": "", "secret": "", "anon": True} + }, + ): + open_file = _FsspecFSFromFilepath( + filepath=path, reader_options=reader_options + ).open_file() + f = h5py.File(open_file, mode="r") + if group: + g = f[group] + if not isinstance(g, h5py.Group): + raise ValueError("The provided group is not an HDF group") + else: + g = f + attrs = HDFVirtualBackend._extract_attrs(g) + return attrs diff --git a/virtualizarr/tests/__init__.py b/virtualizarr/tests/__init__.py index 70f613ce..aee82542 100644 --- a/virtualizarr/tests/__init__.py +++ b/virtualizarr/tests/__init__.py @@ -37,6 +37,8 @@ def _importorskip( has_s3fs, requires_s3fs = _importorskip("s3fs") has_scipy, requires_scipy = _importorskip("scipy") has_tifffile, requires_tifffile = _importorskip("tifffile") +has_imagecodecs, requires_imagecodecs = _importorskip("imagecodecs") +has_hdf5plugin, requires_hdf5plugin = _importorskip("hdf5plugin") def create_manifestarray( diff --git a/virtualizarr/tests/test_backend.py b/virtualizarr/tests/test_backend.py index b1ddeee4..414ba807 100644 --- a/virtualizarr/tests/test_backend.py +++ b/virtualizarr/tests/test_backend.py @@ -11,6 +11,8 @@ from virtualizarr import open_virtual_dataset from virtualizarr.backend import FileType, automatically_determine_filetype from virtualizarr.manifests import ManifestArray +from virtualizarr.readers import HDF5VirtualBackend +from virtualizarr.readers.hdf import HDFVirtualBackend from virtualizarr.tests import ( has_astropy, network, @@ -81,14 +83,15 @@ def test_FileType(): @requires_kerchunk +@pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) class TestOpenVirtualDatasetIndexes: - def test_no_indexes(self, netcdf4_file): - vds = open_virtual_dataset(netcdf4_file, indexes={}) + def test_no_indexes(self, netcdf4_file, hdf_backend): + vds = open_virtual_dataset(netcdf4_file, indexes={}, backend=hdf_backend) assert vds.indexes == {} - def test_create_default_indexes(self, netcdf4_file): + def test_create_default_indexes(self, netcdf4_file, hdf_backend): with pytest.warns(UserWarning, match="will create in-memory pandas indexes"): - vds = open_virtual_dataset(netcdf4_file, indexes=None) + vds = open_virtual_dataset(netcdf4_file, indexes=None, backend=hdf_backend) ds = open_dataset(netcdf4_file, decode_times=True) # TODO use xr.testing.assert_identical(vds.indexes, ds.indexes) instead once class supported by assertion comparison, see https://github.com/pydata/xarray/issues/5812 @@ -112,7 +115,8 @@ def index_mappings_equal(indexes1: Mapping[str, Index], indexes2: Mapping[str, I @requires_kerchunk -def test_cftime_index(tmpdir): +@pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) +def test_cftime_index(tmpdir, 
hdf_backend): """Ensure a virtual dataset contains the same indexes as an Xarray dataset""" # Note: Test was created to debug: https://github.com/zarr-developers/VirtualiZarr/issues/168 ds = xr.Dataset( @@ -128,7 +132,10 @@ def test_cftime_index(tmpdir): ) ds.to_netcdf(f"{tmpdir}/tmp.nc") vds = open_virtual_dataset( - f"{tmpdir}/tmp.nc", loadable_variables=["time", "lat", "lon"], indexes={} + f"{tmpdir}/tmp.nc", + loadable_variables=["time", "lat", "lon"], + indexes={}, + backend=hdf_backend, ) # TODO use xr.testing.assert_identical(vds.indexes, ds.indexes) instead once class supported by assertion comparison, see https://github.com/pydata/xarray/issues/5812 assert index_mappings_equal(vds.xindexes, ds.xindexes) @@ -138,15 +145,16 @@ def test_cftime_index(tmpdir): @requires_kerchunk +@pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) class TestOpenVirtualDatasetAttrs: - def test_drop_array_dimensions(self, netcdf4_file): + def test_drop_array_dimensions(self, netcdf4_file, hdf_backend): # regression test for GH issue #150 - vds = open_virtual_dataset(netcdf4_file, indexes={}) + vds = open_virtual_dataset(netcdf4_file, indexes={}, backend=hdf_backend) assert "_ARRAY_DIMENSIONS" not in vds["air"].attrs - def test_coordinate_variable_attrs_preserved(self, netcdf4_file): + def test_coordinate_variable_attrs_preserved(self, netcdf4_file, hdf_backend): # regression test for GH issue #155 - vds = open_virtual_dataset(netcdf4_file, indexes={}) + vds = open_virtual_dataset(netcdf4_file, indexes={}, backend=hdf_backend) assert vds["lat"].attrs == { "standard_name": "latitude", "long_name": "Latitude", @@ -180,21 +188,19 @@ def test_var_attr_coords(self, netcdf4_file_with_2d_coords): @network @requires_s3fs class TestReadFromS3: - @pytest.mark.parametrize( - "filetype", ["netcdf4", None], ids=["netcdf4 filetype", "None filetype"] - ) @pytest.mark.parametrize( "indexes", [None, {}], ids=["None index", "empty dict index"] ) - def test_anon_read_s3(self, filetype, indexes): + @pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) + def test_anon_read_s3(self, indexes, hdf_backend): """Parameterized tests for empty vs supplied indexes and filetypes.""" # TODO: Switch away from this s3 url after minIO is implemented. 
fpath = "s3://carbonplan-share/virtualizarr/local.nc" vds = open_virtual_dataset( fpath, - filetype=filetype, indexes=indexes, reader_options={"storage_options": {"anon": True}}, + backend=hdf_backend, ) assert vds.dims == {"time": 2920, "lat": 25, "lon": 53} @@ -203,6 +209,7 @@ def test_anon_read_s3(self, filetype, indexes): @network +@pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) class TestReadFromURL: @pytest.mark.parametrize( "filetype, url", @@ -247,10 +254,14 @@ class TestReadFromURL: ), ], ) - def test_read_from_url(self, filetype, url): + def test_read_from_url(self, hdf_backend, filetype, url): if filetype in ["grib", "jpg", "hdf4"]: with pytest.raises(NotImplementedError): - vds = open_virtual_dataset(url, reader_options={}, indexes={}) + vds = open_virtual_dataset( + url, + reader_options={}, + indexes={}, + ) elif filetype == "hdf5": vds = open_virtual_dataset( url, @@ -258,13 +269,14 @@ def test_read_from_url(self, filetype, url): drop_variables=["listOfCovarianceTerms", "listOfPolarizations"], indexes={}, reader_options={}, + backend=hdf_backend, ) assert isinstance(vds, xr.Dataset) else: vds = open_virtual_dataset(url, indexes={}) assert isinstance(vds, xr.Dataset) - def test_virtualizarr_vs_local_nisar(self): + def test_virtualizarr_vs_local_nisar(self, hdf_backend): import fsspec # Open group directly from locally cached file with xarray @@ -287,6 +299,7 @@ def test_virtualizarr_vs_local_nisar(self): group=hdf_group, indexes={}, drop_variables=["listOfCovarianceTerms", "listOfPolarizations"], + backend=hdf_backend, ) tmpref = "/tmp/cmip6.json" vds.virtualize.to_kerchunk(tmpref, format="json") @@ -298,10 +311,14 @@ def test_virtualizarr_vs_local_nisar(self): @requires_kerchunk class TestLoadVirtualDataset: - def test_loadable_variables(self, netcdf4_file): + @pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) + def test_loadable_variables(self, netcdf4_file, hdf_backend): vars_to_load = ["air", "time"] vds = open_virtual_dataset( - netcdf4_file, loadable_variables=vars_to_load, indexes={} + netcdf4_file, + loadable_variables=vars_to_load, + indexes={}, + backend=hdf_backend, ) for name in vds.variables: @@ -323,11 +340,26 @@ def test_explicit_filetype(self, netcdf4_file): with pytest.raises(NotImplementedError): open_virtual_dataset(netcdf4_file, filetype="grib") - def test_group_kwarg(self, hdf5_groups_file): - with pytest.raises(ValueError, match="Multiple HDF Groups found"): - open_virtual_dataset(hdf5_groups_file) - with pytest.raises(ValueError, match="not found in"): - open_virtual_dataset(hdf5_groups_file, group="doesnt_exist") + def test_explicit_filetype_and_backend(self, netcdf4_file): + with pytest.raises(ValueError): + open_virtual_dataset( + netcdf4_file, filetype="hdf", backend=HDFVirtualBackend + ) + + @pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) + def test_group_kwarg(self, hdf5_groups_file, hdf_backend): + if hdf_backend == HDFVirtualBackend: + with pytest.raises(NotImplementedError, match="Nested groups"): + open_virtual_dataset(hdf5_groups_file, backend=hdf_backend) + with pytest.raises(KeyError, match="doesn't exist"): + open_virtual_dataset( + hdf5_groups_file, group="doesnt_exist", backend=hdf_backend + ) + if hdf_backend == HDF5VirtualBackend: + with pytest.raises(ValueError, match="Multiple HDF Groups found"): + open_virtual_dataset(hdf5_groups_file) + with pytest.raises(ValueError, match="not found in"): + open_virtual_dataset(hdf5_groups_file, 
group="doesnt_exist") vars_to_load = ["air", "time"] vds = open_virtual_dataset( @@ -335,6 +367,7 @@ def test_group_kwarg(self, hdf5_groups_file): group="test/group", loadable_variables=vars_to_load, indexes={}, + backend=hdf_backend, ) full_ds = xr.open_dataset( hdf5_groups_file, @@ -359,13 +392,15 @@ def test_open_virtual_dataset_passes_expected_args( } mock_read_kerchunk.assert_called_once_with(**args) - def test_open_dataset_with_empty(self, hdf5_empty, tmpdir): - vds = open_virtual_dataset(hdf5_empty) + @pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) + def test_open_dataset_with_empty(self, hdf5_empty, tmpdir, hdf_backend): + vds = open_virtual_dataset(hdf5_empty, backend=hdf_backend) assert vds.empty.dims == () assert vds.empty.attrs == {"empty": "true"} - def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir): - vds = open_virtual_dataset(hdf5_scalar) + @pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) + def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir, hdf_backend): + vds = open_virtual_dataset(hdf5_scalar, backend=hdf_backend) assert vds.scalar.dims == () assert vds.scalar.attrs == {"scalar": "true"} diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py index 09d0c0a8..11b5cc20 100644 --- a/virtualizarr/tests/test_integration.py +++ b/virtualizarr/tests/test_integration.py @@ -5,6 +5,8 @@ from virtualizarr import open_virtual_dataset from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.readers import HDF5VirtualBackend +from virtualizarr.readers.hdf import HDFVirtualBackend from virtualizarr.tests import requires_kerchunk from virtualizarr.translators.kerchunk import ( dataset_from_kerchunk_refs, @@ -63,8 +65,9 @@ def test_no_duplicates_find_var_names(): ), ], ) +@pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) def test_numpy_arrays_to_inlined_kerchunk_refs( - netcdf4_file, inline_threshold, vars_to_inline + netcdf4_file, inline_threshold, vars_to_inline, hdf_backend ): from kerchunk.hdf import SingleHdf5ToZarr @@ -75,7 +78,7 @@ def test_numpy_arrays_to_inlined_kerchunk_refs( # loading the variables should produce same result as inlining them using kerchunk vds = open_virtual_dataset( - netcdf4_file, loadable_variables=vars_to_inline, indexes={} + netcdf4_file, loadable_variables=vars_to_inline, indexes={}, backend=hdf_backend ) refs = vds.virtualize.to_kerchunk(format="dict") @@ -90,7 +93,8 @@ def test_numpy_arrays_to_inlined_kerchunk_refs( @requires_kerchunk @pytest.mark.parametrize("format", ["dict", "json", "parquet"]) class TestKerchunkRoundtrip: - def test_kerchunk_roundtrip_no_concat(self, tmpdir, format): + @pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) + def test_kerchunk_roundtrip_no_concat(self, tmpdir, format, hdf_backend): # set up example xarray dataset ds = xr.tutorial.open_dataset("air_temperature", decode_times=False) @@ -98,7 +102,7 @@ def test_kerchunk_roundtrip_no_concat(self, tmpdir, format): ds.to_netcdf(f"{tmpdir}/air.nc") # use open_dataset_via_kerchunk to read it as references - vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={}) + vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={}, backend=hdf_backend) if format == "dict": # write those references to an in-memory kerchunk-formatted references dictionary @@ -115,11 +119,18 @@ def test_kerchunk_roundtrip_no_concat(self, tmpdir, format): f"{tmpdir}/refs.{format}", engine="kerchunk", 
decode_times=False ) - # assert identical to original dataset - xrt.assert_identical(roundtrip, ds) + # assert all_close to original dataset + xrt.assert_allclose(roundtrip, ds) + # assert coordinate attributes are maintained + for coord in ds.coords: + assert ds.coords[coord].attrs == roundtrip.coords[coord].attrs + + @pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) @pytest.mark.parametrize("decode_times,time_vars", [(False, []), (True, ["time"])]) - def test_kerchunk_roundtrip_concat(self, tmpdir, format, decode_times, time_vars): + def test_kerchunk_roundtrip_concat( + self, tmpdir, format, hdf_backend, decode_times, time_vars + ): # set up example xarray dataset ds = xr.tutorial.open_dataset("air_temperature", decode_times=decode_times) @@ -135,11 +146,13 @@ def test_kerchunk_roundtrip_concat(self, tmpdir, format, decode_times, time_vars f"{tmpdir}/air1.nc", indexes={}, loadable_variables=time_vars, + backend=hdf_backend, ) vds2 = open_virtual_dataset( f"{tmpdir}/air2.nc", indexes={}, loadable_variables=time_vars, + backend=hdf_backend, ) if decode_times is False: @@ -168,9 +181,14 @@ def test_kerchunk_roundtrip_concat(self, tmpdir, format, decode_times, time_vars roundtrip = xr.open_dataset( f"{tmpdir}/refs.{format}", engine="kerchunk", decode_times=decode_times ) + if decode_times is False: - # assert identical to original dataset - xrt.assert_identical(roundtrip, ds) + # assert all_close to original dataset + xrt.assert_allclose(roundtrip, ds) + + # assert coordinate attributes are maintained + for coord in ds.coords: + assert ds.coords[coord].attrs == roundtrip.coords[coord].attrs else: # they are very very close! But assert_allclose doesn't seem to work on datetimes assert (roundtrip.time - ds.time).sum() == 0 @@ -178,17 +196,22 @@ def test_kerchunk_roundtrip_concat(self, tmpdir, format, decode_times, time_vars assert roundtrip.time.encoding["units"] == ds.time.encoding["units"] assert roundtrip.time.encoding["calendar"] == ds.time.encoding["calendar"] - def test_non_dimension_coordinates(self, tmpdir, format): + @pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) + def test_non_dimension_coordinates(self, tmpdir, format, hdf_backend): # regression test for GH issue #105 + if hdf_backend: + pytest.xfail("To fix coordinate behavior with HDF reader") + # set up example xarray dataset containing non-dimension coordinate variables ds = xr.Dataset(coords={"lat": (["x", "y"], np.arange(6.0).reshape(2, 3))}) # save it to disk as netCDF (in temporary directory) ds.to_netcdf(f"{tmpdir}/non_dim_coords.nc") - vds = open_virtual_dataset(f"{tmpdir}/non_dim_coords.nc", indexes={}) - + vds = open_virtual_dataset( + f"{tmpdir}/non_dim_coords.nc", indexes={}, backend=hdf_backend + ) assert "lat" in vds.coords assert "coordinates" not in vds.attrs @@ -208,7 +231,11 @@ def test_non_dimension_coordinates(self, tmpdir, format): ) # assert equal to original dataset - xrt.assert_identical(roundtrip, ds) + xrt.assert_allclose(roundtrip, ds) + + # assert coordinate attributes are maintained + for coord in ds.coords: + assert ds.coords[coord].attrs == roundtrip.coords[coord].attrs def test_datetime64_dtype_fill_value(self, tmpdir, format): chunks_dict = { @@ -256,11 +283,12 @@ def test_datetime64_dtype_fill_value(self, tmpdir, format): @requires_kerchunk -def test_open_scalar_variable(tmpdir): +@pytest.mark.parametrize("hdf_backend", [HDF5VirtualBackend, HDFVirtualBackend]) +def test_open_scalar_variable(tmpdir, hdf_backend): # regression test 
for GH issue #100 ds = xr.Dataset(data_vars={"a": 0}) ds.to_netcdf(f"{tmpdir}/scalar.nc") - vds = open_virtual_dataset(f"{tmpdir}/scalar.nc", indexes={}) + vds = open_virtual_dataset(f"{tmpdir}/scalar.nc", indexes={}, backend=hdf_backend) assert vds["a"].shape == () diff --git a/virtualizarr/tests/test_readers/conftest.py b/virtualizarr/tests/test_readers/conftest.py new file mode 100644 index 00000000..f96447db --- /dev/null +++ b/virtualizarr/tests/test_readers/conftest.py @@ -0,0 +1,324 @@ +import warnings + +import h5py # type: ignore +import numpy as np +import pytest +import xarray as xr +from packaging.version import Version +from xarray.tests.test_dataset import create_test_data +from xarray.util.print_versions import netcdf_and_hdf5_versions + +try: + import hdf5plugin # type: ignore +except ModuleNotFoundError: + hdf5plugin = None # type: ignore + warnings.warn("hdf5plugin is required for HDF reader") + + +@pytest.fixture +def empty_chunks_hdf5_file(tmpdir): + ds = xr.Dataset({"data": []}) + filepath = f"{tmpdir}/empty_chunks.nc" + ds.to_netcdf(filepath, engine="h5netcdf") + return filepath + + +@pytest.fixture +def empty_dataset_hdf5_file(tmpdir): + filepath = f"{tmpdir}/empty_dataset.nc" + f = h5py.File(filepath, "w") + f.create_dataset("data", shape=(0,), dtype="f") + return filepath + + +@pytest.fixture +def no_chunks_hdf5_file(tmpdir): + filepath = f"{tmpdir}/no_chunks.nc" + f = h5py.File(filepath, "w") + data = np.random.random((10, 10)) + f.create_dataset(name="data", data=data, chunks=None) + return filepath + + +@pytest.fixture +def chunked_hdf5_file(tmpdir): + filepath = f"{tmpdir}/chunks.nc" + f = h5py.File(filepath, "w") + data = np.random.random((100, 100)) + f.create_dataset(name="data", data=data, chunks=(50, 50)) + return filepath + + +@pytest.fixture +def single_dimension_scale_hdf5_file(tmpdir): + filepath = f"{tmpdir}/single_dimension_scale.nc" + f = h5py.File(filepath, "w") + data = [1, 2] + x = [0, 1] + f.create_dataset(name="data", data=data) + f.create_dataset(name="x", data=x) + f["x"].make_scale() + f["data"].dims[0].attach_scale(f["x"]) + return filepath + + +@pytest.fixture +def is_scale_hdf5_file(tmpdir): + filepath = f"{tmpdir}/is_scale.nc" + f = h5py.File(filepath, "w") + data = [1, 2] + f.create_dataset(name="data", data=data) + f["data"].make_scale() + return filepath + + +@pytest.fixture +def multiple_dimension_scales_hdf5_file(tmpdir): + filepath = f"{tmpdir}/multiple_dimension_scales.nc" + f = h5py.File(filepath, "w") + data = [1, 2] + f.create_dataset(name="data", data=data) + f.create_dataset(name="x", data=[0, 1]) + f.create_dataset(name="y", data=[0, 1]) + f["x"].make_scale() + f["y"].make_scale() + f["data"].dims[0].attach_scale(f["x"]) + f["data"].dims[0].attach_scale(f["y"]) + return filepath + + +@pytest.fixture +def chunked_dimensions_netcdf4_file(tmpdir): + filepath = f"{tmpdir}/chunks_dimension.nc" + f = h5py.File(filepath, "w") + data = np.random.random((100, 100)) + x = np.random.random((100)) + y = np.random.random((100)) + f.create_dataset(name="data", data=data, chunks=(50, 50)) + f.create_dataset(name="x", data=x) + f.create_dataset(name="y", data=y) + f["data"].dims[0].attach_scale(f["x"]) + f["data"].dims[1].attach_scale(f["y"]) + return filepath + + +@pytest.fixture +def string_attributes_hdf5_file(tmpdir): + filepath = f"{tmpdir}/attributes.nc" + f = h5py.File(filepath, "w") + data = np.random.random((10, 10)) + f.create_dataset(name="data", data=data, chunks=None) + f["data"].attrs["attribute_name"] = "attribute_name" + 
f["data"].attrs["attribute_name2"] = "attribute_name2" + return filepath + + +@pytest.fixture +def root_attributes_hdf5_file(tmpdir): + filepath = f"{tmpdir}/root_attributes.nc" + f = h5py.File(filepath, "w") + f.attrs["attribute_name"] = "attribute_name" + return filepath + + +@pytest.fixture +def group_hdf5_file(tmpdir): + filepath = f"{tmpdir}/group.nc" + f = h5py.File(filepath, "w") + g = f.create_group("group") + data = np.random.random((10, 10)) + g.create_dataset("data", data=data) + return filepath + + +@pytest.fixture +def nested_group_hdf5_file(tmpdir): + filepath = f"{tmpdir}/nested_group.nc" + f = h5py.File(filepath, "w") + g = f.create_group("group") + data = np.random.random((10, 10)) + g.create_dataset("data", data=data) + g.create_group("nested_group") + return filepath + + +@pytest.fixture +def multiple_datasets_hdf5_file(tmpdir): + filepath = f"{tmpdir}/multiple_datasets.nc" + f = h5py.File(filepath, "w") + data = np.random.random((10, 10)) + f.create_dataset(name="data", data=data, chunks=None) + f.create_dataset(name="data2", data=data, chunks=None) + return filepath + + +@pytest.fixture +def np_uncompressed(): + return np.arange(100) + + +@pytest.fixture(params=["gzip", "blosc_lz4", "lz4", "bzip2", "zstd", "shuffle"]) +def filter_encoded_hdf5_file(tmpdir, np_uncompressed, request): + filepath = f"{tmpdir}/{request.param}.nc" + f = h5py.File(filepath, "w") + if request.param == "gzip": + f.create_dataset( + name="data", data=np_uncompressed, compression="gzip", compression_opts=1 + ) + if request.param == "blosc_lz4": + f.create_dataset( + name="data", + data=np_uncompressed, + **hdf5plugin.Blosc(cname="lz4", clevel=9, shuffle=hdf5plugin.Blosc.SHUFFLE), + ) + if request.param == "lz4": + f.create_dataset(name="data", data=np_uncompressed, **hdf5plugin.LZ4(nbytes=0)) + if request.param == "bzip2": + f.create_dataset(name="data", data=np_uncompressed, **hdf5plugin.BZip2()) + if request.param == "zstd": + f.create_dataset(name="data", data=np_uncompressed, **hdf5plugin.Zstd(clevel=2)) + if request.param == "shuffle": + f.create_dataset(name="data", data=np_uncompressed, shuffle=True) + + return filepath + + +@pytest.fixture(params=["gzip"]) +def filter_encoded_roundtrip_hdf5_file(tmpdir, request): + ds = xr.tutorial.open_dataset("air_temperature") + encoding = {} + if request.param == "gzip": + encoding_config = {"zlib": True, "complevel": 1} + + for var_name in ds.variables: + encoding[var_name] = encoding_config + + filepath = f"{tmpdir}/{request.param}_xarray.nc" + ds.to_netcdf(filepath, engine="h5netcdf", encoding=encoding) + return filepath + + +@pytest.fixture() +def skip_test_for_libhdf5_version(): + versions = netcdf_and_hdf5_versions() + libhdf5_version = Version(versions[0][1]) + return libhdf5_version < Version("1.14") + + +@pytest.fixture(params=["blosc_zlib", ""]) +def filter_encoded_roundtrip_netcdf4_file( + tmpdir, request, skip_test_for_libhdf5_version +): + if skip_test_for_libhdf5_version: + pytest.skip("Requires libhdf5 >= 1.14") + ds = create_test_data(dim_sizes=(20, 80, 10)) + encoding_config = { + "chunksizes": (20, 40), + "original_shape": ds.var2.shape, + "blosc_shuffle": 1, + "fletcher32": False, + } + if "blosc" in request.param: + encoding_config["compression"] = request.param + # Check on how handle scalar dim. 
+ ds = ds.drop_dims("dim3") + ds["var2"].encoding.update(encoding_config) + filepath = f"{tmpdir}/{request.param}_xarray.nc" + ds.to_netcdf(filepath, engine="netcdf4") + return {"filepath": filepath, "compressor": request.param} + + +@pytest.fixture +def np_uncompressed_int16(): + return np.arange(100, dtype=np.int16) + + +@pytest.fixture +def offset(): + return np.float32(5.0) + + +@pytest.fixture +def add_offset_hdf5_file(tmpdir, np_uncompressed_int16, offset): + filepath = f"{tmpdir}/offset.nc" + f = h5py.File(filepath, "w") + data = np_uncompressed_int16 - offset + f.create_dataset(name="data", data=data, chunks=True) + f["data"].attrs.create(name="add_offset", data=offset) + return filepath + + +@pytest.fixture +def scale_factor(): + return 0.01 + + +@pytest.fixture +def scale_add_offset_hdf5_file(tmpdir, np_uncompressed_int16, offset, scale_factor): + filepath = f"{tmpdir}/scale_offset.nc" + f = h5py.File(filepath, "w") + data = (np_uncompressed_int16 - offset) / scale_factor + f.create_dataset(name="data", data=data, chunks=True) + f["data"].attrs.create(name="add_offset", data=offset) + f["data"].attrs.create(name="scale_factor", data=np.array([scale_factor])) + return filepath + + +@pytest.fixture() +def chunked_roundtrip_hdf5_file(tmpdir): + ds = create_test_data(dim_sizes=(20, 80, 10)) + ds = ds.drop_dims("dim3") + filepath = f"{tmpdir}/chunked_xarray.nc" + ds.to_netcdf( + filepath, engine="netcdf4", encoding={"var2": {"chunksizes": (10, 10)}} + ) + return filepath + + +@pytest.fixture(params=["gzip", "zlib"]) +def filter_and_cf_roundtrip_hdf5_file(tmpdir, request): + x = np.arange(100) + y = np.arange(100) + fill_value = np.int16(-9999) + temperature = 0.1 * x[:, None] + 0.1 * y[None, :] + temperature[0][0] = fill_value + ds = xr.Dataset( + {"temperature": (["x", "y"], temperature)}, + coords={"x": np.arange(100), "y": np.arange(100)}, + ) + encoding = { + "temperature": { + "dtype": "int16", + "scale_factor": 0.1, + "add_offset": 273.15, + "_FillValue": fill_value, + }, + "x": {"_FillValue": fill_value}, + "y": {"_FillValue": fill_value}, + } + if request.param == "gzip": + encoding["temperature"]["compression"] = "gzip" + encoding["temperature"]["compression_opts"] = 7 + + if request.param == "zlib": + encoding["temperature"]["zlib"] = True + encoding["temperature"]["complevel"] = 9 + + from random import randint + + filepath = f"{tmpdir}/{request.param}_{randint(0,100)}_cf_roundtrip.nc" + ds.to_netcdf(filepath, engine="h5netcdf", encoding=encoding) + + return filepath + + +@pytest.fixture +def root_coordinates_hdf5_file(tmpdir, np_uncompressed_int16): + filepath = f"{tmpdir}/coordinates.nc" + f = h5py.File(filepath, "w") + data = np.random.random((100, 100)) + f.create_dataset(name="data", data=data, chunks=True) + f.create_dataset(name="lat", data=data) + f.create_dataset(name="lon", data=data) + f.attrs.create(name="coordinates", data="lat lon") + return filepath diff --git a/virtualizarr/tests/test_readers/test_hdf.py b/virtualizarr/tests/test_readers/test_hdf.py new file mode 100644 index 00000000..b75c56b0 --- /dev/null +++ b/virtualizarr/tests/test_readers/test_hdf.py @@ -0,0 +1,180 @@ +from unittest.mock import patch + +import h5py # type: ignore +import pytest + +from virtualizarr.readers.hdf import HDFVirtualBackend +from virtualizarr.tests import ( + requires_hdf5plugin, + requires_imagecodecs, +) + + +@requires_hdf5plugin +@requires_imagecodecs +class TestDatasetChunkManifest: + def test_empty_chunks(self, empty_chunks_hdf5_file): + f = 
h5py.File(empty_chunks_hdf5_file) + ds = f["data"] + with pytest.raises(ValueError, match="chunked but contains no chunks"): + HDFVirtualBackend._dataset_chunk_manifest( + path=empty_chunks_hdf5_file, dataset=ds + ) + + @pytest.mark.skip("Need to differentiate non coordinate dimensions from empty") + def test_empty_dataset(self, empty_dataset_hdf5_file): + f = h5py.File(empty_dataset_hdf5_file) + ds = f["data"] + with pytest.raises(ValueError, match="no space allocated in the file"): + HDFVirtualBackend._dataset_chunk_manifest( + path=empty_dataset_hdf5_file, dataset=ds + ) + + def test_no_chunking(self, no_chunks_hdf5_file): + f = h5py.File(no_chunks_hdf5_file) + ds = f["data"] + manifest = HDFVirtualBackend._dataset_chunk_manifest( + path=no_chunks_hdf5_file, dataset=ds + ) + assert manifest.shape_chunk_grid == (1, 1) + + def test_chunked(self, chunked_hdf5_file): + f = h5py.File(chunked_hdf5_file) + ds = f["data"] + manifest = HDFVirtualBackend._dataset_chunk_manifest( + path=chunked_hdf5_file, dataset=ds + ) + assert manifest.shape_chunk_grid == (2, 2) + + def test_chunked_roundtrip(self, chunked_roundtrip_hdf5_file): + f = h5py.File(chunked_roundtrip_hdf5_file) + ds = f["var2"] + manifest = HDFVirtualBackend._dataset_chunk_manifest( + path=chunked_roundtrip_hdf5_file, dataset=ds + ) + assert manifest.shape_chunk_grid == (2, 8) + + +@requires_hdf5plugin +@requires_imagecodecs +class TestDatasetDims: + def test_single_dimension_scale(self, single_dimension_scale_hdf5_file): + f = h5py.File(single_dimension_scale_hdf5_file) + ds = f["data"] + dims = HDFVirtualBackend._dataset_dims(ds) + assert dims[0] == "x" + + def test_is_dimension_scale(self, is_scale_hdf5_file): + f = h5py.File(is_scale_hdf5_file) + ds = f["data"] + dims = HDFVirtualBackend._dataset_dims(ds) + assert dims[0] == "data" + + def test_multiple_dimension_scales(self, multiple_dimension_scales_hdf5_file): + f = h5py.File(multiple_dimension_scales_hdf5_file) + ds = f["data"] + with pytest.raises(ValueError, match="dimension scales attached"): + HDFVirtualBackend._dataset_dims(ds) + + def test_no_dimension_scales(self, no_chunks_hdf5_file): + f = h5py.File(no_chunks_hdf5_file) + ds = f["data"] + dims = HDFVirtualBackend._dataset_dims(ds) + assert dims == ["phony_dim_0", "phony_dim_1"] + + +@requires_hdf5plugin +@requires_imagecodecs +class TestDatasetToVariable: + def test_chunked_dataset(self, chunked_dimensions_netcdf4_file): + f = h5py.File(chunked_dimensions_netcdf4_file) + ds = f["data"] + var = HDFVirtualBackend._dataset_to_variable( + chunked_dimensions_netcdf4_file, ds + ) + assert var.chunks == (50, 50) + + def test_not_chunked_dataset(self, single_dimension_scale_hdf5_file): + f = h5py.File(single_dimension_scale_hdf5_file) + ds = f["data"] + var = HDFVirtualBackend._dataset_to_variable( + single_dimension_scale_hdf5_file, ds + ) + assert var.chunks == (2,) + + def test_dataset_attributes(self, string_attributes_hdf5_file): + f = h5py.File(string_attributes_hdf5_file) + ds = f["data"] + var = HDFVirtualBackend._dataset_to_variable(string_attributes_hdf5_file, ds) + assert var.attrs["attribute_name"] == "attribute_name" + + +@requires_hdf5plugin +@requires_imagecodecs +class TestExtractAttributes: + def test_string_attribute(self, string_attributes_hdf5_file): + f = h5py.File(string_attributes_hdf5_file) + ds = f["data"] + attrs = HDFVirtualBackend._extract_attrs(ds) + assert attrs["attribute_name"] == "attribute_name" + + def test_root_attribute(self, root_attributes_hdf5_file): + f = 
h5py.File(root_attributes_hdf5_file) + attrs = HDFVirtualBackend._extract_attrs(f) + assert attrs["attribute_name"] == "attribute_name" + + def test_multiple_attributes(self, string_attributes_hdf5_file): + f = h5py.File(string_attributes_hdf5_file) + ds = f["data"] + attrs = HDFVirtualBackend._extract_attrs(ds) + assert len(attrs.keys()) == 2 + + +@requires_hdf5plugin +@requires_imagecodecs +class TestVirtualVarsFromHDF: + def test_variable_with_dimensions(self, chunked_dimensions_netcdf4_file): + variables = HDFVirtualBackend._virtual_vars_from_hdf( + chunked_dimensions_netcdf4_file + ) + assert len(variables) == 3 + + def test_nested_groups_not_implemented(self, nested_group_hdf5_file): + with pytest.raises(NotImplementedError): + HDFVirtualBackend._virtual_vars_from_hdf( + path=nested_group_hdf5_file, group="group" + ) + + def test_drop_variables(self, multiple_datasets_hdf5_file): + variables = HDFVirtualBackend._virtual_vars_from_hdf( + path=multiple_datasets_hdf5_file, drop_variables=["data2"] + ) + assert "data2" not in variables.keys() + + def test_dataset_in_group(self, group_hdf5_file): + variables = HDFVirtualBackend._virtual_vars_from_hdf( + path=group_hdf5_file, group="group" + ) + assert len(variables) == 1 + + def test_non_group_error(self, group_hdf5_file): + with pytest.raises(ValueError): + HDFVirtualBackend._virtual_vars_from_hdf( + path=group_hdf5_file, group="group/data" + ) + + +@requires_hdf5plugin +@requires_imagecodecs +class TestOpenVirtualDataset: + @patch("virtualizarr.readers.hdf.hdf.construct_virtual_dataset") + @patch("virtualizarr.readers.hdf.hdf.open_loadable_vars_and_indexes") + def test_coord_names( + self, + open_loadable_vars_and_indexes, + construct_virtual_dataset, + root_coordinates_hdf5_file, + ): + open_loadable_vars_and_indexes.return_value = (0, 0) + HDFVirtualBackend.open_virtual_dataset(root_coordinates_hdf5_file) + assert construct_virtual_dataset.call_args[1]["coord_names"] == ["lat", "lon"] diff --git a/virtualizarr/tests/test_readers/test_hdf_filters.py b/virtualizarr/tests/test_readers/test_hdf_filters.py new file mode 100644 index 00000000..7c0fd666 --- /dev/null +++ b/virtualizarr/tests/test_readers/test_hdf_filters.py @@ -0,0 +1,134 @@ +import warnings + +import h5py # type: ignore +import numcodecs +import numpy as np + +try: + import imagecodecs # noqa +except ModuleNotFoundError: + imagecodecs = None # type: ignore + warnings.warn("imagecodecs is required for HDF reader") + + +from virtualizarr.readers.hdf.filters import ( + _filter_to_codec, + cfcodec_from_dataset, + codecs_from_dataset, +) +from virtualizarr.tests import ( + requires_hdf5plugin, + requires_imagecodecs, +) + + +@requires_hdf5plugin +@requires_imagecodecs +class TestFilterToCodec: + def test_gzip_uses_zlib_numcodec(self): + codec = _filter_to_codec("gzip", 1) + assert isinstance(codec, numcodecs.zlib.Zlib) + + def test_lzf(self): + codec = _filter_to_codec("lzf") + assert isinstance(codec, imagecodecs.numcodecs.Lzf) + + def test_blosc(self): + codec = _filter_to_codec("32001", (2, 2, 8, 800, 9, 2, 1)) + assert isinstance(codec, numcodecs.blosc.Blosc) + expected_config = { + "id": "blosc", + "blocksize": 800, + "clevel": 9, + "shuffle": 2, + "cname": "lz4", + } + assert codec.get_config() == expected_config + + def test_zstd(self): + codec = _filter_to_codec("32015", (5,)) + assert isinstance(codec, numcodecs.zstd.Zstd) + config = codec.get_config() + assert config["id"] == "zstd" + assert config["level"] == 5 + + def test_shuffle(self): + codec = 
_filter_to_codec("shuffle", (7,)) + assert isinstance(codec, numcodecs.shuffle.Shuffle) + expected_config = {"id": "shuffle", "elementsize": 7} + assert codec.get_config() == expected_config + + +@requires_hdf5plugin +@requires_imagecodecs +class TestCodecsFromDataSet: + def test_numcodec_decoding(self, np_uncompressed, filter_encoded_hdf5_file): + f = h5py.File(filter_encoded_hdf5_file) + ds = f["data"] + chunk_info = ds.id.get_chunk_info(0) + codecs = codecs_from_dataset(ds) + with open(filter_encoded_hdf5_file, "rb") as file: + file.seek(chunk_info.byte_offset) + bytes_read = file.read(chunk_info.size) + decoded = codecs[0].decode(bytes_read) + if isinstance(decoded, np.ndarray): + assert decoded.tobytes() == np_uncompressed.tobytes() + else: + assert decoded == np_uncompressed.tobytes() + + +@requires_hdf5plugin +@requires_imagecodecs +class TestCFCodecFromDataset: + def test_no_cf_convention(self, filter_encoded_hdf5_file): + f = h5py.File(filter_encoded_hdf5_file) + ds = f["data"] + cf_codec = cfcodec_from_dataset(ds) + assert cf_codec is None + + def test_cf_scale_factor(self, netcdf4_file): + f = h5py.File(netcdf4_file) + ds = f["air"] + cf_codec = cfcodec_from_dataset(ds) + assert cf_codec["target_dtype"] == np.dtype(np.float64) + assert cf_codec["codec"].scale == 100.0 + assert cf_codec["codec"].offset == 0 + assert cf_codec["codec"].dtype == " str: from pathlib import Path @@ -284,15 +287,20 @@ def local_to_s3_url(old_local_path: str) -> str: == "s3://bucket/air.nc" ) - def test_invalid_type(self, netcdf4_file): - vds = open_virtual_dataset(netcdf4_file, indexes={}) + def test_invalid_type(self, netcdf4_file, hdf_backend): + vds = open_virtual_dataset(netcdf4_file, indexes={}, backend=hdf_backend) with pytest.raises(TypeError): vds.virtualize.rename_paths(["file1.nc", "file2.nc"]) - def test_mixture_of_manifestarrays_and_numpy_arrays(self, netcdf4_file): + def test_mixture_of_manifestarrays_and_numpy_arrays( + self, netcdf4_file, hdf_backend + ): vds = open_virtual_dataset( - netcdf4_file, indexes={}, loadable_variables=["lat", "lon"] + netcdf4_file, + indexes={}, + loadable_variables=["lat", "lon"], + backend=hdf_backend, ) renamed_vds = vds.virtualize.rename_paths("s3://bucket/air.nc") assert ( diff --git a/virtualizarr/utils.py b/virtualizarr/utils.py index c9260aa6..b5ae3447 100644 --- a/virtualizarr/utils.py +++ b/virtualizarr/utils.py @@ -1,5 +1,6 @@ from __future__ import annotations +import importlib import io from typing import TYPE_CHECKING, Iterable, Optional, Union @@ -86,3 +87,16 @@ def check_for_collisions( raise ValueError(f"Cannot both load and drop variables {common}") return drop_variables, loadable_variables + + +def soft_import(name: str, reason: str, strict: Optional[bool] = True): + try: + return importlib.import_module(name) + except (ImportError, ModuleNotFoundError): + if strict: + raise ImportError( + f"for {reason}, the {name} package is required. " + f"Please install it via pip or conda." + ) + else: + return None
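
Usage sketch (illustrative, not part of the patch): the backend keyword added to open_virtual_dataset in virtualizarr/backend.py can be combined with the new HDFVirtualBackend to read a local netCDF4/HDF5 file without kerchunk. The file name "air.nc" and output path are hypothetical, and the virtualizarr[hdf_reader] optional dependencies are assumed to be installed.

# Illustrative sketch: open a local netCDF4/HDF5 file with the experimental HDF backend.
# Assumes "air.nc" exists locally and the hdf_reader extras are installed.
from virtualizarr import open_virtual_dataset
from virtualizarr.readers.hdf import HDFVirtualBackend

vds = open_virtual_dataset(
    "air.nc",
    indexes={},                 # skip building in-memory pandas indexes
    backend=HDFVirtualBackend,  # explicit backend; cannot be combined with filetype=
)
vds.virtualize.to_kerchunk("air_refs.json", format="json")

Passing both filetype= and an explicit backend= raises a ValueError, as enforced in virtualizarr/backend.py above.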
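
A second illustrative sketch (also not part of the patch): cfcodec_from_dataset in virtualizarr/readers/hdf/filters.py wraps CF packing in a numcodecs FixedScaleOffset codec with scale set to 1/scale_factor. The packing parameters below (scale_factor=0.01, add_offset=273.15, int16 storage) are hypothetical values chosen only to show the decode direction.

# Illustrative sketch of the codec that cfcodec_from_dataset constructs for a
# CF-packed variable; decode recovers unpacked = packed * scale_factor + add_offset.
import numpy as np
from numcodecs.fixedscaleoffset import FixedScaleOffset

codec = FixedScaleOffset(offset=273.15, scale=1 / 0.01, dtype="<f8", astype="<i2")
packed = np.array([100, 200, 300], dtype="<i2")
unpacked = codec.decode(packed)  # approximately [274.15, 275.15, 276.15]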