diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index 3b6752ee1..364bae028 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -106,6 +106,12 @@ def __init__( self, root, fs=None, out_root=None, cache_size=128, categorical_threshold=10 ): """ + + This instance will be writable, storing changes in memory until full partitions + are accumulated or .flush() is called. + + To create an empty lazy store, use .create() + Parameters ---------- root : str @@ -119,13 +125,12 @@ def __init__( Encode urls as pandas.Categorical to reduce memory footprint if the ratio of the number of unique urls to total number of refs for each variable is greater than or equal to this number. (default 10) - - """ self.root = root self.chunk_sizes = {} self._items = {} self.dirs = None + # TODO: derive fs from `root` self.fs = fsspec.filesystem("file") if fs is None else fs self._items[".zmetadata"] = self.fs.cat_file( "/".join([self.root, ".zmetadata"]) @@ -153,6 +158,8 @@ def open_refs(field, record): def create(root, storage_options=None, fs=None, record_size=10000, **kwargs): """Make empty parquet reference set + First deletes the contents of the given directory, if it exists. + Parameters ---------- root: str @@ -172,12 +179,15 @@ def create(root, storage_options=None, fs=None, record_size=10000, **kwargs): met = {"metadata": {}, "record_size": record_size} if fs is None: fs, root = fsspec.core.url_to_fs(root, **(storage_options or {})) + if fs.exists(root): + fs.rm(root, recursive=True) fs.makedirs(root, exist_ok=True) fs.pipe("/".join([root, ".zmetadata"]), json.dumps(met).encode()) return LazyReferenceMapper(root, fs, **kwargs) def listdir(self, basename=True): """List top-level directories""" + # cache me? if self.dirs is None: dirs = [p.split("/", 1)[0] for p in self.zmetadata] self.dirs = {p for p in dirs if p and not p.startswith(".")} @@ -258,19 +268,18 @@ def _load_one_key(self, key): elif "/" not in key or self._is_meta(key): raise KeyError(key) field, sub_key = key.split("/") - record, _, _ = self._key_to_record(key) - maybe = self._items.get((field, key), {}).get(sub_key, False) + record, ri, chunk_size = self._key_to_record(key) + maybe = self._items.get((field, record), {}).get(ri, False) if maybe is None: # explicitly deleted raise KeyError elif maybe: return maybe + elif chunk_size == 0: + return b"" # Chunk keys can be loaded from row group and cached in LRU cache try: - record, ri, chunk_size = self._key_to_record(key) - if chunk_size == 0: - return b"" refs = self.open_refs(field, record) except (ValueError, TypeError, FileNotFoundError): raise KeyError(key) @@ -280,7 +289,7 @@ def _load_one_key(self, key): if raw is not None: return raw if selection[0] is None: - raise KeyError("This reference has been deleted") + raise KeyError("This reference does not exist or has been deleted") if selection[1:3] == [0, 0]: # URL only return selection[:1] @@ -342,7 +351,6 @@ def items(self): def __hash__(self): return id(self) - @lru_cache(20) def __getitem__(self, key): return self._load_one_key(key) @@ -373,9 +381,9 @@ def __delitem__(self, key): else: if "/" in key and not self._is_meta(key): field, chunk = key.split("/") - record, _, _ = self._key_to_record(key) + record, i, _ = self._key_to_record(key) subdict = self._items.setdefault((field, record), {}) - subdict[chunk] = None + subdict[i] = None if len(subdict) == self.record_size: self.write(field, record) else: @@ -388,26 +396,38 @@ def write(self, field, record, base_url=None, storage_options=None): import numpy as np import pandas as pd - # TODO: if the dict is incomplete, also load records and merge in partition = self._items[(field, record)] - fn = f"{base_url or self.out_root}/{field}/refs.{record}.parq" + original = False + if len(partition) < self.record_size: + try: + original = self.open_refs(field, record) + except IOError: + pass - #### - paths = np.full(self.record_size, np.nan, dtype="O") - offsets = np.zeros(self.record_size, dtype="int64") - sizes = np.zeros(self.record_size, dtype="int64") - raws = np.full(self.record_size, np.nan, dtype="O") - nraw = 0 - npath = 0 + if original: + paths = original["path"] + offsets = original["offset"] + sizes = original["size"] + raws = original["raw"] + else: + paths = np.full(self.record_size, np.nan, dtype="O") + offsets = np.zeros(self.record_size, dtype="int64") + sizes = np.zeros(self.record_size, dtype="int64") + raws = np.full(self.record_size, np.nan, dtype="O") for j, data in partition.items(): if isinstance(data, list): - npath += 1 paths[j] = data[0] if len(data) > 1: offsets[j] = data[1] sizes[j] = data[2] + elif data is None: + # delete + paths[j] = None + offsets[j] = 0 + sizes[j] = 0 + raws[j] = None else: - nraw += 1 + # this is the only call into kerchunk, could remove raws[j] = kerchunk.df._proc_raw(data) # TODO: only save needed columns df = pd.DataFrame( @@ -424,6 +444,7 @@ def write(self, field, record, base_url=None, storage_options=None): object_encoding = {"raw": "bytes", "path": "utf8"} has_nulls = ["path", "raw"] + fn = f"{base_url or self.out_root}/{field}/refs.{record}.parq" self.fs.mkdirs(f"{base_url or self.out_root}/{field}", exist_ok=True) df.to_parquet( fn, @@ -474,29 +495,30 @@ def flush(self, base_url=None, storage_options=None): self.open_refs.cache_clear() def __len__(self): - # Caveat: This counts expected references, not actual + # Caveat: This counts expected references, not actual - but is fast count = 0 for field in self.listdir(): if field.startswith("."): count += 1 else: - chunk_sizes = self._get_chunk_sizes(field) - nchunks = self.np.product(chunk_sizes) - count += nchunks + count += math.prod(self._get_chunk_sizes(field)) count += len(self.zmetadata) # all metadata keys - count += len(self._items) # the metadata file itself + # any other files not in reference partitions + count += sum(1 for _ in self._items if not isinstance(_, tuple)) return count def __iter__(self): - # Caveat: Note that this generates all expected keys, but does not - # account for reference keys that are missing. + # Caveat: returns only existing keys, so the number of these does not + # match len(self) metas = set(self.zmetadata) metas.update(self._items) for bit in metas: if isinstance(bit, str): yield bit for field in self.listdir(): - yield from self._keys_in_field(field) + for k in self._keys_in_field(field): + if k in self: + yield k def __contains__(self, item): try: diff --git a/fsspec/implementations/tests/test_reference.py b/fsspec/implementations/tests/test_reference.py index 1b92558fa..19bc0cfa4 100644 --- a/fsspec/implementations/tests/test_reference.py +++ b/fsspec/implementations/tests/test_reference.py @@ -5,7 +5,11 @@ import fsspec from fsspec.implementations.local import LocalFileSystem -from fsspec.implementations.reference import ReferenceFileSystem, ReferenceNotReachable +from fsspec.implementations.reference import ( + LazyReferenceMapper, + ReferenceFileSystem, + ReferenceNotReachable, +) from fsspec.tests.conftest import data, realfile, reset_files, server, win # noqa: F401 @@ -674,3 +678,42 @@ def test_cached(m, tmpdir): skip_instance_cache=True, ) assert fs.cat("a") == b"A" + + +@pytest.fixture() +def lazy_refs(m): + zarr = pytest.importorskip("zarr") + l = LazyReferenceMapper.create("memory://refs", fs=m) + g = zarr.open(l, mode="w") + g.create_dataset(name="data", shape=(100,), chunks=(10,), dtype="int64") + return l + + +def test_append_parquet(lazy_refs, m): + with pytest.raises(KeyError): + lazy_refs["data/0"] + lazy_refs["data/0"] = b"data" + assert lazy_refs["data/0"] == b"data" + lazy_refs.flush() + + lazy2 = LazyReferenceMapper("memory://refs", fs=m) + assert lazy2["data/0"] == b"data" + with pytest.raises(KeyError): + lazy_refs["data/1"] + lazy2["data/1"] = b"Bdata" + assert lazy2["data/1"] == b"Bdata" + lazy2.flush() + + lazy2 = LazyReferenceMapper("memory://refs", fs=m) + assert lazy2["data/0"] == b"data" + assert lazy2["data/1"] == b"Bdata" + lazy2["data/1"] = b"Adata" + del lazy2["data/0"] + assert lazy2["data/1"] == b"Adata" + assert "data/0" not in lazy2 + lazy2.flush() + + lazy2 = LazyReferenceMapper("memory://refs", fs=m) + with pytest.raises(KeyError): + lazy2["data/0"] + assert lazy2["data/1"] == b"Adata"