Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add array storage helpers #2065

Merged
merged 31 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ebbfbe0
implement store.list_prefix and store._set_dict
d-v-b Aug 3, 2024
da6083e
simplify string handling
d-v-b Aug 3, 2024
dc5fe47
add nchunks_initialized, and necessary additions for it
d-v-b Aug 2, 2024
b694b6e
rename _iter_chunks to _iter_chunk_coords
d-v-b Aug 2, 2024
6a27ca8
fix test name
d-v-b Aug 3, 2024
d15be9a
bring in correct store list_dir implementations
d-v-b Aug 3, 2024
ef34f25
Merge branch 'v3' of https://github.com/zarr-developers/zarr-python i…
d-v-b Aug 12, 2024
962ffed
bump numcodecs to dodge zstd exception
d-v-b Aug 12, 2024
5c98ab4
remove store._set_dict, and add _set_many and get_many instead
d-v-b Aug 12, 2024
9e64fa8
update deprecation warning template
d-v-b Aug 13, 2024
a4b4696
add a type annotation
d-v-b Aug 13, 2024
04b1d6a
refactor chunk iterators. they are not properties any more, just meth…
d-v-b Aug 13, 2024
12b3bc1
Merge branch 'v3' of https://github.com/zarr-developers/zarr-python i…
d-v-b Aug 13, 2024
3e2c656
Merge branch 'v3' of github.com:zarr-developers/zarr-python into add-…
d-v-b Sep 19, 2024
b7c1a56
_get_many returns tuple[str, buffer]
d-v-b Sep 19, 2024
44bed5c
stricter store types
d-v-b Sep 19, 2024
021d41e
Merge branch 'v3' of github.com:zarr-developers/zarr-python into add-…
d-v-b Sep 23, 2024
2db860b
fix types
d-v-b Sep 23, 2024
45f27b1
Merge branch 'v3' of github.com:zarr-developers/zarr-python into add-…
d-v-b Sep 23, 2024
78f22b9
Merge branch 'v3' of github.com:zarr-developers/zarr-python into add-…
d-v-b Sep 24, 2024
b5e08e8
lint
d-v-b Sep 24, 2024
43743e1
remove deprecation warnings
d-v-b Sep 24, 2024
f65a6e8
fix zip list_prefix
d-v-b Sep 24, 2024
df6f9a7
tests for nchunks_initialized, chunks_initialized; add selection_shap…
d-v-b Sep 24, 2024
e60cbe0
add nchunks test
d-v-b Sep 24, 2024
5c54449
fix docstrings
d-v-b Sep 24, 2024
e8598c6
fix docstring
d-v-b Sep 24, 2024
ae216e1
Merge branch 'v3' into add-array-storage-helpers
d-v-b Sep 24, 2024
768ab43
revert unnecessary changes to project config
d-v-b Sep 24, 2024
c953f21
Merge branch 'add-array-storage-helpers' of github.com:d-v-b/zarr-pyt…
d-v-b Sep 24, 2024
f0d61b2
Merge branch 'v3' of https://github.com/zarr-developers/zarr-python i…
d-v-b Sep 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies = [
'asciitree',
'numpy>=1.24',
'fasteners',
'numcodecs>=0.10.0',
'numcodecs>=0.13.0',
'fsspec>2024',
'crc32c',
'typing_extensions',
Expand Down Expand Up @@ -273,6 +273,7 @@ filterwarnings = [
"error:::zarr.*",
"ignore:PY_SSIZE_T_CLEAN will be required.*:DeprecationWarning",
"ignore:The loop argument is deprecated since Python 3.8.*:DeprecationWarning",
"ignore:.*may be removed in an early zarr-python v3 release.:DeprecationWarning",
"ignore:Creating a zarr.buffer.gpu.*:UserWarning",
"ignore:Duplicate name:UserWarning", # from ZipFile
]
Expand Down
44 changes: 32 additions & 12 deletions src/zarr/abc/store.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from asyncio import gather
from collections.abc import AsyncGenerator, Iterable
from typing import Any, NamedTuple, Protocol, runtime_checkable
from typing import TYPE_CHECKING, Any, NamedTuple, Protocol, runtime_checkable

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Iterable
from typing import Any, TypeAlias

from typing_extensions import Self
from typing_extensions import Self

from zarr.core.buffer import Buffer, BufferPrototype
from zarr.core.common import AccessModeLiteral, BytesLike
from zarr.core.buffer import Buffer, BufferPrototype
from zarr.core.common import AccessModeLiteral, BytesLike

__all__ = ["Store", "AccessMode", "ByteGetter", "ByteSetter", "set_or_delete"]

ByteRangeRequest: TypeAlias = tuple[int | None, int | None]


class AccessMode(NamedTuple):
str: AccessModeLiteral
Expand Down Expand Up @@ -94,7 +101,7 @@ async def get(
self,
key: str,
prototype: BufferPrototype,
byte_range: tuple[int | None, int | None] | None = None,
byte_range: ByteRangeRequest | None = None,
) -> Buffer | None:
"""Retrieve the value associated with a given key.

Expand All @@ -113,13 +120,13 @@ async def get(
async def get_partial_values(
self,
prototype: BufferPrototype,
key_ranges: list[tuple[str, tuple[int | None, int | None]]],
key_ranges: Iterable[tuple[str, ByteRangeRequest]],
) -> list[Buffer | None]:
"""Retrieve possibly partial values from given key_ranges.

Parameters
----------
key_ranges : list[tuple[str, tuple[int, int]]]
key_ranges : list[tuple[str, tuple[int | None, int | None]]]
Ordered set of key, range pairs, a key may occur multiple times with different ranges

Returns
Expand Down Expand Up @@ -189,7 +196,9 @@ def supports_partial_writes(self) -> bool:
...

@abstractmethod
async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesLike]]) -> None:
async def set_partial_values(
self, key_start_values: Iterable[tuple[str, int, BytesLike]]
) -> None:
"""Store values at a given key, starting at byte range_start.

Parameters
Expand Down Expand Up @@ -253,21 +262,32 @@ def close(self) -> None:
"""Close the store."""
self._is_open = False

async def _get_many(
self, requests: Iterable[tuple[str, BufferPrototype, ByteRangeRequest | None]]
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
"""
Retrieve a collection of objects from storage. In general this method does not guarantee
that objects will be retrieved in the order in which they were requested, so this method
yields tuple[str, Buffer | None] instead of just Buffer | None
"""
for req in requests:
yield (req[0], await self.get(*req))


@runtime_checkable
class ByteGetter(Protocol):
async def get(
self, prototype: BufferPrototype, byte_range: tuple[int, int | None] | None = None
self, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None
) -> Buffer | None: ...


@runtime_checkable
class ByteSetter(Protocol):
async def get(
self, prototype: BufferPrototype, byte_range: tuple[int, int | None] | None = None
self, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None
) -> Buffer | None: ...

async def set(self, value: Buffer, byte_range: tuple[int, int] | None = None) -> None: ...
async def set(self, value: Buffer, byte_range: ByteRangeRequest | None = None) -> None: ...

async def delete(self) -> None: ...

Expand Down
6 changes: 3 additions & 3 deletions src/zarr/codecs/sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
Codec,
CodecPipeline,
)
from zarr.abc.store import ByteGetter, ByteSetter
from zarr.abc.store import ByteGetter, ByteRangeRequest, ByteSetter
from zarr.codecs.bytes import BytesCodec
from zarr.codecs.crc32c_ import Crc32cCodec
from zarr.core.array_spec import ArraySpec
Expand Down Expand Up @@ -78,7 +78,7 @@ class _ShardingByteGetter(ByteGetter):
chunk_coords: ChunkCoords

async def get(
self, prototype: BufferPrototype, byte_range: tuple[int, int | None] | None = None
self, prototype: BufferPrototype, byte_range: ByteRangeRequest | None = None
) -> Buffer | None:
assert byte_range is None, "byte_range is not supported within shards"
assert (
Expand All @@ -91,7 +91,7 @@ async def get(
class _ShardingByteSetter(_ShardingByteGetter, ByteSetter):
shard_dict: ShardMutableMapping

async def set(self, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
async def set(self, value: Buffer, byte_range: ByteRangeRequest | None = None) -> None:
assert byte_range is None, "byte_range is not supported within shards"
self.shard_dict[self.chunk_coords] = value

Expand Down
133 changes: 128 additions & 5 deletions src/zarr/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import json
from asyncio import gather
from collections.abc import Iterator, Sequence
from dataclasses import dataclass, field, replace
from typing import TYPE_CHECKING, Any, Literal, cast

import numpy as np
import numpy.typing as npt
from typing_extensions import deprecated

from zarr._compat import _deprecate_positional_args
from zarr.abc.codec import Codec, CodecPipeline
Expand Down Expand Up @@ -50,6 +52,8 @@
OrthogonalSelection,
Selection,
VIndex,
_iter_grid,
ceildiv,
check_fields,
check_no_multi_fields,
is_pure_fancy_indexing,
Expand All @@ -59,7 +63,7 @@
)
from zarr.core.metadata.v2 import ArrayV2Metadata
from zarr.core.metadata.v3 import ArrayV3Metadata
from zarr.core.sync import sync
from zarr.core.sync import collect_aiterator, sync
from zarr.registry import get_pipeline_class
from zarr.store import StoreLike, StorePath, make_store_path
from zarr.store.common import (
Expand Down Expand Up @@ -399,10 +403,12 @@ def shape(self) -> ChunkCoords:
def chunks(self) -> ChunkCoords:
if isinstance(self.metadata.chunk_grid, RegularChunkGrid):
return self.metadata.chunk_grid.chunk_shape
else:
raise TypeError(
f"chunk attribute is only available for RegularChunkGrid, this array has a {self.metadata.chunk_grid}"
)

msg = (
f"The `chunks` attribute is only defined for arrays using `RegularChunkGrid`."
f"This array has a {self.metadata.chunk_grid} instead."
)
raise NotImplementedError(msg)

@property
def size(self) -> int:
Expand Down Expand Up @@ -443,6 +449,55 @@ def basename(self) -> str | None:
return self.name.split("/")[-1]
return None

@property
@deprecated("AsyncArray.cdata_shape may be removed in an early zarr-python v3 release.")
def cdata_shape(self) -> ChunkCoords:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious which of these helpers could migrate to the purview of the chunk grid.

"""
The shape of the chunk grid for this array.
"""
return tuple(ceildiv(s, c) for s, c in zip(self.shape, self.chunks, strict=False))

@property
@deprecated("AsyncArray.nchunks may be removed in an early zarr-python v3 release.")
def nchunks(self) -> int:
"""
The number of chunks in the stored representation of this array.
"""
return product(self.cdata_shape)

def _iter_chunk_coords(self, origin: Sequence[int] | None = None) -> Iterator[ChunkCoords]:
"""
Produce an iterator over the coordinates of each chunk, in chunk grid space, relative to
an optional origin.
"""
return _iter_grid(self.cdata_shape, origin=origin)

def _iter_chunk_keys(self, origin: Sequence[int] | None = None) -> Iterator[str]:
"""
Return an iterator over the storage keys of each chunk, relative to an optional origin.
"""
for k in self._iter_chunk_coords(origin=origin):
yield self.metadata.encode_chunk_key(k)

def _iter_chunk_regions(self) -> Iterator[tuple[slice, ...]]:
"""
Iterate over the regions spanned by each chunk.
"""
for cgrid_position in self._iter_chunk_coords():
out: tuple[slice, ...] = ()
for c_pos, c_shape in zip(cgrid_position, self.chunks, strict=False):
start = c_pos * c_shape
stop = start + c_shape
out += (slice(start, stop, 1),)
yield out

@property
def nbytes(self) -> int:
"""
The number of bytes that can be stored in this array.
"""
return self.nchunks * self.dtype.itemsize

async def _get_selection(
self,
indexer: Indexer,
Expand Down Expand Up @@ -751,6 +806,50 @@ def read_only(self) -> bool:
def fill_value(self) -> Any:
return self.metadata.fill_value

@property
@deprecated(
"cdata_shape is transitional and will be removed in an early zarr-python v3 release."
)
def cdata_shape(self) -> ChunkCoords:
"""
The shape of the chunk grid for this array.
"""
return tuple(ceildiv(s, c) for s, c in zip(self.shape, self.chunks, strict=False))

@property
@deprecated("nchunks is transitional and will be removed in an early zarr-python v3 release.")
def nchunks(self) -> int:
"""
The number of chunks in the stored representation of this array.
"""
return self._async_array.nchunks

def _iter_chunks(self, origin: Sequence[int] | None = None) -> Iterator[ChunkCoords]:
"""
Produce an iterator over the coordinates of each chunk, in chunk grid space, relative
to an optional origin.
"""
yield from self._async_array._iter_chunk_coords(origin=origin)

@property
def nbytes(self) -> int:
"""
The number of bytes that can be stored in this array.
"""
return self._async_array.nbytes

def _iter_chunk_keys(self, origin: Sequence[int] | None = None) -> Iterator[str]:
"""
Return an iterator over the keys of each chunk, relative to an optional origin
"""
yield from self._async_array._iter_chunk_keys(origin=origin)

def _iter_chunk_regions(self) -> Iterator[tuple[slice, ...]]:
"""
Iterate over the regions spanned by each chunk.
"""
yield from self._async_array._iter_chunk_regions()

def __array__(
self, dtype: npt.DTypeLike | None = None, copy: bool | None = None
) -> NDArrayLike:
Expand Down Expand Up @@ -2082,3 +2181,27 @@ def info(self) -> None:
return sync(
self._async_array.info(),
)


@deprecated(
"nchunks_initialized is transitional and will be removed in an early zarr-python v3 release."
)
def nchunks_initialized(array: Array) -> int:
return len(chunks_initialized(array))


def chunks_initialized(array: Array) -> tuple[str, ...]:
"""
Return the keys of all the chunks that exist in storage.
"""
# todo: make this compose with the underlying async iterator
store_contents = list(
collect_aiterator(array.store_path.store.list_prefix(prefix=array.store_path.path))
)
out: list[str] = []

for chunk_key in array._iter_chunk_keys():
if chunk_key in store_contents:
out.append(chunk_key)

return tuple(out)
2 changes: 1 addition & 1 deletion src/zarr/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def product(tup: ChunkCoords) -> int:


async def concurrent_map(
items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
items: Iterable[T], func: Callable[..., Awaitable[V]], limit: int | None = None
) -> list[V]:
if limit is None:
return await asyncio.gather(*[func(*item) for item in items])
Expand Down
Loading
Loading