Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: store learns to delete prefixes when overwriting/creating hierarchy nodes #2430

Merged
merged 19 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ def list(self) -> AsyncGenerator[str, None]:
@abstractmethod
def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
"""
Retrieve all keys in the store that begin with a given prefix. Keys are returned with the
common leading prefix removed.
Retrieve all keys in the store that begin with a given prefix. Keys are returned relative
to the root of the store.

Parameters
----------
Expand Down Expand Up @@ -371,6 +371,20 @@ def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
"""
...

async def delete_dir(self, prefix: str) -> None:
"""
Remove all keys and prefixes in the store that begin with a given prefix.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would make life much easier for Store implementers if we declare that prefix will always be the path of a group or a array. Otherwise, all implementations need to deal with unnecessary edge cases like delete_dir("path/to/array/c/0") or worse.

Icechunk is an example of a store that could very cleanly and efficiently implement the case for a group/array prefix, but it would struggle to handle other prefixes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use case for recursive = False ? If zarr itself doesn't have one, I'd remove the argument.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see a use case for delete_dir("path/to/array/c/") (remove all chunks from an array) so I think we should allow that -- even if it makes life a little harder in icechunk.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's totally fine. Let's document the three possible prefix cases, and say that stores are allowed to do undefined behavior for other prefixes

"""
if not self.supports_deletes:
raise NotImplementedError
if not self.supports_listing:
raise NotImplementedError
self._check_writable()
if not prefix.endswith("/"):
prefix += "/"
async for key in self.list_prefix(prefix):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we document what's the expected return value of list_prefix? Absolute vs. relative, final "/" or not, etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done in the list_prefix docstring

await self.delete(key)

def close(self) -> None:
"""Close the store."""
self._is_open = False
Expand Down
14 changes: 12 additions & 2 deletions src/zarr/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,12 @@ async def _create_v3(
attributes: dict[str, JSON] | None = None,
exists_ok: bool = False,
) -> AsyncArray[ArrayV3Metadata]:
if not exists_ok:
if exists_ok:
if store_path.store.supports_deletes:
await store_path.delete_dir()
else:
await ensure_no_existing_node(store_path, zarr_format=3)
else:
await ensure_no_existing_node(store_path, zarr_format=3)

shape = parse_shapelike(shape)
Expand Down Expand Up @@ -605,7 +610,12 @@ async def _create_v2(
attributes: dict[str, JSON] | None = None,
exists_ok: bool = False,
) -> AsyncArray[ArrayV2Metadata]:
if not exists_ok:
if exists_ok:
if store_path.store.supports_deletes:
await store_path.delete_dir()
else:
await ensure_no_existing_node(store_path, zarr_format=2)
else:
await ensure_no_existing_node(store_path, zarr_format=2)

if order is None:
Expand Down
21 changes: 8 additions & 13 deletions src/zarr/core/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,13 @@ async def from_store(
zarr_format: ZarrFormat = 3,
) -> AsyncGroup:
store_path = await make_store_path(store)
if not exists_ok:

if exists_ok:
if store_path.store.supports_deletes:
await store_path.delete_dir()
else:
await ensure_no_existing_node(store_path, zarr_format=zarr_format)
else:
await ensure_no_existing_node(store_path, zarr_format=zarr_format)
attributes = attributes or {}
group = cls(
Expand Down Expand Up @@ -727,19 +733,8 @@ def _getitem_consolidated(

async def delitem(self, key: str) -> None:
store_path = self.store_path / key
if self.metadata.zarr_format == 3:
await (store_path / ZARR_JSON).delete()

elif self.metadata.zarr_format == 2:
await asyncio.gather(
(store_path / ZGROUP_JSON).delete(), # TODO: missing_ok=False
(store_path / ZARRAY_JSON).delete(), # TODO: missing_ok=False
(store_path / ZATTRS_JSON).delete(), # TODO: missing_ok=True
)

else:
raise ValueError(f"unexpected zarr_format: {self.metadata.zarr_format}")

await store_path.delete_dir()
if self.metadata.consolidated_metadata:
self.metadata.consolidated_metadata.metadata.pop(key, None)
await self._save_metadata()
Expand Down
9 changes: 9 additions & 0 deletions src/zarr/storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ async def delete(self) -> None:
"""
await self.store.delete(self.path)

async def delete_dir(self) -> None:
"""
Delete all keys with the given prefix from the store.
"""
path = self.path
if not path.endswith("/"):
path += "/"
await self.store.delete_dir(path)

async def set_if_not_exists(self, default: Buffer) -> None:
"""
Store a key to ``value`` if the key is not already present.
Expand Down
5 changes: 2 additions & 3 deletions src/zarr/storage/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,8 @@ async def list(self) -> AsyncGenerator[str, None]:

async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
to_strip = (
(self.root / prefix).as_posix() + "/"
) # TODO: fixme in https://github.com/zarr-developers/zarr-python/issues/2438
to_strip = self.root.as_posix() + "/"
prefix = prefix.rstrip("/")
for p in (self.root / prefix).rglob("*"):
if p.is_file():
yield p.as_posix().replace(to_strip, "")
Expand Down
5 changes: 5 additions & 0 deletions src/zarr/storage/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
async for key in self._store.list_dir(prefix=prefix):
yield key

async def delete_dir(self, prefix: str) -> None:
# docstring inherited
with self.log(prefix):
await self._store.delete_dir(prefix=prefix)

def with_mode(self, mode: AccessModeLiteral) -> Self:
# docstring inherited
with self.log(mode):
Expand Down
11 changes: 8 additions & 3 deletions src/zarr/storage/memory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from logging import getLogger
from typing import TYPE_CHECKING, Self

from zarr.abc.store import ByteRangeRequest, Store
Expand All @@ -14,6 +15,9 @@
from zarr.core.common import AccessModeLiteral


logger = getLogger(__name__)


class MemoryStore(Store):
"""
In-memory store for testing purposes.
Expand Down Expand Up @@ -137,7 +141,7 @@ async def delete(self, key: str) -> None:
try:
del self._store_dict[key]
except KeyError:
pass
logger.debug("Key %s does not exist.", key)

async def set_partial_values(self, key_start_values: Iterable[tuple[str, int, bytes]]) -> None:
# docstring inherited
Expand All @@ -150,9 +154,10 @@ async def list(self) -> AsyncGenerator[str, None]:

async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
for key in self._store_dict:
# note: we materialize all dict keys into a list here so we can mutate the dict in-place (e.g. in delete_prefix)
for key in list(self._store_dict):
if key.startswith(prefix):
yield key.removeprefix(prefix)
yield key

async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
Expand Down
7 changes: 4 additions & 3 deletions src/zarr/storage/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:

async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
find_str = f"{self.path}/{prefix}"
for onefile in await self.fs._find(find_str, detail=False, maxdepth=None, withdirs=False):
yield onefile.removeprefix(find_str)
for onefile in await self.fs._find(
f"{self.path}/{prefix}", detail=False, maxdepth=None, withdirs=False
):
yield onefile.removeprefix(f"{self.path}/")
2 changes: 1 addition & 1 deletion src/zarr/storage/zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
async for key in self.list():
if key.startswith(prefix):
yield key.removeprefix(prefix)
yield key

async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
# docstring inherited
Expand Down
20 changes: 17 additions & 3 deletions src/zarr/testing/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,26 @@ async def test_exists(self, store: S) -> None:
assert await store.exists("foo/zarr.json")

async def test_delete(self, store: S) -> None:
if not store.supports_deletes:
pytest.skip("store does not support deletes")
await store.set("foo/zarr.json", self.buffer_cls.from_bytes(b"bar"))
assert await store.exists("foo/zarr.json")
await store.delete("foo/zarr.json")
assert not await store.exists("foo/zarr.json")

async def test_delete_dir(self, store: S) -> None:
if not store.supports_deletes:
pytest.skip("store does not support deletes")
await store.set("zarr.json", self.buffer_cls.from_bytes(b"root"))
await store.set("foo-bar/zarr.json", self.buffer_cls.from_bytes(b"root"))
await store.set("foo/zarr.json", self.buffer_cls.from_bytes(b"bar"))
await store.set("foo/c/0", self.buffer_cls.from_bytes(b"chunk"))
await store.delete_dir("foo")
assert await store.exists("zarr.json")
assert await store.exists("foo-bar/zarr.json")
assert not await store.exists("foo/zarr.json")
assert not await store.exists("foo/c/0")

async def test_empty(self, store: S) -> None:
assert await store.empty()
await self.set(
Expand Down Expand Up @@ -249,8 +264,7 @@ async def test_list(self, store: S) -> None:
async def test_list_prefix(self, store: S) -> None:
"""
Test that the `list_prefix` method works as intended. Given a prefix, it should return
all the keys in storage that start with this prefix. Keys should be returned with the shared
prefix removed.
all the keys in storage that start with this prefix.
"""
prefixes = ("", "a/", "a/b/", "a/b/c/")
data = self.buffer_cls.from_bytes(b"")
Expand All @@ -264,7 +278,7 @@ async def test_list_prefix(self, store: S) -> None:
expected: tuple[str, ...] = ()
for key in store_dict:
if key.startswith(prefix):
expected += (key.removeprefix(prefix),)
expected += (key,)
expected = tuple(sorted(expected))
assert observed == expected

Expand Down
2 changes: 2 additions & 0 deletions tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def test_array_creation_existing_node(
new_dtype = "float32"

if exists_ok:
if not store.supports_deletes:
pytest.skip("store does not support deletes")
arr_new = Array.create(
spath / "extant",
shape=new_shape,
Expand Down
23 changes: 22 additions & 1 deletion tests/test_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ def test_group_open(store: Store, zarr_format: ZarrFormat, exists_ok: bool) -> N
with pytest.raises(ContainsGroupError):
Group.from_store(store, attributes=attrs, zarr_format=zarr_format, exists_ok=exists_ok)
else:
if not store.supports_deletes:
pytest.skip(
"Store does not support deletes but `exists_ok` is True, requiring deletes to override a group"
)
group_created_again = Group.from_store(
store, attributes=new_attrs, zarr_format=zarr_format, exists_ok=exists_ok
)
Expand Down Expand Up @@ -720,6 +724,8 @@ def test_group_creation_existing_node(
new_attributes = {"new": True}

if exists_ok:
if not store.supports_deletes:
pytest.skip("store does not support deletes but exists_ok is True")
node_new = Group.from_store(
spath / "extant",
attributes=new_attributes,
Expand Down Expand Up @@ -1092,7 +1098,9 @@ async def test_require_group(store: LocalStore | MemoryStore, zarr_format: ZarrF
assert foo_group.attrs == {"foo": 100}

# test that we can get the group using require_group and overwrite=True
foo_group = await root.require_group("foo", overwrite=True)
if store.supports_deletes:
foo_group = await root.require_group("foo", overwrite=True)
assert foo_group.attrs == {}

_ = await foo_group.create_array(
"bar", shape=(10,), dtype="uint8", chunk_shape=(2,), attributes={"foo": 100}
Expand Down Expand Up @@ -1371,3 +1379,16 @@ def test_group_deprecated_positional_args(method: str) -> None:
with pytest.warns(FutureWarning, match=r"Pass name=.*, data=.* as keyword args."):
arr = getattr(root, method)("foo_like", data, **kwargs)
assert arr.shape == data.shape


@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"])
def test_delitem_removes_children(store: Store, zarr_format: ZarrFormat) -> None:
# https://github.com/zarr-developers/zarr-python/issues/2191
g1 = zarr.group(store=store, zarr_format=zarr_format)
g1.create_group("0")
g1.create_group("0/0")
arr = g1.create_array("0/0/0", shape=(1,))
arr[:] = 1
del g1["0"]
with pytest.raises(KeyError):
g1["0/0"]
7 changes: 6 additions & 1 deletion tests/test_store/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ async def test_logging_store_counter(store: Store) -> None:
arr[:] = 1

assert wrapped.counter["set"] == 2
assert wrapped.counter["get"] == 0 # 1 if overwrite=False
assert wrapped.counter["list"] == 0
assert wrapped.counter["list_dir"] == 0
assert wrapped.counter["list_prefix"] == 0
if store.supports_deletes:
assert wrapped.counter["get"] == 0 # 1 if overwrite=False
assert wrapped.counter["delete_dir"] == 1
else:
assert wrapped.counter["get"] == 1
assert wrapped.counter["delete_dir"] == 0


async def test_with_mode():
Expand Down
6 changes: 1 addition & 5 deletions tests/test_store/test_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from zarr.testing.store import StoreTests

if TYPE_CHECKING:
from collections.abc import Coroutine
from typing import Any


Expand Down Expand Up @@ -65,9 +64,6 @@ def test_store_supports_partial_writes(self, store: ZipStore) -> None:
def test_store_supports_listing(self, store: ZipStore) -> None:
assert store.supports_listing

def test_delete(self, store: ZipStore) -> Coroutine[Any, Any, None]:
pass

def test_api_integration(self, store: ZipStore) -> None:
root = zarr.open_group(store=store)

Expand Down Expand Up @@ -103,4 +99,4 @@ async def test_with_mode(self, store: ZipStore) -> None:

@pytest.mark.parametrize("mode", ["a", "w"])
async def test_store_open_mode(self, store_kwargs: dict[str, Any], mode: str) -> None:
super().test_store_open_mode(store_kwargs, mode)
await super().test_store_open_mode(store_kwargs, mode)