From 4ca61fbb4d064ecc0f31bc7845b36e3fcdb09027 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Tue, 5 Dec 2023 17:34:49 +0100 Subject: [PATCH] move v2 chunk io into chunk.py --- zarr/v3/abc/codec.py | 36 ++++++-- zarr/v3/array/base.py | 72 ++++----------- zarr/v3/array/chunk.py | 193 ++++++++++++++++++++++++++++++---------- zarr/v3/array/codecs.py | 18 +++- zarr/v3/array/v2.py | 70 +++++++++++++-- zarr/v3/array/v3.py | 16 ++-- 6 files changed, 286 insertions(+), 119 deletions(-) diff --git a/zarr/v3/abc/codec.py b/zarr/v3/abc/codec.py index b98c85d0a3..0cb14d2e1d 100644 --- a/zarr/v3/abc/codec.py +++ b/zarr/v3/abc/codec.py @@ -11,20 +11,19 @@ from __future__ import annotations from abc import abstractmethod, ABC -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Type import numpy as np -from zarr.v3.common import BytesLike +from zarr.v3.common import BytesLike, SliceSelection +from zarr.v3.store import StorePath if TYPE_CHECKING: - from zarr.v3.array.base import CoreArrayMetadata + from zarr.v3.array.base import CoreArrayMetadata, CodecMetadata class Codec(ABC): - supports_partial_decode: bool - supports_partial_encode: bool is_fixed_size: bool array_metadata: CoreArrayMetadata @@ -35,6 +34,12 @@ def compute_encoded_size(self, input_byte_length: int) -> int: def resolve_metadata(self) -> CoreArrayMetadata: return self.array_metadata + @classmethod + def from_metadata( + cls, codec_metadata: "CodecMetadata", array_metadata: CoreArrayMetadata + ) -> "Type[Codec]": + pass + class ArrayArrayCodec(Codec): @abstractmethod @@ -68,6 +73,27 @@ async def encode( pass +class ArrayBytesCodecPartialDecodeMixin: + @abstractmethod + async def decode_partial( + self, + store_path: StorePath, + selection: SliceSelection, + ) -> Optional[np.ndarray]: + pass + + +class ArrayBytesCodecPartialEncodeMixin: + @abstractmethod + async def encode_partial( + self, + store_path: StorePath, + chunk_array: np.ndarray, + selection: SliceSelection, + ) -> None: + pass + + class BytesBytesCodec(Codec): @abstractmethod async def decode( diff --git a/zarr/v3/array/base.py b/zarr/v3/array/base.py index 9ea134b3fd..4f0c174bd4 100644 --- a/zarr/v3/array/base.py +++ b/zarr/v3/array/base.py @@ -93,61 +93,6 @@ def to_numpy_shortname(self) -> str: } -@frozen -class RegularChunkGridConfigurationMetadata: - chunk_shape: ChunkCoords - - -@frozen -class RegularChunkGridMetadata: - configuration: RegularChunkGridConfigurationMetadata - name: Literal["regular"] = "regular" - - -@frozen -class DefaultChunkKeyEncodingConfigurationMetadata: - separator: Literal[".", "/"] = "/" - - -@frozen -class DefaultChunkKeyEncodingMetadata: - configuration: DefaultChunkKeyEncodingConfigurationMetadata = ( - DefaultChunkKeyEncodingConfigurationMetadata() - ) - name: Literal["default"] = "default" - - def decode_chunk_key(self, chunk_key: str) -> ChunkCoords: - if chunk_key == "c": - return () - return tuple(map(int, chunk_key[1:].split(self.configuration.separator))) - - def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: - return self.configuration.separator.join(map(str, ("c",) + chunk_coords)) - - -@frozen -class V2ChunkKeyEncodingConfigurationMetadata: - separator: Literal[".", "/"] = "." - - -@frozen -class V2ChunkKeyEncodingMetadata: - configuration: V2ChunkKeyEncodingConfigurationMetadata = ( - V2ChunkKeyEncodingConfigurationMetadata() - ) - name: Literal["v2"] = "v2" - - def decode_chunk_key(self, chunk_key: str) -> ChunkCoords: - return tuple(map(int, chunk_key.split(self.configuration.separator))) - - def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: - chunk_identifier = self.configuration.separator.join(map(str, chunk_coords)) - return "0" if chunk_identifier == "" else chunk_identifier - - -ChunkKeyEncodingMetadata = Union[DefaultChunkKeyEncodingMetadata, V2ChunkKeyEncodingMetadata] - - @frozen class CoreArrayMetadata: shape: ChunkCoords @@ -185,4 +130,19 @@ def ndim(self) -> int: @property def dtype(self) -> np.dtype: - return np.dtype(self.metadata.dtype) """ + return np.dtype(self.metadata.dtype) + + @property + def size(self) -> int + return np.prod(self.metadata.shape) + + @property + def T(self) -> 'ZArray': + ... + + def __getitem__(*args): + return _chunk_getitem_sync(*args): + + def __setitem__(*args): + return _chunk_setitem_sync(*args) + """ diff --git a/zarr/v3/array/chunk.py b/zarr/v3/array/chunk.py index 8c88941093..82b987cf04 100644 --- a/zarr/v3/array/chunk.py +++ b/zarr/v3/array/chunk.py @@ -1,6 +1,7 @@ -from typing import Any, Literal, Optional, Tuple +from typing import Any, Dict, List, Literal, Optional, Tuple, Union + +from attr import frozen from zarr.util import is_total_slice -from zarr.v3.array.base import ChunkKeyEncodingMetadata from zarr.v3.common import BytesLike, ChunkCoords, SliceSelection, to_thread import numpy as np import numcodecs @@ -9,18 +10,85 @@ from zarr.v3.store import StorePath -async def _read_chunk_v2( +@frozen +class RegularChunkGridConfigurationMetadata: + chunk_shape: ChunkCoords + + +@frozen +class RegularChunkGridMetadata: + configuration: RegularChunkGridConfigurationMetadata + name: Literal["regular"] = "regular" + + +@frozen +class DefaultChunkKeyEncodingConfigurationMetadata: + separator: Literal[".", "/"] = "/" + + +@frozen +class DefaultChunkKeyEncodingMetadata: + configuration: DefaultChunkKeyEncodingConfigurationMetadata = ( + DefaultChunkKeyEncodingConfigurationMetadata() + ) + name: Literal["default"] = "default" + + def decode_chunk_key(self, chunk_key: str) -> ChunkCoords: + if chunk_key == "c": + return () + return tuple(map(int, chunk_key[1:].split(self.configuration.separator))) + + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + return self.configuration.separator.join(map(str, ("c",) + chunk_coords)) + + +@frozen +class V2ChunkKeyEncodingConfigurationMetadata: + separator: Literal[".", "/"] = "." + + +@frozen +class V2ChunkKeyEncodingMetadata: + configuration: V2ChunkKeyEncodingConfigurationMetadata = ( + V2ChunkKeyEncodingConfigurationMetadata() + ) + name: Literal["v2"] = "v2" + + def decode_chunk_key(self, chunk_key: str) -> ChunkCoords: + return tuple(map(int, chunk_key.split(self.configuration.separator))) + + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + chunk_identifier = self.configuration.separator.join(map(str, chunk_coords)) + return "0" if chunk_identifier == "" else chunk_identifier + + +ChunkKeyEncodingMetadata = Union[DefaultChunkKeyEncodingMetadata, V2ChunkKeyEncodingMetadata] + + +async def read_chunk_v2( fill_value: Any, chunk_key_encoding: ChunkKeyEncodingMetadata, store_path, + chunks: ChunkCoords, chunk_coords: ChunkCoords, chunk_selection: SliceSelection, out_selection: SliceSelection, + compressor: Optional[Dict[str, Any]], + filters: List[Optional[Dict[str, Any]]], + dtype: np.dtype, + order: Literal["C", "F"], out: np.ndarray, ): store_path = store_path / chunk_key_encoding.encode_chunk_key(chunk_coords) - - chunk_array = await _decode_chunk_v2(await store_path.get_async()) + chunk_bytes = await store_path.get_async() + chunk_array = await _decode_chunk_v2( + compressor=compressor, + dtype=dtype, + chunks=chunks, + filters=filters, + order=order, + chunk_bytes=chunk_bytes, + ) if chunk_array is not None: tmp = chunk_array[chunk_selection] out[out_selection] = tmp @@ -29,18 +97,19 @@ async def _read_chunk_v2( async def _decode_chunk_v2( - self, + compressor: Optional[Dict[str, Any]], dtype: np.dtype, chunks: ChunkCoords, order: Literal["C", "F"], filters: Tuple[Any], chunk_bytes: Optional[BytesLike], ) -> Optional[np.ndarray]: + if chunk_bytes is None: return None - if self.metadata.compressor is not None: - compressor = numcodecs.get_codec(self.metadata.compressor) + if compressor is not None: + compressor = numcodecs.get_codec(compressor) chunk_array = ensure_ndarray(await to_thread(compressor.decode, chunk_bytes)) else: chunk_array = ensure_ndarray(chunk_bytes) @@ -65,73 +134,116 @@ async def _decode_chunk_v2( async def _write_chunk_v2( - self, value: np.ndarray, + chunk_key_encoding: ChunkKeyEncodingMetadata, + store_path: StorePath, + dtype: np.dtype, + order: Literal["C", "F"], + compressor: Any, + filters: List[Any], + chunks: ChunkCoords, chunk_shape: ChunkCoords, chunk_coords: ChunkCoords, chunk_selection: SliceSelection, out_selection: SliceSelection, + fill_value: Any, ): - store_path = self.store_path / self._encode_chunk_key(chunk_coords) + store_path = store_path / chunk_key_encoding.encode_chunk_key(chunk_coords) if is_total_slice(chunk_selection, chunk_shape): # write entire chunks if np.isscalar(value): chunk_array = np.empty( chunk_shape, - dtype=self.metadata.dtype, - order=self.metadata.order, + dtype=dtype, + order=order, ) chunk_array.fill(value) else: chunk_array = value[out_selection] - await self._write_chunk_to_store(store_path, chunk_array) + + await _write_chunk_to_store_v2( + fill_value=fill_value, + store_path=store_path, + chunk_array=chunk_array, + order=order, + compressor=compressor, + filters=filters, + ) else: # writing partial chunks # read chunk first - tmp = await self._decode_chunk(await store_path.get_async()) + chunk_bytes = await store_path.get_async() + tmp = await _decode_chunk_v2( + compressor=compressor, + dtype=dtype, + chunks=chunks, + order=order, + filters=filters, + chunk_bytes=chunk_bytes, + ) # merge new value if tmp is None: chunk_array = np.empty( chunk_shape, - dtype=self.metadata.dtype, - order=self.metadata.order, + dtype=dtype, + order=order, ) - chunk_array.fill(self.metadata.fill_value) + chunk_array.fill(fill_value) else: chunk_array = tmp.copy( - order=self.metadata.order, + order=order, ) # make a writable copy chunk_array[chunk_selection] = value[out_selection] - await self._write_chunk_to_store(store_path, chunk_array) + await _write_chunk_to_store_v2( + fill_value=fill_value, + store_path=store_path, + chunk_array=chunk_array, + order=order, + compressor=compressor, + filters=filters, + ) -async def _write_chunk_to_store_v2(self, store_path: StorePath, chunk_array: np.ndarray): +async def _write_chunk_to_store_v2( + fill_value: Any, + store_path: StorePath, + chunk_array: np.ndarray, + order: Literal["C", "F"], + compressor: Dict[str, Any], + filters, +): chunk_bytes: Optional[BytesLike] - if np.all(chunk_array == self.metadata.fill_value): + if np.all(chunk_array == fill_value): # chunks that only contain fill_value will be removed await store_path.delete_async() else: - chunk_bytes = await self._encode_chunk(chunk_array) + chunk_bytes = await _encode_chunk_v2( + chunk_array=chunk_array, order=order, compressor=compressor, filters=filters + ) if chunk_bytes is None: await store_path.delete_async() else: await store_path.set_async(chunk_bytes) -async def _encode_chunk_v2(self, chunk_array: np.ndarray) -> Optional[BytesLike]: - chunk_array = chunk_array.ravel(order=self.metadata.order) +async def _encode_chunk_v2( + chunk_array: np.ndarray, + order: Literal["C", "F"], + compressor: Any, + filters: List[Any], +) -> Optional[BytesLike]: + chunk_array = chunk_array.ravel(order=order) - if self.metadata.filters is not None: - for filter_metadata in self.metadata.filters: - filter = numcodecs.get_codec(filter_metadata) - chunk_array = await to_thread(filter.encode, chunk_array) + for filter_metadata in filters: + filter = numcodecs.get_codec(filter_metadata) + chunk_array = await to_thread(filter.encode, chunk_array) - if self.metadata.compressor is not None: - compressor = numcodecs.get_codec(self.metadata.compressor) + if compressor is not None: + compressor = numcodecs.get_codec(compressor) if not chunk_array.flags.c_contiguous and not chunk_array.flags.f_contiguous: chunk_array = chunk_array.copy(order="A") encoded_chunk_bytes = ensure_bytes(await to_thread(compressor.encode, chunk_array)) @@ -141,11 +253,6 @@ async def _encode_chunk_v2(self, chunk_array: np.ndarray) -> Optional[BytesLike] return encoded_chunk_bytes -def _encode_chunk_key_v2(self, chunk_coords: ChunkCoords) -> str: - chunk_identifier = self.metadata.dimension_separator.join(map(str, chunk_coords)) - return "0" if chunk_identifier == "" else chunk_identifier - - async def _write_chunk_v3( self, value: np.ndarray, @@ -214,24 +321,20 @@ async def _write_chunk_to_store_v3(self, store_path: StorePath, chunk_array: np. async def _read_chunk_v3( - self, + chunk_key_encoding: ChunkKeyEncodingMetadata, fill_value: Any, store_path, + codec_pipeline, chunk_coords: ChunkCoords, chunk_selection: SliceSelection, out_selection: SliceSelection, out: np.ndarray, ): - chunk_key_encoding = self.metadata.chunk_key_encoding chunk_key = chunk_key_encoding.encode_chunk_key(chunk_coords) - store_path = self.store_path / chunk_key + store_path = store_path / chunk_key - if len(self.codec_pipeline.codecs) == 1 and isinstance( - self.codec_pipeline.codecs[0], ShardingCodec - ): - chunk_array = await self.codec_pipeline.codecs[0].decode_partial( - store_path, chunk_selection - ) + if len(codec_pipeline.codecs) == 1 and isinstance(codec_pipeline.codecs[0], ShardingCodec): + chunk_array = await codec_pipeline.codecs[0].decode_partial(store_path, chunk_selection) if chunk_array is not None: out[out_selection] = chunk_array else: @@ -239,7 +342,7 @@ async def _read_chunk_v3( else: chunk_bytes = await store_path.get_async() if chunk_bytes is not None: - chunk_array = await self.codec_pipeline.decode(chunk_bytes) + chunk_array = await codec_pipeline.decode(chunk_bytes) tmp = chunk_array[chunk_selection] out[out_selection] = tmp else: diff --git a/zarr/v3/array/codecs.py b/zarr/v3/array/codecs.py index 599484cf79..6096a53ac0 100644 --- a/zarr/v3/array/codecs.py +++ b/zarr/v3/array/codecs.py @@ -1,7 +1,18 @@ from __future__ import annotations from functools import reduce -from typing import TYPE_CHECKING, Iterable, List, Literal, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + Literal, + Optional, + Protocol, + Tuple, + Union, +) from warnings import warn import numcodecs @@ -22,6 +33,11 @@ BloscShuffle = Literal["noshuffle", "shuffle", "bitshuffle"] +class CodecMetadata(Protocol): + name: str + configuration: Optional[Any] + + @frozen class BloscCodecConfigurationMetadata: typesize: int diff --git a/zarr/v3/array/v2.py b/zarr/v3/array/v2.py index 218a2ac53d..bf7a782716 100644 --- a/zarr/v3/array/v2.py +++ b/zarr/v3/array/v2.py @@ -8,7 +8,12 @@ import numpy as np from attr import evolve, frozen, asdict, field from numcodecs.compat import ensure_bytes, ensure_ndarray - +from zarr.v3.array.chunk import ( + V2ChunkKeyEncodingConfigurationMetadata, + V2ChunkKeyEncodingMetadata, + _write_chunk_v2, +) +from zarr.v3.array.chunk import read_chunk_v2 from zarr.v3.common import ( ZARRAY_JSON, ZATTRS_JSON, @@ -283,6 +288,7 @@ async def get_async(self, selection: Selection): else: return out[()] + """ async def _read_chunk( self, chunk_coords: ChunkCoords, @@ -298,8 +304,43 @@ async def _read_chunk( out[out_selection] = tmp else: out[out_selection] = self.metadata.fill_value + """ + + async def _read_chunk( + self, + chunk_coords: ChunkCoords, + chunk_selection: SliceSelection, + out_selection: SliceSelection, + out: np.ndarray, + ): + + chunk_key_encoding = V2ChunkKeyEncodingMetadata( + configuration=V2ChunkKeyEncodingConfigurationMetadata( + separator=self.metadata.dimension_separator + ) + ) + if self.metadata.filters is None: + filters = [] + else: + filters = self.metadata.filters + + await read_chunk_v2( + self.metadata.fill_value, + chunk_key_encoding=chunk_key_encoding, + store_path=self.store_path, + chunks=self.metadata.chunks, + chunk_coords=chunk_coords, + chunk_selection=chunk_selection, + out_selection=out_selection, + compressor=self.metadata.compressor, + filters=filters, + dtype=self.metadata.dtype, + order=self.metadata.order, + out=out, + ) - async def _decode_chunk(self, chunk_bytes: Optional[BytesLike]) -> Optional[np.ndarray]: + """ + async def _decode_chunk(self, chunk_bytes: Optional[BytesLike]) -> Optional[np.ndarray]: if chunk_bytes is None: return None @@ -327,6 +368,7 @@ async def _decode_chunk(self, chunk_bytes: Optional[BytesLike]) -> Optional[np.n ) return chunk_array + """ def __setitem__(self, selection: Selection, value: np.ndarray) -> None: sync(self.set_async(selection, value), self.runtime_configuration.asyncio_loop) @@ -338,7 +380,15 @@ async def set_async(self, selection: Selection, value: np.ndarray) -> None: shape=self.metadata.shape, chunk_shape=chunk_shape, ) - + chunk_key_encoding = V2ChunkKeyEncodingMetadata( + configuration=V2ChunkKeyEncodingConfigurationMetadata( + separator=self.metadata.dimension_separator + ) + ) + if self.metadata.filters is None: + filters = [] + else: + filters = self.metadata.filters sel_shape = indexer.shape # check value shape @@ -357,16 +407,25 @@ async def set_async(self, selection: Selection, value: np.ndarray) -> None: [ ( value, + chunk_key_encoding, + self.store_path, + self.metadata.dtype, + self.metadata.order, + self.metadata.compressor, + filters, + self.metadata.chunks, chunk_shape, chunk_coords, chunk_selection, out_selection, + self.metadata.fill_value, ) for chunk_coords, chunk_selection, out_selection in indexer ], - self._write_chunk, + _write_chunk_v2, ) + """ async def _write_chunk( self, value: np.ndarray, @@ -410,7 +469,7 @@ async def _write_chunk( chunk_array[chunk_selection] = value[out_selection] await self._write_chunk_to_store(store_path, chunk_array) - + async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.ndarray): chunk_bytes: Optional[BytesLike] if np.all(chunk_array == self.metadata.fill_value): @@ -444,6 +503,7 @@ async def _encode_chunk(self, chunk_array: np.ndarray) -> Optional[BytesLike]: def _encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: chunk_identifier = self.metadata.dimension_separator.join(map(str, chunk_coords)) return "0" if chunk_identifier == "" else chunk_identifier + """ async def resize_async(self, new_shape: ChunkCoords) -> ZArray: assert len(new_shape) == len(self.metadata.shape) diff --git a/zarr/v3/array/v3.py b/zarr/v3/array/v3.py index 722b4a975b..4608cadc9a 100644 --- a/zarr/v3/array/v3.py +++ b/zarr/v3/array/v3.py @@ -18,6 +18,15 @@ from attr import asdict, evolve, frozen, field from zarr.v3.abc.array import SynchronousArray, AsynchronousArray +from zarr.v3.array.chunk import ( + DefaultChunkKeyEncodingConfigurationMetadata, + DefaultChunkKeyEncodingMetadata, + RegularChunkGridConfigurationMetadata, + RegularChunkGridMetadata, + ChunkKeyEncodingMetadata, + V2ChunkKeyEncodingConfigurationMetadata, + V2ChunkKeyEncodingMetadata, +) # from zarr.v3.array_v2 import ArrayV2 from zarr.v3.array.codecs import CodecMetadata, CodecPipeline, bytes_codec @@ -31,16 +40,9 @@ ) from zarr.v3.array.indexing import BasicIndexer, all_chunk_coords, is_total_slice from zarr.v3.array.base import ( - ChunkKeyEncodingMetadata, CoreArrayMetadata, DataType, - DefaultChunkKeyEncodingConfigurationMetadata, - DefaultChunkKeyEncodingMetadata, - RegularChunkGridConfigurationMetadata, - RegularChunkGridMetadata, RuntimeConfiguration, - V2ChunkKeyEncodingConfigurationMetadata, - V2ChunkKeyEncodingMetadata, dtype_to_data_type, ) from zarr.v3.sharding import ShardingCodec