Make LazyReferences editable from existing storage

martindurant committed Dec 13, 2023
1 parent dd8cb9b commit 52f2ad5
Showing 2 changed files with 96 additions and 31 deletions.
82 changes: 52 additions & 30 deletions fsspec/implementations/reference.py
@@ -106,6 +106,12 @@ def __init__(
self, root, fs=None, out_root=None, cache_size=128, categorical_threshold=10
):
"""
This instance will be writable, storing changes in memory until full partitions
are accumulated or .flush() is called.
To create an empty lazy store, use .create()
Parameters
----------
root : str
@@ -119,13 +125,12 @@ def __init__(
Encode urls as pandas.Categorical to reduce memory footprint if the ratio
of the number of unique urls to total number of refs for each variable
is greater than or equal to this number. (default 10)
"""
self.root = root
self.chunk_sizes = {}
self._items = {}
self.dirs = None
# TODO: derive fs from `root`
self.fs = fsspec.filesystem("file") if fs is None else fs
self._items[".zmetadata"] = self.fs.cat_file(
"/".join([self.root, ".zmetadata"])
@@ -153,6 +158,8 @@ def open_refs(field, record):
def create(root, storage_options=None, fs=None, record_size=10000, **kwargs):
"""Make empty parquet reference set
First deletes the contents of the given directory, if it exists.
Parameters
----------
root: str
@@ -172,12 +179,15 @@ def create(root, storage_options=None, fs=None, record_size=10000, **kwargs):
met = {"metadata": {}, "record_size": record_size}
if fs is None:
fs, root = fsspec.core.url_to_fs(root, **(storage_options or {}))
if fs.exists(root):
fs.rm(root, recursive=True)
fs.makedirs(root, exist_ok=True)
fs.pipe("/".join([root, ".zmetadata"]), json.dumps(met).encode())
return LazyReferenceMapper(root, fs, **kwargs)
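A minimal usage sketch of the two entry points above (the store location, array layout, and example URL are illustrative, and zarr plus kerchunk are assumed to be installed, as in the tests below): .create() starts an empty parquet reference set, while the plain constructor now re-opens an existing one for further editing.

import fsspec
import zarr
from fsspec.implementations.reference import LazyReferenceMapper

fs = fsspec.filesystem("memory")

# Start a brand-new, empty reference set (wipes memory://refs if it already exists)
refs = LazyReferenceMapper.create("memory://refs", fs=fs)
g = zarr.open(refs, mode="w")
g.create_dataset(name="data", shape=(100,), chunks=(10,), dtype="int64")
refs["data/0"] = b"raw chunk bytes"                  # held in memory for now
refs.flush()                                         # writes data/refs.0.parq

# Later: re-open the same store and keep editing it (the new behaviour)
refs2 = LazyReferenceMapper("memory://refs", fs=fs)
refs2["data/1"] = ["s3://bucket/file.nc", 0, 1000]   # [url, offset, size]
refs2.flush()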

def listdir(self, basename=True):
"""List top-level directories"""
# cache me?
if self.dirs is None:
dirs = [p.split("/", 1)[0] for p in self.zmetadata]
self.dirs = {p for p in dirs if p and not p.startswith(".")}
@@ -258,19 +268,18 @@ def _load_one_key(self, key):
elif "/" not in key or self._is_meta(key):
raise KeyError(key)
field, sub_key = key.split("/")
record, _, _ = self._key_to_record(key)
maybe = self._items.get((field, key), {}).get(sub_key, False)
record, ri, chunk_size = self._key_to_record(key)
maybe = self._items.get((field, record), {}).get(ri, False)
if maybe is None:
# explicitly deleted
raise KeyError
elif maybe:
return maybe
elif chunk_size == 0:
return b""

# Chunk keys can be loaded from row group and cached in LRU cache
try:
record, ri, chunk_size = self._key_to_record(key)
if chunk_size == 0:
return b""
refs = self.open_refs(field, record)
except (ValueError, TypeError, FileNotFoundError):
raise KeyError(key)
@@ -280,7 +289,7 @@ def _load_one_key(self, key):
if raw is not None:
return raw
if selection[0] is None:
raise KeyError("This reference has been deleted")
raise KeyError("This reference does not exist or has been deleted")
if selection[1:3] == [0, 0]:
# URL only
return selection[:1]
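For orientation, since _key_to_record itself is outside this hunk: a chunk key such as "data/12345" resolves to a parquet record number plus a row within that record, and the in-memory edits in self._items above are keyed the same way. A hedged sketch of that arithmetic for a 1-D array (multi-dimensional chunk indices are first flattened to a single chunk number), assuming the default record_size of 10000:

RECORD_SIZE = 10_000   # chunks per parquet record file, see create() above

def chunk_to_record(chunk_number: int) -> tuple[int, int]:
    # which data/refs.N.parq the chunk lives in, and which row inside it
    return chunk_number // RECORD_SIZE, chunk_number % RECORD_SIZE

# "data/12345" -> chunk 12345 -> record 1 (data/refs.1.parq), row 2345
assert chunk_to_record(12345) == (1, 2345)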
@@ -342,7 +351,6 @@ def items(self):
def __hash__(self):
return id(self)

@lru_cache(20)
def __getitem__(self, key):
return self._load_one_key(key)

@@ -373,9 +381,9 @@ def __delitem__(self, key):
else:
if "/" in key and not self._is_meta(key):
field, chunk = key.split("/")
record, _, _ = self._key_to_record(key)
record, i, _ = self._key_to_record(key)
subdict = self._items.setdefault((field, record), {})
subdict[chunk] = None
subdict[i] = None
if len(subdict) == self.record_size:
self.write(field, record)
else:
@@ -388,26 +396,38 @@ def write(self, field, record, base_url=None, storage_options=None):
import numpy as np
import pandas as pd

# TODO: if the dict is incomplete, also load records and merge in
partition = self._items[(field, record)]
fn = f"{base_url or self.out_root}/{field}/refs.{record}.parq"
original = False
if len(partition) < self.record_size:
try:
original = self.open_refs(field, record)
except IOError:
pass

####
paths = np.full(self.record_size, np.nan, dtype="O")
offsets = np.zeros(self.record_size, dtype="int64")
sizes = np.zeros(self.record_size, dtype="int64")
raws = np.full(self.record_size, np.nan, dtype="O")
nraw = 0
npath = 0
if original:
paths = original["path"]
offsets = original["offset"]
sizes = original["size"]
raws = original["raw"]
else:
paths = np.full(self.record_size, np.nan, dtype="O")
offsets = np.zeros(self.record_size, dtype="int64")
sizes = np.zeros(self.record_size, dtype="int64")
raws = np.full(self.record_size, np.nan, dtype="O")
for j, data in partition.items():
if isinstance(data, list):
npath += 1
paths[j] = data[0]
if len(data) > 1:
offsets[j] = data[1]
sizes[j] = data[2]
elif data is None:
# delete
paths[j] = None
offsets[j] = 0
sizes[j] = 0
raws[j] = None
else:
nraw += 1
# this is the only call into kerchunk, could remove
raws[j] = kerchunk.df._proc_raw(data)
# TODO: only save needed columns
df = pd.DataFrame(
@@ -424,6 +444,7 @@ def write(self, field, record, base_url=None, storage_options=None):
object_encoding = {"raw": "bytes", "path": "utf8"}
has_nulls = ["path", "raw"]

fn = f"{base_url or self.out_root}/{field}/refs.{record}.parq"
self.fs.mkdirs(f"{base_url or self.out_root}/{field}", exist_ok=True)
df.to_parquet(
fn,
@@ -474,29 +495,30 @@ def flush(self, base_url=None, storage_options=None):
self.open_refs.cache_clear()
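The effect of the new merge logic in write(), continued from the sketch after create() (names and values remain illustrative): a partial edit of an already-flushed record no longer discards its other rows, because the existing refs.N.parq is read back and merged with the in-memory changes before being rewritten.

refs3 = LazyReferenceMapper("memory://refs", fs=fs)
refs3["data/2"] = b"replacement bytes"         # row 2 of (field="data", record=0)
refs3.flush()                                  # reads data/refs.0.parq, merges, rewrites

refs4 = LazyReferenceMapper("memory://refs", fs=fs)
assert refs4["data/0"] == b"raw chunk bytes"   # untouched row survives the rewrite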

def __len__(self):
# Caveat: This counts expected references, not actual
# Caveat: This counts expected references, not actual - but is fast
count = 0
for field in self.listdir():
if field.startswith("."):
count += 1
else:
chunk_sizes = self._get_chunk_sizes(field)
nchunks = self.np.product(chunk_sizes)
count += nchunks
count += math.prod(self._get_chunk_sizes(field))
count += len(self.zmetadata) # all metadata keys
count += len(self._items) # the metadata file itself
# any other files not in reference partitions
count += sum(1 for _ in self._items if not isinstance(_, tuple))
return count

def __iter__(self):
# Caveat: Note that this generates all expected keys, but does not
# account for reference keys that are missing.
# Caveat: returns only existing keys, so the number of these does not
# match len(self)
metas = set(self.zmetadata)
metas.update(self._items)
for bit in metas:
if isinstance(bit, str):
yield bit
for field in self.listdir():
yield from self._keys_in_field(field)
for k in self._keys_in_field(field):
if k in self:
yield k
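The two caveats above are deliberate trade-offs: __len__ is a cheap upper bound computed from the chunk grid, while iteration only yields keys that actually resolve, so the two can disagree. A small illustrative check against the 100-element, 10-chunk store from the earlier sketch:

refs = LazyReferenceMapper("memory://refs", fs=fs)
expected = len(refs)              # metadata keys + all 10 possible "data/N" chunks
present = sum(1 for _ in refs)    # metadata keys + only the chunks that exist
assert present <= expected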

def __contains__(self, item):
try:
45 changes: 44 additions & 1 deletion fsspec/implementations/tests/test_reference.py
@@ -5,7 +5,11 @@

import fsspec
from fsspec.implementations.local import LocalFileSystem
from fsspec.implementations.reference import ReferenceFileSystem, ReferenceNotReachable
from fsspec.implementations.reference import (
LazyReferenceMapper,
ReferenceFileSystem,
ReferenceNotReachable,
)
from fsspec.tests.conftest import data, realfile, reset_files, server, win # noqa: F401


@@ -674,3 +678,42 @@ def test_cached(m, tmpdir):
skip_instance_cache=True,
)
assert fs.cat("a") == b"A"


@pytest.fixture()
def lazy_refs(m):
zarr = pytest.importorskip("zarr")
l = LazyReferenceMapper.create("memory://refs", fs=m)
g = zarr.open(l, mode="w")
g.create_dataset(name="data", shape=(100,), chunks=(10,), dtype="int64")
return l


def test_append_parquet(lazy_refs, m):
with pytest.raises(KeyError):
lazy_refs["data/0"]
lazy_refs["data/0"] = b"data"
assert lazy_refs["data/0"] == b"data"
lazy_refs.flush()

lazy2 = LazyReferenceMapper("memory://refs", fs=m)
assert lazy2["data/0"] == b"data"
with pytest.raises(KeyError):
lazy_refs["data/1"]
lazy2["data/1"] = b"Bdata"
assert lazy2["data/1"] == b"Bdata"
lazy2.flush()

lazy2 = LazyReferenceMapper("memory://refs", fs=m)
assert lazy2["data/0"] == b"data"
assert lazy2["data/1"] == b"Bdata"
lazy2["data/1"] = b"Adata"
del lazy2["data/0"]
assert lazy2["data/1"] == b"Adata"
assert "data/0" not in lazy2
lazy2.flush()

lazy2 = LazyReferenceMapper("memory://refs", fs=m)
with pytest.raises(KeyError):
lazy2["data/0"]
assert lazy2["data/1"] == b"Adata"
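A natural follow-on that the new test does not cover (a hedged sketch only; the test name is hypothetical and reuses the lazy_refs fixture): once flushed, the edited parquet references should be readable back through ReferenceFileSystem like any other reference set.

def test_read_back_via_reference_fs(lazy_refs, m):
    lazy_refs["data/0"] = b"data"
    lazy_refs.flush()

    fs = fsspec.filesystem(
        "reference",
        fo="memory://refs",        # directory of parquet refs written by flush()
        remote_protocol="memory",  # target protocol for any url-style references
        skip_instance_cache=True,
    )
    assert fs.cat("data/0") == b"data"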
