diff --git a/fsspec_reference_maker/combine.py b/fsspec_reference_maker/combine.py
index 0a3e9eab..d530f64a 100644
--- a/fsspec_reference_maker/combine.py
+++ b/fsspec_reference_maker/combine.py
@@ -8,25 +8,27 @@
 import numcodecs
 import xarray as xr
 import zarr
 
-logging = logging.getLogger('reference-combine')
+logger = logging.getLogger('reference-combine')
 
 
 class MultiZarrToZarr:
 
     def __init__(self, path, remote_protocol,
-                 remote_options=None, xarray_kwargs=None, storage_options=None,
-                 with_mf=True):
+                 remote_options=None, xarray_open_kwargs=None, xarray_concat_args=None,
+                 preprocess=None, storage_options=None):
         """
         :param path: a URL containing multiple JSONs
-        :param xarray_kwargs:
-        :param storage_options:
+        :param xarray_open_kwargs: kwargs passed to xr.open_dataset
+        :param xarray_concat_args: kwargs passed to xr.concat
+        :param preprocess: optional callable applied to each dataset before concatenation
+        :param storage_options: kwargs for opening the JSONs at ``path``
         """
-        xarray_kwargs = xarray_kwargs or {}
         self.path = path
-        self.with_mf = with_mf
-        self.xr_kwargs = xarray_kwargs
+        self.xr_kwargs = xarray_open_kwargs or {}
+        self.concat_kwargs = xarray_concat_args or {}
         self.storage_options = storage_options or {}
+        self.preprocess = preprocess
         self.remote_protocol = remote_protocol
         self.remote_options = remote_options or {}
@@ -36,6 +36,7 @@ def translate(self, outpath):
         self.output = self._consolidate(out)
         self._write(self.output, outpath)
+        # TODO: return new zarr dataset?
 
     @staticmethod
     def _write(refs, outpath, filetype=None):
@@ -86,7 +87,7 @@ def _write(refs, outpath, filetype=None):
                 compression="ZSTD"
             )
 
-    def _consolidate(self, mapping, inline_threashold=100, template_count=5):
+    def _consolidate(self, mapping, inline_threshold=100, template_count=5):
         counts = Counter(v[0] for v in mapping.values() if isinstance(v, list))
 
         def letter_sets():
@@ -104,7 +105,7 @@ def letter_sets():
 
         out = {}
         for k, v in mapping.items():
-            if isinstance(v, list) and v[2] < inline_threashold:
+            if isinstance(v, list) and v[2] < inline_threshold:
                 v = self.fs.cat_file(v[0], start=v[1], end=v[1] + v[2])
             if isinstance(v, bytes):
                 try:
@@ -158,15 +159,17 @@ def _determine_dims(self):
         self.fs = fss[0].fs
         mappers = [fs.get_mapper("") for fs in fss]
 
-        if self.with_mf is True:
-            ds = xr.open_mfdataset(mappers, engine="zarr", chunks={}, **self.xr_kwargs)
-            ds0 = xr.open_mfdataset(mappers[:1], engine="zarr", chunks={}, **self.xr_kwargs)
-        else:
-            dss = [xr.open_dataset(m, engine="zarr", chunks={}, **self.xr_kwargs) for m in mappers]
-            ds = xr.concat(dss, dim=self.with_mf)
-            ds0 = dss[0]
+        dss = [xr.open_dataset(m, engine="zarr", chunks={}, **self.xr_kwargs)
+               for m in mappers]
+        if self.preprocess:
+            dss = [self.preprocess(d) for d in dss]
+        ds = xr.concat(dss, **self.concat_kwargs)
+        ds0 = dss[0]
         self.extra_dims = set(ds.dims) - set(ds0.dims)
-        self.concat_dims = set(k for k, v in ds.dims.items() if k in ds0.dims and v / ds0.dims[k] == len(mappers))
+        self.concat_dims = set(
+            k for k, v in ds.dims.items()
+            if k in ds0.dims and v / ds0.dims[k] == len(mappers)
+        )
         self.same_dims = set(ds.dims) - self.extra_dims - self.concat_dims
         return ds, ds0, fss
@@ -180,19 +183,29 @@ def drop_coords(ds):
         ds = ds.drop(['reference_time', 'crs'])
         return ds.reset_coords(drop=True)
 
+    xarray_open_kwargs = {
+        "decode_cf": False,
+        "mask_and_scale": False,
+        "decode_times": False,
+        "decode_timedelta": False,
+        "use_cftime": False,
+        "decode_coords": False
+    }
+    concat_kwargs = {
+        "data_vars": "minimal",
+        "coords": "minimal",
+        "compat": "override",
+        "join": "override",
+        "combine_attrs": "override",
+        "dim": "time"
+    }
     mzz = MultiZarrToZarr(
         "zip://*.json::out.zip",
         remote_protocol="s3",
         remote_options={'anon': True},
-        xarray_kwargs={
-            "preprocess": drop_coords,
-            "decode_cf": False,
-            "mask_and_scale": False,
-            "decode_times": False,
-            "decode_timedelta": False,
-            "use_cftime": False,
-            "decode_coords": False
-        },
+        preprocess=drop_coords,
+        xarray_open_kwargs=xarray_open_kwargs,
+        xarray_concat_args=concat_kwargs
     )
     mzz.translate("output.zarr")
diff --git a/fsspec_reference_maker/grib2.py b/fsspec_reference_maker/grib2.py
index a8e73b80..344cccca 100644
--- a/fsspec_reference_maker/grib2.py
+++ b/fsspec_reference_maker/grib2.py
@@ -197,5 +197,6 @@ def example_multi(filter={'typeOfLevel': 'heightAboveGround', 'level': 2}):
     #          'hrrr.t04z.wrfsfcf01.json',
     #          'hrrr.t05z.wrfsfcf01.json',
     #          'hrrr.t06z.wrfsfcf01.json']
-    # mzz = MultiZarrToZarr(files, remote_protocol="s3", remote_options={"anon": True}, with_mf='time')
+    # mzz = MultiZarrToZarr(files, remote_protocol="s3", remote_options={"anon": True},
+    #                       xarray_concat_args={"dim": "time"})
     # mzz.translate("hrrr.total.json")
diff --git a/fsspec_reference_maker/hdf.py b/fsspec_reference_maker/hdf.py
index cf7534e1..aa9a9f68 100644
--- a/fsspec_reference_maker/hdf.py
+++ b/fsspec_reference_maker/hdf.py
@@ -154,9 +154,15 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
                            f'{h5obj.shape} {h5obj.dtype} {h5obj.nbytes} bytes>')
                 return
 
-            if (h5obj.scaleoffset or h5obj.compression in ('szip', 'lzf')):
+            #
+            # check for unsupported HDF encoding/filters
+            #
+            if h5obj.scaleoffset:
                 raise RuntimeError(
-                    f'{h5obj.name} uses unsupported HDF5 filters')
+                    f'{h5obj.name} uses HDF5 scaleoffset filter - not supported by reference-maker')
+            if h5obj.compression in ('szip', 'lzf'):
+                raise RuntimeError(
+                    f'{h5obj.name} uses szip or lzf compression - not supported by reference-maker')
             if h5obj.compression == 'gzip':
                 compression = numcodecs.Zlib(level=h5obj.compression_opts)
             else:
@@ -191,6 +197,7 @@ def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]):
             if cinfo:
                 for k, v in cinfo.items():
                     if h5obj.fletcher32:
+                        logging.info("Discarding fletcher32 checksum")
                         v['size'] -= 4
                     self.store[za._chunk_key(k)] = [self._uri, v['offset'],
                                                     v['size']]
@@ -291,7 +298,7 @@ def example_single():
     )
     fsspec.utils.setup_logging(logger=lggr)
     with fsspec.open(url, **so) as f:
-        h5chunks = SingleHdf5ToZarr(f, url, xarray=True)
+        h5chunks = SingleHdf5ToZarr(f, url)
         return h5chunks.translate()