From 9f5db681fa44cbaa03cf131e6d4b8fa2ac120753 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 19 Aug 2024 09:18:15 -0400 Subject: [PATCH 01/13] Scratch hdf4 --- kerchunk/hdf.py | 258 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 258 insertions(+) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 549923d4..761dd427 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -681,3 +681,261 @@ def _is_netcdf_variable(dataset: h5py.Dataset): def has_visititems_links(): return hasattr(h5py.Group, "visititems_links") + + +decoders = {} + + +def reg(name): + def f(func): + decoders[name] = func + return func + + return f + + +class HDF4ToZarr: + def __init__( + self, + path, + storage_options=None, + inline_threshold=100, + out=None, + remote_protocol=None, + remote_options=None, + ): + self.path = path + self.st = storage_options + self.thresh = inline_threshold + self.out = out or {} + self.remote_protocol = remote_protocol + self.remote_options = remote_options + + def read_int(self, n): + return int.from_bytes(self.f.read(n), "big") + + def read_ddh(self): + return {"ndd": self.read_int(2), "next": self.read_int(4)} + + def read_dd(self): + loc = self.f.tell() + i = int.from_bytes(self.f.read(2), "big") + if i & 0x4000: + extended = True + i = i - 0x4000 + else: + extended = False + tag = tags.get(i, i) + no_data = tag not in {"NULL"} + ref = (tag, int.from_bytes(self.f.read(2), "big")) + info = { + "offset": int.from_bytes(self.f.read(4), "big") * no_data, + "length": int.from_bytes(self.f.read(4), "big") * no_data, + "extended": extended, + "loc": loc, + } + return ref, info + + def decode(self, tag, info): + self.f.seek(info["offset"]) + ident = lambda _, __: info + return decoders.get(tag, ident)(self, info) + + def translate(self): + import zarr + + self.f = fsspec.open(self.path, **(self.st or {})).open() + fs = fsspec.filesystem( + "reference", + fo=self.out, + remote_protocol=self.remote_protocol, + remote_options=self.remote_options, + ) + g = zarr.open_group("reference://", storage_options=dict(fs=fs)) + assert self.f.read(4) == b"\x0e\x03\x13\x01" + self.tags = {} + while True: + ddh = self.read_ddh() + + for _ in range(ddh["ndd"]): + ident, info = self.read_dd() + self.tags[ident] = info + if ddh["next"] == 0: + # "finished" sentry + break + # or continue + self.f.seek(ddh["next"]) + + for (tag, ref), info in self.tags.items(): + if info["offset"]: + self.f.seek(info["offset"]) + if info["extended"]: + self._dec_extended() + else: + info.update(self.decode(tag, info)) + return self.tags + + def _dec_extended(self): + ext_type = spec[self.read_int(2)] + if ext_type == "CHUNKED": + return self._dec_chunked() + elif ext_type == "LINKED": + return self._dec_linked_header() + + def _dec_linked_header(self): + length = self.read_int(4) + blk_len = self.read_int(4) + num_blk = self.read_int(4) + next_ref = self.read_int(2) + out = [] + while next_ref: + next_ref, data = self._dec_linked_block( + self.tags[("LINKED", next_ref)], num_blk + ) + out.extend(out) + return + + def _dec_linked_block(self, block, num_blk): + self.f.seek(block["offset"]) + next_ref = self.read_int(2) + refs = [self.read_int(2) for _ in range(num_blk)] + return next_ref, refs + + def _dec_chunked(self): + tag_head_len = self.read_int(4) + version = self.f.read(1)[0] + flag = self.read_int(4) + elem_tot_len = self.read_int(4) + chunk_size = self.read_int(4) + return _pl(locals()) + + +@reg("VERSION") +def _dec_version(self, info): + return { + "major": self.read_int(4), + "minor": 
self.read_int(4), + "release": self.read_int(4), + "string:": _null_str(self.f.read(info["length"] - 10).decode()), + } + + +@reg("VH") +def _dec_vh(self, info): + interface = self.read_int(2) + nvert = self.read_int(4) + ivsize = self.read_int(2) + nfields = self.read_int(2) + types = [self.read_int(2) for _ in range(nfields)] + isize = [self.read_int(2) for _ in range(nfields)] + offset = [self.read_int(2) for _ in range(nfields)] + order = [self.read_int(2) for _ in range(nfields)] + names = [self.f.read(self.read_int(2)).decode() for _ in range(nfields)] + namelen = self.read_int(2) + name = self.f.read(namelen).decode() + classlen = self.read_int(2) + cls = self.f.read(classlen).decode() + ref = (self.read_int(2), self.read_int(2)) + return _pl(locals()) + + +def _null_str(s): + return s.split("\00", 1)[0] + + +def _pl(l): + return {k: v for k, v in l.items() if k not in {"info", "f", "self"}} + + +# hdf/src/htags.h +tags = { + 1: "NULL", + 20: "LINKED", + 30: "VERSION", + 40: "COMPRESSED", + 50: "VLINKED", + 51: "VLINKED_DATA", + 60: "CHUNKED", + 61: "CHUNK", + 100: "FID", + 101: "FD", + 102: "TID", + 103: "TD", + 104: "DIL", + 105: "DIA", + 106: "NT", + 107: "MT", + 108: "FREE", + 200: "ID8", + 201: "IP8", + 202: "RI8", + 203: "CI8", + 204: "II8", + 300: "ID", + 301: "LUT", + 302: "RI", + 303: "CI", + 304: "NRI", + 306: "RIG", + 307: "LD", + 308: "MD", + 309: "MA", + 310: "CCN", + 311: "CFM", + 312: "AR", + 400: "DRAW", + 401: "RUN", + 500: "XYP", + 501: "MTO", + 602: "T14", + 603: "T105", + 700: "SDG", + 701: "SDD", + 702: "SD", + 703: "SDS", + 704: "SDL", + 705: "SDU", + 706: "SDF", + 707: "SDM", + 708: "SDC", + 709: "SDT", + 710: "SDLNK", + 720: "NDG", + 731: "CAL", + 732: "FV", + 799: "BREQ", + 781: "SDRAG", + 780: "EREQ", + 1965: "VG", + 1962: "VH", + 1963: "VS", + 11: "RLE", + 12: "IMCOMP", + 13: "JPEG", + 14: "GREYJPEG", + 15: "JPEG5", + 16: "GREYJPEG5", +} +spec = { + 1: "LINKED", + 2: "EXT", + 3: "COMP", + 4: "VLINKED", + 5: "CHUNKED", + 6: "BUFFERED", + 7: "COMPRAS", +} + +# hdf4/hdf/src/hntdefs.h +dtypes = { + 5: "f4", + 6: "f8", + 20: "i1", # = CHAR, 3? + 21: "u1", # = UCHAR, 4? 
+ 22: "i2", + 23: "u2", + 24: "i4", + 25: "u4", + 26: "i8", + 27: "u8", +} From 3240a8045df4abf88e1f953f16dd94dbc9516fe2 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 19 Aug 2024 11:50:39 -0400 Subject: [PATCH 02/13] fix up linked --- kerchunk/hdf.py | 61 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 46 insertions(+), 15 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 761dd427..4c90ce1b 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -766,15 +766,20 @@ def translate(self): # or continue self.f.seek(ddh["next"]) - for (tag, ref), info in self.tags.items(): - if info["offset"]: - self.f.seek(info["offset"]) - if info["extended"]: - self._dec_extended() - else: - info.update(self.decode(tag, info)) + for tag, ref in self.tags: + self._dec(tag, ref) return self.tags + def _dec(self, tag, ref): + info = self.tags[(tag, ref)] + if not set(info) - {"length", "offset", "extended", "loc"}: + self.f.seek(info["offset"]) + if info["extended"]: + info["data"] = self._dec_extended() + else: + info.update(self.decode(tag, info)) + return info + def _dec_extended(self): ext_type = spec[self.read_int(2)] if ext_type == "CHUNKED": @@ -783,30 +788,55 @@ def _dec_extended(self): return self._dec_linked_header() def _dec_linked_header(self): + # get the bytes of a linked set - these will always be inlined length = self.read_int(4) blk_len = self.read_int(4) num_blk = self.read_int(4) next_ref = self.read_int(2) out = [] while next_ref: - next_ref, data = self._dec_linked_block( - self.tags[("LINKED", next_ref)], num_blk - ) - out.extend(out) - return - - def _dec_linked_block(self, block, num_blk): + next_ref, data = self._dec_linked_block(self.tags[("LINKED", next_ref)]) + out.extend([d for d in data if d]) + bits = [] + for ref in out: + info = self.tags[("LINKED", ref)] + self.f.seek(info["offset"]) + bits.append(self.f.read(info["length"])) + return b"".join(bits) + + def _dec_linked_block(self, block): self.f.seek(block["offset"]) next_ref = self.read_int(2) - refs = [self.read_int(2) for _ in range(num_blk)] + refs = [self.read_int(2) for _ in range((block["length"] // 2) - 1)] return next_ref, refs def _dec_chunked(self): + # we want to turn the chunks table into references tag_head_len = self.read_int(4) version = self.f.read(1)[0] flag = self.read_int(4) elem_tot_len = self.read_int(4) chunk_size = self.read_int(4) + nt_size = self.read_int(4) + chk_tbl_tag = tags[self.read_int(2)] # should be VH + chk_tbl_ref = self.read_int(2) + sp_tag = tags[self.read_int(2)] + sp_ref = self.read_int(2) + ndims = self.read_int(4) + dims = [ + { + "flag": self.read_int(4), + "dim_length": self.read_int(4), + "chunk_length": self.read_int(4), + } + for _ in range(ndims) + ] + fill_value = self.f.read( + self.read_int(4) + ) # to be interpreted as a number later + header = self._dec(chk_tbl_tag, chk_tbl_ref) + data = self._dec("VS", chk_tbl_ref)["data"] # corresponding table + # header gives the field pattern for the rows of data, one per chunk return _pl(locals()) @@ -822,6 +852,7 @@ def _dec_version(self, info): @reg("VH") def _dec_vh(self, info): + # virtual group ("table") header interface = self.read_int(2) nvert = self.read_int(4) ivsize = self.read_int(2) From d761239b4184150a2ad636c29ae36d8f3182dfd4 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 19 Aug 2024 14:19:10 -0400 Subject: [PATCH 03/13] comp --- kerchunk/hdf.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git 
a/kerchunk/hdf.py b/kerchunk/hdf.py index 4c90ce1b..b3939d52 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -6,6 +6,8 @@ import fsspec.core from fsspec.implementations.reference import LazyReferenceMapper import numpy as np +from numba.core.pythonapi import reflect + import zarr import numcodecs @@ -786,6 +788,8 @@ def _dec_extended(self): return self._dec_chunked() elif ext_type == "LINKED": return self._dec_linked_header() + elif ext_type == "COMP": + return self._dec_comp() def _dec_linked_header(self): # get the bytes of a linked set - these will always be inlined @@ -837,7 +841,41 @@ def _dec_chunked(self): header = self._dec(chk_tbl_tag, chk_tbl_ref) data = self._dec("VS", chk_tbl_ref)["data"] # corresponding table # header gives the field pattern for the rows of data, one per chunk - return _pl(locals()) + dt = [(f"ind{i}", ">u4") for i in range(len(dims))] + [ + ("tag", ">u2"), + ("ref", ">u2"), + ] + rows = np.frombuffer(data, dtype=dt, count=header["nvert"]) + refs = [] + for *ind, tag, ref in rows: + # maybe ind needs reversing since everything is FORTRAN + chunk_tag = self.tags[(tags[tag], ref)] + if chunk_tag["extended"]: + self.f.seek(chunk_tag["offset"]) + # these are always COMP? + ctype, offset, length = self._dec_extended() + refs.append([".".join(str(_) for _ in ind), offset, length, ctype]) + else: + refs.append( + [ + ".".join(str(_) for _ in ind), + chunk_tag["offset"], + chunk_tag["length"], + ] + ) + # ref["tag"] should always be 61 -> CHUNK + return refs + + def _dec_comp(self): + version = self.read_int(2) # always 0 + len_uncomp = self.read_int(4) + data_ref = self.read_int(2) + model = self.read_int(2) # always 0 + ctype = comp[self.read_int(2)] + tag = self.tags[("COMPRESSED", data_ref)] + offset = tag["offset"] + length = tag["length"] + return ctype, offset, length @reg("VERSION") @@ -970,3 +1008,14 @@ def _pl(l): 26: "i8", 27: "u8", } + +# hdf4/hdf/src/hcomp.h +comp = { + 0: "NONE", + 1: "RLE", + 2: "NBIT", + 3: "SKPHUFF", + 4: "DEFLATE", + 5: "SZIP", + 7: "JPEG", +} From 10f086afd3ba01f9b7f31d6b6c101373f4eedb0c Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 19 Aug 2024 21:07:04 -0400 Subject: [PATCH 04/13] a couple of more tags --- kerchunk/hdf.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index b3939d52..516e21ea 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -846,6 +846,7 @@ def _dec_chunked(self): ("ref", ">u2"), ] rows = np.frombuffer(data, dtype=dt, count=header["nvert"]) + # rows["tag"] should always be 61 -> CHUNK refs = [] for *ind, tag, ref in rows: # maybe ind needs reversing since everything is FORTRAN @@ -863,7 +864,6 @@ def _dec_chunked(self): chunk_tag["length"], ] ) - # ref["tag"] should always be 61 -> CHUNK return refs def _dec_comp(self): @@ -878,6 +878,21 @@ def _dec_comp(self): return ctype, offset, length +@reg("NDG") +def _dec_ndg(self, info): + # links together these things as a Data Group + return [(tags[self.read_int(2)], self.read_int(2)) for _ in range(0, info["length"], 4)] + + +@reg("SDD") +def _dec_sdd(self, info): + rank = self.read_int(2) + dims = [self.read_int(4) for _ in range(rank)] + data_tag = (tags[self.read_int(2)], self.read_int(2)) + scale_tags = [(tags[self.read_int(2)], self.read_int(2)) for _ in range(rank)] + return _pl(locals()) + + @reg("VERSION") def _dec_version(self, info): return { @@ -970,6 +985,9 @@ def _pl(l): 709: "SDT", 710: "SDLNK", 720: "NDG", + 721: "RESERVED", +# "Objects of tag 721 are never 
actually written to the file. The tag is +# needed to make things easier mixing DFSD and SD style objects in the same file" 731: "CAL", 732: "FV", 799: "BREQ", @@ -1015,7 +1033,7 @@ def _pl(l): 1: "RLE", 2: "NBIT", 3: "SKPHUFF", - 4: "DEFLATE", + 4: "DEFLATE", # called deflate, but code says "gzip" and doc says "GNU zip" 5: "SZIP", 7: "JPEG", } From 5b483b1b385e9e1bbc2f8715fe8e4d7e03ac9abb Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 20 Aug 2024 09:23:51 -0400 Subject: [PATCH 05/13] Update kerchunk/hdf.py --- kerchunk/hdf.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 516e21ea..1d2e85f6 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -6,8 +6,6 @@ import fsspec.core from fsspec.implementations.reference import LazyReferenceMapper import numpy as np -from numba.core.pythonapi import reflect - import zarr import numcodecs From 25463f0bb7a729a67eea3310778732b1f67a2bad Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 20 Aug 2024 13:52:22 -0400 Subject: [PATCH 06/13] global attributes --- kerchunk/hdf.py | 51 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 516e21ea..1f3a1e5d 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -754,7 +754,11 @@ def translate(self): remote_options=self.remote_options, ) g = zarr.open_group("reference://", storage_options=dict(fs=fs)) + + # magic header assert self.f.read(4) == b"\x0e\x03\x13\x01" + + # all the data descriptors in a linked list self.tags = {} while True: ddh = self.read_ddh() @@ -768,8 +772,32 @@ def translate(self): # or continue self.f.seek(ddh["next"]) + # basic decode for tag, ref in self.tags: self._dec(tag, ref) + + # attributes + for (tag, ref), info in self.tags.items(): + if tag == "VH" and info["names"][0].upper() == "VALUES": + dtype = dtypes[info["types"][0]] + inf2 = self.tags[("VS", ref)] + self.f.seek(inf2["offset"]) + data = self.f.read(inf2["length"]) + if info["name"] == "CoreMetadata.0": + obj = None + for line in data.decode().split("\n"): + if "OBJECT" in line: + obj = line.split()[-1] + if "VALUE" in line: + g.attrs[obj] = line.split()[-1].lstrip('"').rstrip('"') + elif dtype == "str": + g.attrs[info["name"]] = ( + data.decode().lstrip('"').rstrip('"') + ) # decode() ? + else: + g.attrs[info["name"]] = np.frombuffer(data, dtype)[0] + g.attrs.pop("ArchiveMetadata.0") + print(dict(g.attrs)) return self.tags def _dec(self, tag, ref): @@ -881,7 +909,9 @@ def _dec_comp(self): @reg("NDG") def _dec_ndg(self, info): # links together these things as a Data Group - return [(tags[self.read_int(2)], self.read_int(2)) for _ in range(0, info["length"], 4)] + return [ + (tags[self.read_int(2)], self.read_int(2)) for _ in range(0, info["length"], 4) + ] @reg("SDD") @@ -923,6 +953,16 @@ def _dec_vh(self, info): return _pl(locals()) +@reg("VG") +def _dec_vg(self, info): + nelt = self.read_int(2) + tags = [self.read_int(2) for _ in range(nelt)] + refs = [self.read_int(2) for _ in range(nelt)] + name = self.f.read(self.read_int(2)).decode() + cls = self.f.read(self.read_int(2)).decode() + return _pl(locals()) + + def _null_str(s): return s.split("\00", 1)[0] @@ -986,8 +1026,8 @@ def _pl(l): 710: "SDLNK", 720: "NDG", 721: "RESERVED", -# "Objects of tag 721 are never actually written to the file. The tag is -# needed to make things easier mixing DFSD and SD style objects in the same file" + # "Objects of tag 721 are never actually written to the file. 
The tag is + # needed to make things easier mixing DFSD and SD style objects in the same file" 731: "CAL", 732: "FV", 799: "BREQ", @@ -1017,8 +1057,9 @@ def _pl(l): dtypes = { 5: "f4", 6: "f8", - 20: "i1", # = CHAR, 3? - 21: "u1", # = UCHAR, 4? + 20: "i1", + 21: "u1", + 4: "str", # special case, size given in header 22: "i2", 23: "u2", 24: "i4", From df61060869e367da9674d33962631d81ead76865 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Wed, 21 Aug 2024 20:42:20 -0400 Subject: [PATCH 07/13] Make group nesting --- kerchunk/hdf.py | 158 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 118 insertions(+), 40 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index b298c66a..fccbe69d 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -774,29 +774,76 @@ def translate(self): for tag, ref in self.tags: self._dec(tag, ref) - # attributes + # global attributes + attrs = {} for (tag, ref), info in self.tags.items(): if tag == "VH" and info["names"][0].upper() == "VALUES": dtype = dtypes[info["types"][0]] inf2 = self.tags[("VS", ref)] self.f.seek(inf2["offset"]) data = self.f.read(inf2["length"]) - if info["name"] == "CoreMetadata.0": + # NASA conventions + if info["name"].startswith(("CoreMetadata.", "ArchiveMetadata.")): obj = None for line in data.decode().split("\n"): if "OBJECT" in line: obj = line.split()[-1] if "VALUE" in line: - g.attrs[obj] = line.split()[-1].lstrip('"').rstrip('"') - elif dtype == "str": - g.attrs[info["name"]] = ( - data.decode().lstrip('"').rstrip('"') - ) # decode() ? - else: - g.attrs[info["name"]] = np.frombuffer(data, dtype)[0] - g.attrs.pop("ArchiveMetadata.0") - print(dict(g.attrs)) - return self.tags + attrs[obj] = line.split()[-1].lstrip('"').rstrip('"') + g.attrs.update(attrs) + + # there should be only one root, and it's probably the last VG + # so maybe this loop isn't needed + roots = set() + children = set() + child = {} + for (tag, ref), info in self.tags.items(): + if tag == "VG": + here = child.setdefault((tag, ref), set()) + for t, r in zip(info["tag"], info["refs"]): + if t == "VG": + children.add((t, r)) + roots.discard((t, r)) + here.add((t, r)) + if tag not in children: + roots.add((tag, ref)) + for t, r in roots: + self.tags[(t, r)] = self._descend_vg(t, r) + return self.tags, roots + + def _descend_vg(self, tag, ref): + info = self.tags[(tag, ref)] + out = {} + for t, r in zip(info["tag"], info["refs"]): + inf2 = self.tags[(t, r)] + if t == "VG": + tmp = self._descend_vg(t, r) + if list(tmp)[0] == inf2["name"]: + tmp = tmp[inf2["name"]] + out[inf2["name"]] = tmp + elif t == "VH": + if len(inf2["names"]) == 1 and inf2["names"][0].lower() == "values": + dtype = dtypes[inf2["types"][0]] + inf2 = self.tags[("VS", r)] + self.f.seek(inf2["offset"]) + data = self.f.read(inf2["length"]) + if dtype == "str": + out[info["name"]] = ( + data.decode().lstrip('"').rstrip('"') + ) # decode() ? 
+ else: + out[info["name"]] = np.frombuffer(data, dtype)[0] + elif t == "NT": + out["dtype"] = inf2["typ"] + elif t == "SD": + out["refs"] = inf2["data"][:-1] + out["chunks"] = [_["chunk_length"] for _ in inf2["data"][-1]] + elif t == "SDD": + out["dims"] = inf2["dims"] + else: + # NDGs contain same info as NT, SD and SDD + pass + return out def _dec(self, tag, ref): info = self.tags[(tag, ref)] @@ -842,18 +889,20 @@ def _dec_linked_block(self, block): def _dec_chunked(self): # we want to turn the chunks table into references - tag_head_len = self.read_int(4) - version = self.f.read(1)[0] - flag = self.read_int(4) - elem_tot_len = self.read_int(4) - chunk_size = self.read_int(4) - nt_size = self.read_int(4) + # tag_head_len = self.read_int(4) + # version = self.f.read(1)[0] + # flag = self.read_int(4) + # elem_tot_len = self.read_int(4) + # chunk_size = self.read_int(4) + # nt_size = self.read_int(4) + self.f.seek(21, 1) chk_tbl_tag = tags[self.read_int(2)] # should be VH chk_tbl_ref = self.read_int(2) sp_tag = tags[self.read_int(2)] sp_ref = self.read_int(2) ndims = self.read_int(4) - dims = [ + + dims = [ # we don't use these, could { "flag": self.read_int(4), "dim_length": self.read_int(4), @@ -863,11 +912,15 @@ def _dec_chunked(self): ] fill_value = self.f.read( self.read_int(4) - ) # to be interpreted as a number later + ) # to be interpreted as a number later; but chunk table probs has no fill + # self.f.seek(12*ndims + 4, 1) + header = self._dec(chk_tbl_tag, chk_tbl_ref) data = self._dec("VS", chk_tbl_ref)["data"] # corresponding table + # header gives the field pattern for the rows of data, one per chunk - dt = [(f"ind{i}", ">u4") for i in range(len(dims))] + [ + # maybe faster to use struct and iter than numpy, since we iterate anyway + dt = [(f"ind{i}", ">u4") for i in range(ndims)] + [ ("tag", ">u2"), ("ref", ">u2"), ] @@ -876,7 +929,7 @@ def _dec_chunked(self): refs = [] for *ind, tag, ref in rows: # maybe ind needs reversing since everything is FORTRAN - chunk_tag = self.tags[(tags[tag], ref)] + chunk_tag = self.tags[("CHUNK", ref)] if chunk_tag["extended"]: self.f.seek(chunk_tag["offset"]) # these are always COMP? 
@@ -890,26 +943,30 @@ def _dec_chunked(self): chunk_tag["length"], ] ) + refs.append(dims) return refs def _dec_comp(self): - version = self.read_int(2) # always 0 - len_uncomp = self.read_int(4) + # version = self.read_int(2) # always 0 + # len_uncomp = self.read_int(4) + self.f.seek(6, 1) + data_ref = self.read_int(2) - model = self.read_int(2) # always 0 - ctype = comp[self.read_int(2)] + # model = self.read_int(2) # always 0 + ctype = "DEFLATE" # comp[self.read_int(2)] tag = self.tags[("COMPRESSED", data_ref)] - offset = tag["offset"] - length = tag["length"] - return ctype, offset, length + return ctype, tag["offset"], tag["length"] @reg("NDG") def _dec_ndg(self, info): # links together these things as a Data Group - return [ - (tags[self.read_int(2)], self.read_int(2)) for _ in range(0, info["length"], 4) - ] + return { + "tags": [ + (tags[self.read_int(2)], self.read_int(2)) + for _ in range(0, info["length"], 4) + ] + } @reg("SDD") @@ -940,7 +997,7 @@ def _dec_vh(self, info): nfields = self.read_int(2) types = [self.read_int(2) for _ in range(nfields)] isize = [self.read_int(2) for _ in range(nfields)] - offset = [self.read_int(2) for _ in range(nfields)] + offsets = [self.read_int(2) for _ in range(nfields)] order = [self.read_int(2) for _ in range(nfields)] names = [self.f.read(self.read_int(2)).decode() for _ in range(nfields)] namelen = self.read_int(2) @@ -954,13 +1011,23 @@ def _dec_vh(self, info): @reg("VG") def _dec_vg(self, info): nelt = self.read_int(2) - tags = [self.read_int(2) for _ in range(nelt)] + tag = [tags[self.read_int(2)] for _ in range(nelt)] refs = [self.read_int(2) for _ in range(nelt)] name = self.f.read(self.read_int(2)).decode() cls = self.f.read(self.read_int(2)).decode() return _pl(locals()) +import zipfile + + +@reg("NT") +def _dec_nt(self, info): + version, typ, width, cls = list(self.f.read(4)) + typ = dtypes[typ] + return _pl(locals()) + + def _null_str(s): return s.split("\00", 1)[0] @@ -1058,12 +1125,12 @@ def _pl(l): 20: "i1", 21: "u1", 4: "str", # special case, size given in header - 22: "i2", - 23: "u2", - 24: "i4", - 25: "u4", - 26: "i8", - 27: "u8", + 22: ">i2", + 23: ">u2", + 24: ">i4", + 25: ">u4", + 26: ">i8", + 27: ">u8", } # hdf4/hdf/src/hcomp.h @@ -1076,3 +1143,14 @@ def _pl(l): 5: "SZIP", 7: "JPEG", } + + +class FLATECodec: # (numcodecs.abc.Codec) + def __init__(self): + ... + + def decode(self, data): + import zlib + + obj = zlib.decompressobj(-15) + return obj.decompress(data) From 93093e9c386d6509c76bbb9a9a82b72b0e336388 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 27 Aug 2024 14:22:49 -0400 Subject: [PATCH 08/13] mostly working --- kerchunk/codecs.py | 17 ++++++++++ kerchunk/hdf.py | 77 +++++++++++++++++++++++++++++----------------- 2 files changed, 65 insertions(+), 29 deletions(-) diff --git a/kerchunk/codecs.py b/kerchunk/codecs.py index 3d206e71..10b71888 100644 --- a/kerchunk/codecs.py +++ b/kerchunk/codecs.py @@ -5,6 +5,7 @@ from numcodecs.abc import Codec import numpy as np import threading +import zlib class FillStringsCodec(Codec): @@ -238,3 +239,19 @@ def decode(self, buf, out=None): def encode(self, buf): raise NotImplementedError + + +class ZlibCodec(numcodecs.abc.Codec): + codec_id = "zlib" + + def __init__(self): + ... 
+ + def decode(self, data, out=None): + if out: + out[:] = zlib.decompress(data) + return out + return zlib.decompress(data) + + def encode(self, buf): + return zlib.compress(buf) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index fccbe69d..2ac3d7de 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -743,15 +743,9 @@ def decode(self, tag, info): def translate(self): import zarr + from kerchunk.codecs import ZlibCodec self.f = fsspec.open(self.path, **(self.st or {})).open() - fs = fsspec.filesystem( - "reference", - fo=self.out, - remote_protocol=self.remote_protocol, - remote_options=self.remote_options, - ) - g = zarr.open_group("reference://", storage_options=dict(fs=fs)) # magic header assert self.f.read(4) == b"\x0e\x03\x13\x01" @@ -790,7 +784,6 @@ def translate(self): obj = line.split()[-1] if "VALUE" in line: attrs[obj] = line.split()[-1].lstrip('"').rstrip('"') - g.attrs.update(attrs) # there should be only one root, and it's probably the last VG # so maybe this loop isn't needed @@ -807,9 +800,46 @@ def translate(self): here.add((t, r)) if tag not in children: roots.add((tag, ref)) - for t, r in roots: - self.tags[(t, r)] = self._descend_vg(t, r) - return self.tags, roots + + # hierarchical output + output = self._descend_vg(*list(roots)[0]) + fs = fsspec.filesystem( + "reference", + fo=self.out, + remote_protocol=self.remote_protocol, + remote_options=self.remote_options, + ) + g = zarr.open_group("reference://", storage_options=dict(fs=fs)) + for k, v in output.items(): + if isinstance(v, dict): + compression = ZlibCodec() if "refs" in v else None + arr = g.create_dataset( + name=k, + shape=v["dims"], + dtype=v["dtype"], + chunks=v.get("chunks", v["dims"]), + compressor=compression, + overwrite=True, + ) + arr.attrs.update( + dict( + _ARRAY_DIMENSIONS=[f"{k}_x", f"{k}_y"][: len(v["dims"])] + if "refs" in v + else ["0"], + **{ + i: j + for i, j in v.items() + if i not in {"chunk", "dims", "dtype", "refs"} + }, + ) + ) + for r in v.get("refs", []): + self.out[f"{k}/{r[0]}"] = [self.path, r[1], r[2]] + else: + attrs[k] = v + g.attrs.update(attrs) + + return fs.references def _descend_vg(self, tag, ref): info = self.tags[(tag, ref)] @@ -824,15 +854,14 @@ def _descend_vg(self, tag, ref): elif t == "VH": if len(inf2["names"]) == 1 and inf2["names"][0].lower() == "values": dtype = dtypes[inf2["types"][0]] + name = inf2["name"] inf2 = self.tags[("VS", r)] self.f.seek(inf2["offset"]) data = self.f.read(inf2["length"]) if dtype == "str": - out[info["name"]] = ( - data.decode().lstrip('"').rstrip('"') - ) # decode() ? + out[name] = data.decode().lstrip('"').rstrip('"') # decode() ? 
else: - out[info["name"]] = np.frombuffer(data, dtype)[0] + out[name] = np.frombuffer(data, dtype)[0] elif t == "NT": out["dtype"] = inf2["typ"] elif t == "SD": @@ -902,7 +931,7 @@ def _dec_chunked(self): sp_ref = self.read_int(2) ndims = self.read_int(4) - dims = [ # we don't use these, could + dims = [ # we don't use these, could skip { "flag": self.read_int(4), "dim_length": self.read_int(4), @@ -913,7 +942,7 @@ def _dec_chunked(self): fill_value = self.f.read( self.read_int(4) ) # to be interpreted as a number later; but chunk table probs has no fill - # self.f.seek(12*ndims + 4, 1) + # self.f.seek(12*ndims + 4, 1) # if skipping header = self._dec(chk_tbl_tag, chk_tbl_ref) data = self._dec("VS", chk_tbl_ref)["data"] # corresponding table @@ -1139,18 +1168,8 @@ def _pl(l): 1: "RLE", 2: "NBIT", 3: "SKPHUFF", - 4: "DEFLATE", # called deflate, but code says "gzip" and doc says "GNU zip" + 4: "DEFLATE", # called deflate, but code says "gzip" and doc says "GNU zip"; actually zlib? + # see codecs.ZlibCodec 5: "SZIP", 7: "JPEG", } - - -class FLATECodec: # (numcodecs.abc.Codec) - def __init__(self): - ... - - def decode(self, data): - import zlib - - obj = zlib.decompressobj(-15) - return obj.decompress(data) From f179b73e12489d75034df64dcb6a8b4a2a1956ff Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 27 Aug 2024 14:33:58 -0400 Subject: [PATCH 09/13] fix refs --- kerchunk/hdf.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 2ac3d7de..4ddc951f 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -810,6 +810,7 @@ def translate(self): remote_options=self.remote_options, ) g = zarr.open_group("reference://", storage_options=dict(fs=fs)) + refs = {} for k, v in output.items(): if isinstance(v, dict): compression = ZlibCodec() if "refs" in v else None @@ -834,9 +835,10 @@ def translate(self): ) ) for r in v.get("refs", []): - self.out[f"{k}/{r[0]}"] = [self.path, r[1], r[2]] + refs[f"{k}/{r[0]}"] = [self.path, r[1], r[2]] else: attrs[k] = v + fs.references.update(refs) g.attrs.update(attrs) return fs.references From f294ddc1527b22e448182ba2ca0f92a749a21212 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Fri, 30 Aug 2024 11:06:52 -0400 Subject: [PATCH 10/13] import --- kerchunk/hdf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 4ddc951f..97aae62f 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -4,6 +4,7 @@ from typing import Union, BinaryIO import fsspec.core +import ujson from fsspec.implementations.reference import LazyReferenceMapper import numpy as np import zarr From 90a353bdc035061bbff1071131364562df31ed5c Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 3 Sep 2024 10:25:12 -0400 Subject: [PATCH 11/13] Move to separate module --- kerchunk/hdf.py | 495 --------------------------------------------- kerchunk/hdf4.py | 508 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 508 insertions(+), 495 deletions(-) create mode 100644 kerchunk/hdf4.py diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 97aae62f..549923d4 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -4,7 +4,6 @@ from typing import Union, BinaryIO import fsspec.core -import ujson from fsspec.implementations.reference import LazyReferenceMapper import numpy as np import zarr @@ -682,497 +681,3 @@ def _is_netcdf_variable(dataset: h5py.Dataset): def has_visititems_links(): return hasattr(h5py.Group, "visititems_links") - - -decoders = {} - - -def reg(name): - def f(func): - decoders[name] = 
func - return func - - return f - - -class HDF4ToZarr: - def __init__( - self, - path, - storage_options=None, - inline_threshold=100, - out=None, - remote_protocol=None, - remote_options=None, - ): - self.path = path - self.st = storage_options - self.thresh = inline_threshold - self.out = out or {} - self.remote_protocol = remote_protocol - self.remote_options = remote_options - - def read_int(self, n): - return int.from_bytes(self.f.read(n), "big") - - def read_ddh(self): - return {"ndd": self.read_int(2), "next": self.read_int(4)} - - def read_dd(self): - loc = self.f.tell() - i = int.from_bytes(self.f.read(2), "big") - if i & 0x4000: - extended = True - i = i - 0x4000 - else: - extended = False - tag = tags.get(i, i) - no_data = tag not in {"NULL"} - ref = (tag, int.from_bytes(self.f.read(2), "big")) - info = { - "offset": int.from_bytes(self.f.read(4), "big") * no_data, - "length": int.from_bytes(self.f.read(4), "big") * no_data, - "extended": extended, - "loc": loc, - } - return ref, info - - def decode(self, tag, info): - self.f.seek(info["offset"]) - ident = lambda _, __: info - return decoders.get(tag, ident)(self, info) - - def translate(self): - import zarr - from kerchunk.codecs import ZlibCodec - - self.f = fsspec.open(self.path, **(self.st or {})).open() - - # magic header - assert self.f.read(4) == b"\x0e\x03\x13\x01" - - # all the data descriptors in a linked list - self.tags = {} - while True: - ddh = self.read_ddh() - - for _ in range(ddh["ndd"]): - ident, info = self.read_dd() - self.tags[ident] = info - if ddh["next"] == 0: - # "finished" sentry - break - # or continue - self.f.seek(ddh["next"]) - - # basic decode - for tag, ref in self.tags: - self._dec(tag, ref) - - # global attributes - attrs = {} - for (tag, ref), info in self.tags.items(): - if tag == "VH" and info["names"][0].upper() == "VALUES": - dtype = dtypes[info["types"][0]] - inf2 = self.tags[("VS", ref)] - self.f.seek(inf2["offset"]) - data = self.f.read(inf2["length"]) - # NASA conventions - if info["name"].startswith(("CoreMetadata.", "ArchiveMetadata.")): - obj = None - for line in data.decode().split("\n"): - if "OBJECT" in line: - obj = line.split()[-1] - if "VALUE" in line: - attrs[obj] = line.split()[-1].lstrip('"').rstrip('"') - - # there should be only one root, and it's probably the last VG - # so maybe this loop isn't needed - roots = set() - children = set() - child = {} - for (tag, ref), info in self.tags.items(): - if tag == "VG": - here = child.setdefault((tag, ref), set()) - for t, r in zip(info["tag"], info["refs"]): - if t == "VG": - children.add((t, r)) - roots.discard((t, r)) - here.add((t, r)) - if tag not in children: - roots.add((tag, ref)) - - # hierarchical output - output = self._descend_vg(*list(roots)[0]) - fs = fsspec.filesystem( - "reference", - fo=self.out, - remote_protocol=self.remote_protocol, - remote_options=self.remote_options, - ) - g = zarr.open_group("reference://", storage_options=dict(fs=fs)) - refs = {} - for k, v in output.items(): - if isinstance(v, dict): - compression = ZlibCodec() if "refs" in v else None - arr = g.create_dataset( - name=k, - shape=v["dims"], - dtype=v["dtype"], - chunks=v.get("chunks", v["dims"]), - compressor=compression, - overwrite=True, - ) - arr.attrs.update( - dict( - _ARRAY_DIMENSIONS=[f"{k}_x", f"{k}_y"][: len(v["dims"])] - if "refs" in v - else ["0"], - **{ - i: j - for i, j in v.items() - if i not in {"chunk", "dims", "dtype", "refs"} - }, - ) - ) - for r in v.get("refs", []): - refs[f"{k}/{r[0]}"] = [self.path, r[1], r[2]] - 
else: - attrs[k] = v - fs.references.update(refs) - g.attrs.update(attrs) - - return fs.references - - def _descend_vg(self, tag, ref): - info = self.tags[(tag, ref)] - out = {} - for t, r in zip(info["tag"], info["refs"]): - inf2 = self.tags[(t, r)] - if t == "VG": - tmp = self._descend_vg(t, r) - if list(tmp)[0] == inf2["name"]: - tmp = tmp[inf2["name"]] - out[inf2["name"]] = tmp - elif t == "VH": - if len(inf2["names"]) == 1 and inf2["names"][0].lower() == "values": - dtype = dtypes[inf2["types"][0]] - name = inf2["name"] - inf2 = self.tags[("VS", r)] - self.f.seek(inf2["offset"]) - data = self.f.read(inf2["length"]) - if dtype == "str": - out[name] = data.decode().lstrip('"').rstrip('"') # decode() ? - else: - out[name] = np.frombuffer(data, dtype)[0] - elif t == "NT": - out["dtype"] = inf2["typ"] - elif t == "SD": - out["refs"] = inf2["data"][:-1] - out["chunks"] = [_["chunk_length"] for _ in inf2["data"][-1]] - elif t == "SDD": - out["dims"] = inf2["dims"] - else: - # NDGs contain same info as NT, SD and SDD - pass - return out - - def _dec(self, tag, ref): - info = self.tags[(tag, ref)] - if not set(info) - {"length", "offset", "extended", "loc"}: - self.f.seek(info["offset"]) - if info["extended"]: - info["data"] = self._dec_extended() - else: - info.update(self.decode(tag, info)) - return info - - def _dec_extended(self): - ext_type = spec[self.read_int(2)] - if ext_type == "CHUNKED": - return self._dec_chunked() - elif ext_type == "LINKED": - return self._dec_linked_header() - elif ext_type == "COMP": - return self._dec_comp() - - def _dec_linked_header(self): - # get the bytes of a linked set - these will always be inlined - length = self.read_int(4) - blk_len = self.read_int(4) - num_blk = self.read_int(4) - next_ref = self.read_int(2) - out = [] - while next_ref: - next_ref, data = self._dec_linked_block(self.tags[("LINKED", next_ref)]) - out.extend([d for d in data if d]) - bits = [] - for ref in out: - info = self.tags[("LINKED", ref)] - self.f.seek(info["offset"]) - bits.append(self.f.read(info["length"])) - return b"".join(bits) - - def _dec_linked_block(self, block): - self.f.seek(block["offset"]) - next_ref = self.read_int(2) - refs = [self.read_int(2) for _ in range((block["length"] // 2) - 1)] - return next_ref, refs - - def _dec_chunked(self): - # we want to turn the chunks table into references - # tag_head_len = self.read_int(4) - # version = self.f.read(1)[0] - # flag = self.read_int(4) - # elem_tot_len = self.read_int(4) - # chunk_size = self.read_int(4) - # nt_size = self.read_int(4) - self.f.seek(21, 1) - chk_tbl_tag = tags[self.read_int(2)] # should be VH - chk_tbl_ref = self.read_int(2) - sp_tag = tags[self.read_int(2)] - sp_ref = self.read_int(2) - ndims = self.read_int(4) - - dims = [ # we don't use these, could skip - { - "flag": self.read_int(4), - "dim_length": self.read_int(4), - "chunk_length": self.read_int(4), - } - for _ in range(ndims) - ] - fill_value = self.f.read( - self.read_int(4) - ) # to be interpreted as a number later; but chunk table probs has no fill - # self.f.seek(12*ndims + 4, 1) # if skipping - - header = self._dec(chk_tbl_tag, chk_tbl_ref) - data = self._dec("VS", chk_tbl_ref)["data"] # corresponding table - - # header gives the field pattern for the rows of data, one per chunk - # maybe faster to use struct and iter than numpy, since we iterate anyway - dt = [(f"ind{i}", ">u4") for i in range(ndims)] + [ - ("tag", ">u2"), - ("ref", ">u2"), - ] - rows = np.frombuffer(data, dtype=dt, count=header["nvert"]) - # rows["tag"] should 
always be 61 -> CHUNK - refs = [] - for *ind, tag, ref in rows: - # maybe ind needs reversing since everything is FORTRAN - chunk_tag = self.tags[("CHUNK", ref)] - if chunk_tag["extended"]: - self.f.seek(chunk_tag["offset"]) - # these are always COMP? - ctype, offset, length = self._dec_extended() - refs.append([".".join(str(_) for _ in ind), offset, length, ctype]) - else: - refs.append( - [ - ".".join(str(_) for _ in ind), - chunk_tag["offset"], - chunk_tag["length"], - ] - ) - refs.append(dims) - return refs - - def _dec_comp(self): - # version = self.read_int(2) # always 0 - # len_uncomp = self.read_int(4) - self.f.seek(6, 1) - - data_ref = self.read_int(2) - # model = self.read_int(2) # always 0 - ctype = "DEFLATE" # comp[self.read_int(2)] - tag = self.tags[("COMPRESSED", data_ref)] - return ctype, tag["offset"], tag["length"] - - -@reg("NDG") -def _dec_ndg(self, info): - # links together these things as a Data Group - return { - "tags": [ - (tags[self.read_int(2)], self.read_int(2)) - for _ in range(0, info["length"], 4) - ] - } - - -@reg("SDD") -def _dec_sdd(self, info): - rank = self.read_int(2) - dims = [self.read_int(4) for _ in range(rank)] - data_tag = (tags[self.read_int(2)], self.read_int(2)) - scale_tags = [(tags[self.read_int(2)], self.read_int(2)) for _ in range(rank)] - return _pl(locals()) - - -@reg("VERSION") -def _dec_version(self, info): - return { - "major": self.read_int(4), - "minor": self.read_int(4), - "release": self.read_int(4), - "string:": _null_str(self.f.read(info["length"] - 10).decode()), - } - - -@reg("VH") -def _dec_vh(self, info): - # virtual group ("table") header - interface = self.read_int(2) - nvert = self.read_int(4) - ivsize = self.read_int(2) - nfields = self.read_int(2) - types = [self.read_int(2) for _ in range(nfields)] - isize = [self.read_int(2) for _ in range(nfields)] - offsets = [self.read_int(2) for _ in range(nfields)] - order = [self.read_int(2) for _ in range(nfields)] - names = [self.f.read(self.read_int(2)).decode() for _ in range(nfields)] - namelen = self.read_int(2) - name = self.f.read(namelen).decode() - classlen = self.read_int(2) - cls = self.f.read(classlen).decode() - ref = (self.read_int(2), self.read_int(2)) - return _pl(locals()) - - -@reg("VG") -def _dec_vg(self, info): - nelt = self.read_int(2) - tag = [tags[self.read_int(2)] for _ in range(nelt)] - refs = [self.read_int(2) for _ in range(nelt)] - name = self.f.read(self.read_int(2)).decode() - cls = self.f.read(self.read_int(2)).decode() - return _pl(locals()) - - -import zipfile - - -@reg("NT") -def _dec_nt(self, info): - version, typ, width, cls = list(self.f.read(4)) - typ = dtypes[typ] - return _pl(locals()) - - -def _null_str(s): - return s.split("\00", 1)[0] - - -def _pl(l): - return {k: v for k, v in l.items() if k not in {"info", "f", "self"}} - - -# hdf/src/htags.h -tags = { - 1: "NULL", - 20: "LINKED", - 30: "VERSION", - 40: "COMPRESSED", - 50: "VLINKED", - 51: "VLINKED_DATA", - 60: "CHUNKED", - 61: "CHUNK", - 100: "FID", - 101: "FD", - 102: "TID", - 103: "TD", - 104: "DIL", - 105: "DIA", - 106: "NT", - 107: "MT", - 108: "FREE", - 200: "ID8", - 201: "IP8", - 202: "RI8", - 203: "CI8", - 204: "II8", - 300: "ID", - 301: "LUT", - 302: "RI", - 303: "CI", - 304: "NRI", - 306: "RIG", - 307: "LD", - 308: "MD", - 309: "MA", - 310: "CCN", - 311: "CFM", - 312: "AR", - 400: "DRAW", - 401: "RUN", - 500: "XYP", - 501: "MTO", - 602: "T14", - 603: "T105", - 700: "SDG", - 701: "SDD", - 702: "SD", - 703: "SDS", - 704: "SDL", - 705: "SDU", - 706: "SDF", - 707: "SDM", - 708: 
"SDC", - 709: "SDT", - 710: "SDLNK", - 720: "NDG", - 721: "RESERVED", - # "Objects of tag 721 are never actually written to the file. The tag is - # needed to make things easier mixing DFSD and SD style objects in the same file" - 731: "CAL", - 732: "FV", - 799: "BREQ", - 781: "SDRAG", - 780: "EREQ", - 1965: "VG", - 1962: "VH", - 1963: "VS", - 11: "RLE", - 12: "IMCOMP", - 13: "JPEG", - 14: "GREYJPEG", - 15: "JPEG5", - 16: "GREYJPEG5", -} -spec = { - 1: "LINKED", - 2: "EXT", - 3: "COMP", - 4: "VLINKED", - 5: "CHUNKED", - 6: "BUFFERED", - 7: "COMPRAS", -} - -# hdf4/hdf/src/hntdefs.h -dtypes = { - 5: "f4", - 6: "f8", - 20: "i1", - 21: "u1", - 4: "str", # special case, size given in header - 22: ">i2", - 23: ">u2", - 24: ">i4", - 25: ">u4", - 26: ">i8", - 27: ">u8", -} - -# hdf4/hdf/src/hcomp.h -comp = { - 0: "NONE", - 1: "RLE", - 2: "NBIT", - 3: "SKPHUFF", - 4: "DEFLATE", # called deflate, but code says "gzip" and doc says "GNU zip"; actually zlib? - # see codecs.ZlibCodec - 5: "SZIP", - 7: "JPEG", -} diff --git a/kerchunk/hdf4.py b/kerchunk/hdf4.py new file mode 100644 index 00000000..95dc548f --- /dev/null +++ b/kerchunk/hdf4.py @@ -0,0 +1,508 @@ +import fsspec +import numpy as np +import ujson + + +decoders = {} + + +def reg(name): + def f(func): + decoders[name] = func + return func + + return f + + +class HDF4ToZarr: + def __init__( + self, + path, + storage_options=None, + inline_threshold=100, + out=None, + remote_protocol=None, + remote_options=None, + ): + self.path = path + self.st = storage_options + self.thresh = inline_threshold + self.out = out or {} + self.remote_protocol = remote_protocol + self.remote_options = remote_options + + def read_int(self, n): + return int.from_bytes(self.f.read(n), "big") + + def read_ddh(self): + return {"ndd": self.read_int(2), "next": self.read_int(4)} + + def read_dd(self): + loc = self.f.tell() + i = int.from_bytes(self.f.read(2), "big") + if i & 0x4000: + extended = True + i = i - 0x4000 + else: + extended = False + tag = tags.get(i, i) + no_data = tag not in {"NULL"} + ref = (tag, int.from_bytes(self.f.read(2), "big")) + info = { + "offset": int.from_bytes(self.f.read(4), "big") * no_data, + "length": int.from_bytes(self.f.read(4), "big") * no_data, + "extended": extended, + "loc": loc, + } + return ref, info + + def decode(self, tag, info): + self.f.seek(info["offset"]) + ident = lambda _, __: info + return decoders.get(tag, ident)(self, info) + + def translate(self, filename=None, storage_options=None): + """Scan and return references + + Parameters + ---------- + filename: if given, write to this as JSON + storage_options: to interpret filename + + Returns + ------- + references + """ + import zarr + from kerchunk.codecs import ZlibCodec + + self.f = fsspec.open(self.path, **(self.st or {})).open() + + # magic header + assert self.f.read(4) == b"\x0e\x03\x13\x01" + + # all the data descriptors in a linked list + self.tags = {} + while True: + ddh = self.read_ddh() + + for _ in range(ddh["ndd"]): + ident, info = self.read_dd() + self.tags[ident] = info + if ddh["next"] == 0: + # "finished" sentry + break + # or continue + self.f.seek(ddh["next"]) + + # basic decode + for tag, ref in self.tags: + self._dec(tag, ref) + + # global attributes + attrs = {} + for (tag, ref), info in self.tags.items(): + if tag == "VH" and info["names"][0].upper() == "VALUES": + # dtype = dtypes[info["types"][0]] + inf2 = self.tags[("VS", ref)] + self.f.seek(inf2["offset"]) + data = self.f.read(inf2["length"]) + # NASA conventions + if 
info["name"].startswith(("CoreMetadata.", "ArchiveMetadata.")): + obj = None + for line in data.decode().split("\n"): + if "OBJECT" in line: + obj = line.split()[-1] + if "VALUE" in line: + attrs[obj] = line.split()[-1].lstrip('"').rstrip('"') + + # there should be only one root, and it's probably the last VG + # so maybe this loop isn't needed + roots = set() + children = set() + child = {} + for (tag, ref), info in self.tags.items(): + if tag == "VG": + here = child.setdefault((tag, ref), set()) + for t, r in zip(info["tag"], info["refs"]): + if t == "VG": + children.add((t, r)) + roots.discard((t, r)) + here.add((t, r)) + if tag not in children: + roots.add((tag, ref)) + + # hierarchical output + output = self._descend_vg(*list(roots)[0]) + fs = fsspec.filesystem( + "reference", + fo=self.out, + remote_protocol=self.remote_protocol, + remote_options=self.remote_options, + ) + g = zarr.open_group("reference://", storage_options=dict(fs=fs)) + refs = {} + for k, v in output.items(): + if isinstance(v, dict): + compression = ZlibCodec() if "refs" in v else None + arr = g.create_dataset( + name=k, + shape=v["dims"], + dtype=v["dtype"], + chunks=v.get("chunks", v["dims"]), + compressor=compression, + overwrite=True, + ) + arr.attrs.update( + dict( + _ARRAY_DIMENSIONS=[f"{k}_x", f"{k}_y"][: len(v["dims"])] + if "refs" in v + else ["0"], + **{ + i: j + for i, j in v.items() + if i not in {"chunk", "dims", "dtype", "refs"} + }, + ) + ) + for r in v.get("refs", []): + refs[f"{k}/{r[0]}"] = [self.path, r[1], r[2]] + else: + attrs[k] = v + fs.references.update(refs) + g.attrs.update(attrs) + + if filename is None: + return fs.references + with fsspec.open(filename, **(storage_options or {})) as f: + ujson.dumps(dict(fs.references), f) + + def _descend_vg(self, tag, ref): + info = self.tags[(tag, ref)] + out = {} + for t, r in zip(info["tag"], info["refs"]): + inf2 = self.tags[(t, r)] + if t == "VG": + tmp = self._descend_vg(t, r) + if list(tmp)[0] == inf2["name"]: + tmp = tmp[inf2["name"]] + out[inf2["name"]] = tmp + elif t == "VH": + if len(inf2["names"]) == 1 and inf2["names"][0].lower() == "values": + dtype = dtypes[inf2["types"][0]] + name = inf2["name"] + inf2 = self.tags[("VS", r)] + self.f.seek(inf2["offset"]) + data = self.f.read(inf2["length"]) + if dtype == "str": + out[name] = data.decode().lstrip('"').rstrip('"') # decode() ? 
+ else: + out[name] = np.frombuffer(data, dtype)[0] + elif t == "NT": + out["dtype"] = inf2["typ"] + elif t == "SD": + out["refs"] = inf2["data"][:-1] + out["chunks"] = [_["chunk_length"] for _ in inf2["data"][-1]] + elif t == "SDD": + out["dims"] = inf2["dims"] + else: + # NDGs contain same info as NT, SD and SDD + pass + return out + + def _dec(self, tag, ref): + info = self.tags[(tag, ref)] + if not set(info) - {"length", "offset", "extended", "loc"}: + self.f.seek(info["offset"]) + if info["extended"]: + info["data"] = self._dec_extended() + else: + info.update(self.decode(tag, info)) + return info + + def _dec_extended(self): + ext_type = spec[self.read_int(2)] + if ext_type == "CHUNKED": + return self._dec_chunked() + elif ext_type == "LINKED": + return self._dec_linked_header() + elif ext_type == "COMP": + return self._dec_comp() + + def _dec_linked_header(self): + # get the bytes of a linked set - these will always be inlined + self.read_int(4) # length + self.read_int(4) # blk_len + self.read_int(4) # num_blk + next_ref = self.read_int(2) + out = [] + while next_ref: + next_ref, data = self._dec_linked_block(self.tags[("LINKED", next_ref)]) + out.extend([d for d in data if d]) + bits = [] + for ref in out: + info = self.tags[("LINKED", ref)] + self.f.seek(info["offset"]) + bits.append(self.f.read(info["length"])) + return b"".join(bits) + + def _dec_linked_block(self, block): + self.f.seek(block["offset"]) + next_ref = self.read_int(2) + refs = [self.read_int(2) for _ in range((block["length"] // 2) - 1)] + return next_ref, refs + + def _dec_chunked(self): + # we want to turn the chunks table into references + # tag_head_len = self.read_int(4) + # version = self.f.read(1)[0] + # flag = self.read_int(4) + # elem_tot_len = self.read_int(4) + # chunk_size = self.read_int(4) + # nt_size = self.read_int(4) + self.f.seek(21, 1) + chk_tbl_tag = tags[self.read_int(2)] # should be VH + chk_tbl_ref = self.read_int(2) + self.read_int(2) # sp_tab = tags[self.read_int(2)] + self.read_int(2) # sp_ref + ndims = self.read_int(4) + + dims = [ # we don't use these, could skip + { + "flag": self.read_int(4), + "dim_length": self.read_int(4), + "chunk_length": self.read_int(4), + } + for _ in range(ndims) + ] + self.f.read( # fill_value + self.read_int(4) + ) # to be interpreted as a number later; but chunk table probs has no fill + # self.f.seek(12*ndims + 4, 1) # if skipping + + header = self._dec(chk_tbl_tag, chk_tbl_ref) + data = self._dec("VS", chk_tbl_ref)["data"] # corresponding table + + # header gives the field pattern for the rows of data, one per chunk + # maybe faster to use struct and iter than numpy, since we iterate anyway + dt = [(f"ind{i}", ">u4") for i in range(ndims)] + [ + ("tag", ">u2"), + ("ref", ">u2"), + ] + rows = np.frombuffer(data, dtype=dt, count=header["nvert"]) + # rows["tag"] should always be 61 -> CHUNK + refs = [] + for *ind, tag, ref in rows: + # maybe ind needs reversing since everything is FORTRAN + chunk_tag = self.tags[("CHUNK", ref)] + if chunk_tag["extended"]: + self.f.seek(chunk_tag["offset"]) + # these are always COMP? 
+ ctype, offset, length = self._dec_extended() + refs.append([".".join(str(_) for _ in ind), offset, length, ctype]) + else: + refs.append( + [ + ".".join(str(_) for _ in ind), + chunk_tag["offset"], + chunk_tag["length"], + ] + ) + refs.append(dims) + return refs + + def _dec_comp(self): + # version = self.read_int(2) # always 0 + # len_uncomp = self.read_int(4) + self.f.seek(6, 1) + + data_ref = self.read_int(2) + # model = self.read_int(2) # always 0 + ctype = "DEFLATE" # comp[self.read_int(2)] + tag = self.tags[("COMPRESSED", data_ref)] + return ctype, tag["offset"], tag["length"] + + +@reg("NDG") +def _dec_ndg(self, info): + # links together these things as a Data Group + return { + "tags": [ + (tags[self.read_int(2)], self.read_int(2)) + for _ in range(0, info["length"], 4) + ] + } + + +@reg("SDD") +def _dec_sdd(self, info): + rank = self.read_int(2) + dims = [self.read_int(4) for _ in range(rank)] + data_tag = (tags[self.read_int(2)], self.read_int(2)) + scale_tags = [(tags[self.read_int(2)], self.read_int(2)) for _ in range(rank)] + return _pl(locals()) + + +@reg("VERSION") +def _dec_version(self, info): + return { + "major": self.read_int(4), + "minor": self.read_int(4), + "release": self.read_int(4), + "string:": _null_str(self.f.read(info["length"] - 10).decode()), + } + + +@reg("VH") +def _dec_vh(self, info): + # virtual group ("table") header + interface = self.read_int(2) + nvert = self.read_int(4) + ivsize = self.read_int(2) + nfields = self.read_int(2) + types = [self.read_int(2) for _ in range(nfields)] + isize = [self.read_int(2) for _ in range(nfields)] + offsets = [self.read_int(2) for _ in range(nfields)] + order = [self.read_int(2) for _ in range(nfields)] + names = [self.f.read(self.read_int(2)).decode() for _ in range(nfields)] + namelen = self.read_int(2) + name = self.f.read(namelen).decode() + classlen = self.read_int(2) + cls = self.f.read(classlen).decode() + ref = (self.read_int(2), self.read_int(2)) + return _pl(locals()) + + +@reg("VG") +def _dec_vg(self, info): + nelt = self.read_int(2) + tag = [tags[self.read_int(2)] for _ in range(nelt)] + refs = [self.read_int(2) for _ in range(nelt)] + name = self.f.read(self.read_int(2)).decode() + cls = self.f.read(self.read_int(2)).decode() + return _pl(locals()) + + +@reg("NT") +def _dec_nt(self, info): + version, typ, width, cls = list(self.f.read(4)) + typ = dtypes[typ] + return _pl(locals()) + + +def _null_str(s): + return s.split("\00", 1)[0] + + +def _pl(l): + return {k: v for k, v in l.items() if k not in {"info", "f", "self"}} + + +# hdf/src/htags.h +tags = { + 1: "NULL", + 20: "LINKED", + 30: "VERSION", + 40: "COMPRESSED", + 50: "VLINKED", + 51: "VLINKED_DATA", + 60: "CHUNKED", + 61: "CHUNK", + 100: "FID", + 101: "FD", + 102: "TID", + 103: "TD", + 104: "DIL", + 105: "DIA", + 106: "NT", + 107: "MT", + 108: "FREE", + 200: "ID8", + 201: "IP8", + 202: "RI8", + 203: "CI8", + 204: "II8", + 300: "ID", + 301: "LUT", + 302: "RI", + 303: "CI", + 304: "NRI", + 306: "RIG", + 307: "LD", + 308: "MD", + 309: "MA", + 310: "CCN", + 311: "CFM", + 312: "AR", + 400: "DRAW", + 401: "RUN", + 500: "XYP", + 501: "MTO", + 602: "T14", + 603: "T105", + 700: "SDG", + 701: "SDD", + 702: "SD", + 703: "SDS", + 704: "SDL", + 705: "SDU", + 706: "SDF", + 707: "SDM", + 708: "SDC", + 709: "SDT", + 710: "SDLNK", + 720: "NDG", + 721: "RESERVED", + # "Objects of tag 721 are never actually written to the file. 
The tag is + # needed to make things easier mixing DFSD and SD style objects in the same file" + 731: "CAL", + 732: "FV", + 799: "BREQ", + 781: "SDRAG", + 780: "EREQ", + 1965: "VG", + 1962: "VH", + 1963: "VS", + 11: "RLE", + 12: "IMCOMP", + 13: "JPEG", + 14: "GREYJPEG", + 15: "JPEG5", + 16: "GREYJPEG5", +} +spec = { + 1: "LINKED", + 2: "EXT", + 3: "COMP", + 4: "VLINKED", + 5: "CHUNKED", + 6: "BUFFERED", + 7: "COMPRAS", +} + +# hdf4/hdf/src/hntdefs.h +dtypes = { + 5: "f4", + 6: "f8", + 20: "i1", + 21: "u1", + 4: "str", # special case, size given in header + 22: ">i2", + 23: ">u2", + 24: ">i4", + 25: ">u4", + 26: ">i8", + 27: ">u8", +} + +# hdf4/hdf/src/hcomp.h +comp = { + 0: "NONE", + 1: "RLE", + 2: "NBIT", + 3: "SKPHUFF", + 4: "DEFLATE", # called deflate, but code says "gzip" and doc says "GNU zip"; actually zlib? + # see codecs.ZlibCodec + 5: "SZIP", + 7: "JPEG", +} From 753ad16ad423e4cd952cdd12c1faf6b073202160 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 3 Sep 2024 10:30:42 -0400 Subject: [PATCH 12/13] docstring --- kerchunk/hdf4.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/kerchunk/hdf4.py b/kerchunk/hdf4.py index 95dc548f..449e4e04 100644 --- a/kerchunk/hdf4.py +++ b/kerchunk/hdf4.py @@ -15,21 +15,19 @@ def f(func): class HDF4ToZarr: + """Experimental: interface to HDF4 archival files""" + def __init__( self, path, storage_options=None, inline_threshold=100, out=None, - remote_protocol=None, - remote_options=None, ): self.path = path self.st = storage_options self.thresh = inline_threshold self.out = out or {} - self.remote_protocol = remote_protocol - self.remote_options = remote_options def read_int(self, n): return int.from_bytes(self.f.read(n), "big") @@ -76,7 +74,8 @@ def translate(self, filename=None, storage_options=None): import zarr from kerchunk.codecs import ZlibCodec - self.f = fsspec.open(self.path, **(self.st or {})).open() + fo = fsspec.open(self.path, **(self.st or {})) + self.f = fo.open() # magic header assert self.f.read(4) == b"\x0e\x03\x13\x01" @@ -134,11 +133,13 @@ def translate(self, filename=None, storage_options=None): # hierarchical output output = self._descend_vg(*list(roots)[0]) + prot = fo.fs.protocol + prot = prot[0] if isinstance(prot, tuple) else prot fs = fsspec.filesystem( "reference", fo=self.out, - remote_protocol=self.remote_protocol, - remote_options=self.remote_options, + remote_protocol=prot, + remote_options=self.st, ) g = zarr.open_group("reference://", storage_options=dict(fs=fs)) refs = {} From 3d848779398fc6e20ca438141e446f6212d873cc Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 3 Sep 2024 10:33:48 -0400 Subject: [PATCH 13/13] plumbing --- docs/source/reference.rst | 7 +++++++ kerchunk/codecs.py | 2 +- pyproject.toml | 1 + 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/source/reference.rst b/docs/source/reference.rst index ffe7c328..98e4cfb9 100644 --- a/docs/source/reference.rst +++ b/docs/source/reference.rst @@ -10,6 +10,7 @@ File format backends kerchunk.fits.process_file kerchunk.tiff.tiff_to_zarr kerchunk.netCDF3.NetCDF3ToZarr + kerchunk.hdf4.HDF4ToZarr .. autoclass:: kerchunk.hdf.SingleHdf5ToZarr :members: @@ -24,6 +25,9 @@ File format backends .. autoclass:: kerchunk.netCDF3.NetCDF3ToZarr :members: __init__, translate +.. autoclass:: kerchunk.hdf4.HDF4ToZarr + :members: __init__, translate + Codecs ------ @@ -50,6 +54,9 @@ Codecs .. autoclass:: kerchunk.codecs.RecordArrayMember :members: __init__ +.. 
autoclass:: kerchunk.codecs.ZlibCodec + :members: __init__ + Combining --------- diff --git a/kerchunk/codecs.py b/kerchunk/codecs.py index 10b71888..852076ea 100644 --- a/kerchunk/codecs.py +++ b/kerchunk/codecs.py @@ -241,7 +241,7 @@ def encode(self, buf): raise NotImplementedError -class ZlibCodec(numcodecs.abc.Codec): +class ZlibCodec(Codec): codec_id = "zlib" def __init__(self): diff --git a/pyproject.toml b/pyproject.toml index c11e3401..5eb7c0c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ fill_hdf_strings = "kerchunk.codecs:FillStringsCodec" FITSAscii = "kerchunk.codecs:AsciiTableCodec" FITSVarBintable = "kerchunk.codecs:VarArrCodec" record_member = "kerchunk.codecs:RecordArrayMember" +zlib = "kerchunk.codecs:ZlibCodec" [project.entry-points."xarray.backends"] kerchunk = "kerchunk.xarray_backend:KerchunkBackend"
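
A minimal usage sketch for the new backend introduced by this series, assuming a local HDF4 file named "sample.hdf" (hypothetical) and the usual kerchunk reference-filesystem pattern for opening the result; this is an illustration appended for readers, not part of the patches above:

    import xarray as xr
    from kerchunk.hdf4 import HDF4ToZarr

    # scan the HDF4 tags and build a references dict;
    # pass filename= instead to write the references out as JSON
    refs = HDF4ToZarr("sample.hdf").translate()

    # open the references as a zarr store via fsspec's "reference" filesystem
    ds = xr.open_dataset(
        "reference://",
        engine="zarr",
        backend_kwargs={
            "consolidated": False,
            "storage_options": {"fo": refs},
        },
    )
    print(ds)

DEFLATE-compressed chunks decode through kerchunk.codecs.ZlibCodec, which the final patch registers as the "zlib" numcodecs entry point in pyproject.toml, so no manual codec registration should be needed once the package is installed.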