From 1112247a846ea02a784d41d37a2ff5dfb07e30f8 Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Wed, 23 Jun 2021 21:55:13 -0400
Subject: [PATCH 1/3] rewrite MultiZarrToZarr to always use xr.concat

---
 fsspec_reference_maker/combine.py | 63 +++++++++++++++++++------------
 fsspec_reference_maker/grib2.py   |  3 +-
 fsspec_reference_maker/hdf.py     |  2 +-
 3 files changed, 41 insertions(+), 27 deletions(-)

diff --git a/fsspec_reference_maker/combine.py b/fsspec_reference_maker/combine.py
index 0a3e9eab..d530f64a 100644
--- a/fsspec_reference_maker/combine.py
+++ b/fsspec_reference_maker/combine.py
@@ -8,25 +8,25 @@
 import numcodecs
 import xarray as xr
 import zarr
 
-logging = logging.getLogger('reference-combine')
+logger = logging.getLogger('reference-combine')
 
 
 class MultiZarrToZarr:
     def __init__(self, path, remote_protocol,
-                 remote_options=None, xarray_kwargs=None, storage_options=None,
-                 with_mf=True):
+                 remote_options=None, xarray_open_kwargs=None, xarray_concat_args=None,
+                 preprocess=None, storage_options=None):
         """
 
         :param path: a URL containing multiple JSONs
         :param xarray_kwargs:
         :param storage_options:
         """
-        xarray_kwargs = xarray_kwargs or {}
         self.path = path
-        self.with_mf = with_mf
-        self.xr_kwargs = xarray_kwargs
+        self.xr_kwargs = xarray_open_kwargs or {}
+        self.concat_kwargs = xarray_concat_args or {}
         self.storage_options = storage_options or {}
+        self.preprocess = preprocess
         self.remote_protocol = remote_protocol
         self.remote_options = remote_options or {}
 
@@ -36,6 +36,7 @@ def translate(self, outpath):
         self.output = self._consolidate(out)
         self._write(self.output, outpath)
+        # TODO: return new zarr dataset?
 
     @staticmethod
     def _write(refs, outpath, filetype=None):
@@ -86,7 +87,7 @@ def _write(refs, outpath, filetype=None):
             compression="ZSTD"
         )
 
-    def _consolidate(self, mapping, inline_threashold=100, template_count=5):
+    def _consolidate(self, mapping, inline_threshold=100, template_count=5):
         counts = Counter(v[0] for v in mapping.values() if isinstance(v, list))
 
         def letter_sets():
@@ -104,7 +105,7 @@ def letter_sets():
 
         out = {}
         for k, v in mapping.items():
-            if isinstance(v, list) and v[2] < inline_threashold:
+            if isinstance(v, list) and v[2] < inline_threshold:
                 v = self.fs.cat_file(v[0], start=v[1], end=v[1] + v[2])
             if isinstance(v, bytes):
                 try:
@@ -158,15 +159,17 @@ def _determine_dims(self):
         self.fs = fss[0].fs
         mappers = [fs.get_mapper("") for fs in fss]
 
-        if self.with_mf is True:
-            ds = xr.open_mfdataset(mappers, engine="zarr", chunks={}, **self.xr_kwargs)
-            ds0 = xr.open_mfdataset(mappers[:1], engine="zarr", chunks={}, **self.xr_kwargs)
-        else:
-            dss = [xr.open_dataset(m, engine="zarr", chunks={}, **self.xr_kwargs) for m in mappers]
-            ds = xr.concat(dss, dim=self.with_mf)
-            ds0 = dss[0]
+        dss = [xr.open_dataset(m, engine="zarr", chunks={}, **self.xr_kwargs)
+               for m in mappers]
+        if self.preprocess:
+            dss = [self.preprocess(d) for d in dss]
+        ds = xr.concat(dss, **self.concat_kwargs)
+        ds0 = dss[0]
         self.extra_dims = set(ds.dims) - set(ds0.dims)
-        self.concat_dims = set(k for k, v in ds.dims.items() if k in ds0.dims and v / ds0.dims[k] == len(mappers))
+        self.concat_dims = set(
+            k for k, v in ds.dims.items()
+            if k in ds0.dims and v / ds0.dims[k] == len(mappers)
+        )
         self.same_dims = set(ds.dims) - self.extra_dims - self.concat_dims
         return ds, ds0, fss
 
@@ -180,19 +183,29 @@ def drop_coords(ds):
         ds = ds.drop(['reference_time', 'crs'])
         return ds.reset_coords(drop=True)
 
+    xarray_open_kwargs = {
+        "decode_cf": False,
+        "mask_and_scale": False,
+        "decode_times": False,
"decode_timedelta": False, + "use_cftime": False, + "decode_coords": False + } + concat_kwargs = { + "data_vars": "minimal", + "coords": "minimal", + "compat": "override", + "join": "override", + "combine_attrs": "override", + "dim": "time" + } mzz = MultiZarrToZarr( "zip://*.json::out.zip", remote_protocol="s3", remote_options={'anon': True}, - xarray_kwargs={ - "preprocess": drop_coords, - "decode_cf": False, - "mask_and_scale": False, - "decode_times": False, - "decode_timedelta": False, - "use_cftime": False, - "decode_coords": False - }, + preprocess=drop_coords, + xarray_open_kwargs=xarray_open_kwargs, + xarray_concat_args=concat_kwargs ) mzz.translate("output.zarr") diff --git a/fsspec_reference_maker/grib2.py b/fsspec_reference_maker/grib2.py index a8e73b80..344cccca 100644 --- a/fsspec_reference_maker/grib2.py +++ b/fsspec_reference_maker/grib2.py @@ -197,5 +197,6 @@ def example_multi(filter={'typeOfLevel': 'heightAboveGround', 'level': 2}): # 'hrrr.t04z.wrfsfcf01.json', # 'hrrr.t05z.wrfsfcf01.json', # 'hrrr.t06z.wrfsfcf01.json'] - # mzz = MultiZarrToZarr(files, remote_protocol="s3", remote_options={"anon": True}, with_mf='time') + # mzz = MultiZarrToZarr(files, remote_protocol="s3", remote_options={"anon": True} + # concat_kwargs={"dim": 'time'}) # mzz.translate("hrrr.total.json") diff --git a/fsspec_reference_maker/hdf.py b/fsspec_reference_maker/hdf.py index cc60820a..25201d8b 100644 --- a/fsspec_reference_maker/hdf.py +++ b/fsspec_reference_maker/hdf.py @@ -290,7 +290,7 @@ def example_single(): ) fsspec.utils.setup_logging(logger=lggr) with fsspec.open(url, **so) as f: - h5chunks = SingleHdf5ToZarr(f, url, xarray=True) + h5chunks = SingleHdf5ToZarr(f, url) return h5chunks.translate() From 2f0e356737bc40b298183e0972765fef76d81f77 Mon Sep 17 00:00:00 2001 From: Ted Habermann Date: Wed, 30 Jun 2021 11:38:23 -0600 Subject: [PATCH 2/3] Improved error messages We are testing fsspec-reference-maker on many HDF5 files and most of the break. It is helpful to know more about why they break so I added test-specific error messages. 
---
 fsspec_reference_maker/hdf.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/fsspec_reference_maker/hdf.py b/fsspec_reference_maker/hdf.py
index 25201d8b..9ca2e39a 100644
--- a/fsspec_reference_maker/hdf.py
+++ b/fsspec_reference_maker/hdf.py
@@ -154,10 +154,18 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
                       f'{h5obj.shape} {h5obj.dtype} {h5obj.nbytes} bytes>')
             return
 
-        if (h5obj.scaleoffset or h5obj.fletcher32 or
-                h5obj.compression in ('szip', 'lzf')):
+        #
+        # check for unsupported dataset properties
+        #
+        if h5obj.scaleoffset:
             raise RuntimeError(
-                f'{h5obj.name} uses unsupported HDF5 filters')
+                f'{h5obj.name} uses HDF5 scaleoffset filter - not supported by reference-maker')
+        if h5obj.fletcher32:
+            raise RuntimeError(
+                f'{h5obj.name} uses fletcher32 checksum - not supported by reference-maker')
+        if h5obj.compression in ('szip', 'lzf'):
+            raise RuntimeError(
+                f'{h5obj.name} uses szip or lzf compression - not supported by reference-maker')
         if h5obj.compression == 'gzip':
             compression = numcodecs.Zlib(level=h5obj.compression_opts)
         else:

From cf5030e9a47c620b477a2c1383f44a0558894a01 Mon Sep 17 00:00:00 2001
From: Martin Durant
Date: Tue, 6 Jul 2021 09:45:26 -0400
Subject: [PATCH 3/3] Update hdf.py

---
 fsspec_reference_maker/hdf.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fsspec_reference_maker/hdf.py b/fsspec_reference_maker/hdf.py
index 9ca2e39a..1636327b 100644
--- a/fsspec_reference_maker/hdf.py
+++ b/fsspec_reference_maker/hdf.py
@@ -155,7 +155,7 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
             return
 
         #
-        # check for unsupported dataset properties
+        # check for unsupported HDF encoding/filters
         #
         if h5obj.scaleoffset:
             raise RuntimeError(
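
Usage note (a sketch, not part of the patches themselves): after PATCH 1/3, MultiZarrToZarr opens every JSON reference set with xr.open_dataset, applies the optional preprocess callable to each dataset, and combines them with xr.concat(dss, **xarray_concat_args). A minimal invocation adapted from the example embedded in combine.py above; the zip URL, anonymous-S3 options and the "time" dimension are illustrative values taken from that example, not requirements:

    from fsspec_reference_maker.combine import MultiZarrToZarr

    def drop_coords(ds):
        # applied to each per-file dataset before xr.concat runs
        return ds.reset_coords(drop=True)

    mzz = MultiZarrToZarr(
        "zip://*.json::out.zip",             # URL matching many single-file JSONs
        remote_protocol="s3",                # where the referenced chunks live
        remote_options={"anon": True},
        preprocess=drop_coords,
        xarray_open_kwargs={"decode_cf": False, "decode_times": False},
        xarray_concat_args={"dim": "time"},  # passed straight to xr.concat
    )
    mzz.translate("output.zarr")

Compared with the old with_mf flag, which switched between xr.open_mfdataset and a hand-rolled concat, the single xr.concat code path leaves one set of combine semantics to document and test.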