-
-
Notifications
You must be signed in to change notification settings - Fork 314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure parents are created when creating a node #2262
Changes from all commits
d8f9b01
84cfe18
3423a36
7ba2648
d65047e
d44f955
44e4554
10fdc90
264327d
1039c16
435469c
4e07e01
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,12 @@ | |
from zarr.codecs import BytesCodec | ||
from zarr.codecs._v2 import V2Compressor, V2Filters | ||
from zarr.core.attributes import Attributes | ||
from zarr.core.buffer import BufferPrototype, NDArrayLike, NDBuffer, default_buffer_prototype | ||
from zarr.core.buffer import ( | ||
BufferPrototype, | ||
NDArrayLike, | ||
NDBuffer, | ||
default_buffer_prototype, | ||
) | ||
from zarr.core.chunk_grids import RegularChunkGrid, _guess_chunks | ||
from zarr.core.chunk_key_encodings import ( | ||
ChunkKeyEncoding, | ||
|
@@ -71,6 +76,7 @@ | |
from collections.abc import Iterable, Iterator, Sequence | ||
|
||
from zarr.abc.codec import Codec, CodecPipeline | ||
from zarr.core.group import AsyncGroup | ||
from zarr.core.metadata.common import ArrayMetadata | ||
|
||
# Array and AsyncArray are defined in the base ``zarr`` namespace | ||
|
@@ -337,7 +343,7 @@ async def _create_v3( | |
) | ||
|
||
array = cls(metadata=metadata, store_path=store_path) | ||
await array._save_metadata(metadata) | ||
await array._save_metadata(metadata, ensure_parents=True) | ||
return array | ||
|
||
@classmethod | ||
|
@@ -376,7 +382,7 @@ async def _create_v2( | |
attributes=attributes, | ||
) | ||
array = cls(metadata=metadata, store_path=store_path) | ||
await array._save_metadata(metadata) | ||
await array._save_metadata(metadata, ensure_parents=True) | ||
return array | ||
|
||
@classmethod | ||
|
@@ -621,9 +627,24 @@ async def getitem( | |
) | ||
return await self._get_selection(indexer, prototype=prototype) | ||
|
||
async def _save_metadata(self, metadata: ArrayMetadata, ensure_parents: bool = False) -> None:
    """Persist this array's metadata document(s) to the store.

    Parameters
    ----------
    metadata : ArrayMetadata
        The metadata object to serialize and write.
    ensure_parents : bool, default False
        When True, also write metadata documents for every implicit parent
        group of this node (via ``set_if_not_exists``, so groups that already
        exist are left untouched). This enables ``zarr.create(store, path="a/b/c")``
        to materialize the intermediate groups ``a`` and ``a/b``. It is off by
        default so that updates to an existing node do not pay the cost of
        re-checking all ancestors.
    """
    pending = []
    for key, value in metadata.to_buffer_dict(default_buffer_prototype()).items():
        pending.append(set_or_delete(self.store_path / key, value))

    if ensure_parents:
        # Create metadata for each missing ancestor group, shallowest first,
        # without clobbering group documents that are already present.
        for parent in _build_parents(self):
            parent_docs = parent.metadata.to_buffer_dict(default_buffer_prototype())
            for key, value in parent_docs.items():
                pending.append((parent.store_path / key).set_if_not_exists(value))

    await gather(*pending)
|
||
async def _set_selection( | ||
|
@@ -2354,3 +2375,21 @@ def chunks_initialized(array: Array | AsyncArray) -> tuple[str, ...]: | |
out.append(chunk_key) | ||
|
||
return tuple(out) | ||
|
||
|
||
def _build_parents(node: AsyncArray | AsyncGroup) -> list[AsyncGroup]:
    """Return in-memory ``AsyncGroup`` objects for every ancestor path of *node*.

    For a node stored at path ``"a/b/c"`` this produces groups for ``"a"`` and
    ``"a/b"`` (shallowest first), each backed by *node*'s store and carrying
    the same zarr format. Nothing is written to the store here; callers decide
    whether/how to persist the returned groups.
    """
    # Local import to avoid a circular dependency between the array and group modules.
    from zarr.core.group import AsyncGroup, GroupMetadata

    path_parts = node.store_path.path.split("/")[:-1]
    ancestors: list[AsyncGroup] = []
    for depth in range(len(path_parts)):
        group_path = "/".join(path_parts[: depth + 1])
        ancestors.append(
            AsyncGroup(
                metadata=GroupMetadata(zarr_format=node.metadata.zarr_format),
                store_path=StorePath(store=node.store_path.store, path=group_path),
            )
        )
    return ancestors
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,7 @@ def _put( | |
path: Path, | ||
value: Buffer, | ||
start: int | None = None, | ||
exclusive: bool = False, | ||
) -> int | None: | ||
path.parent.mkdir(parents=True, exist_ok=True) | ||
if start is not None: | ||
|
@@ -68,7 +69,13 @@ def _put( | |
f.write(value.as_numpy_array().tobytes()) | ||
return None | ||
else: | ||
return path.write_bytes(value.as_numpy_array().tobytes()) | ||
view = memoryview(value.as_numpy_array().tobytes()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
if exclusive: | ||
mode = "xb" | ||
else: | ||
mode = "wb" | ||
with path.open(mode=mode) as f: | ||
return f.write(view) | ||
|
||
|
||
class LocalStore(Store): | ||
|
@@ -152,14 +159,23 @@ async def get_partial_values( | |
return await concurrent_map(args, to_thread, limit=None) # TODO: fix limit | ||
|
||
async def set(self, key: str, value: Buffer) -> None:
    """Write *value* to *key*, replacing any existing object at that key."""
    await self._set(key, value)
|
||
async def set_if_not_exists(self, key: str, value: Buffer) -> None:
    """Write *value* to *key* only if the key is not already present.

    An existing object is left untouched: the exclusive create raises
    ``FileExistsError``, which is swallowed here by design.
    """
    try:
        await self._set(key, value, exclusive=True)
    except FileExistsError:
        # Key already exists — per the contract, this is not an error.
        pass
|
||
async def _set(self, key: str, value: Buffer, exclusive: bool = False) -> None:
    """Shared implementation behind ``set`` and ``set_if_not_exists``.

    Parameters
    ----------
    key : str
        Store key; resolved relative to ``self.root`` on the filesystem.
    value : Buffer
        The bytes to write.
    exclusive : bool, default False
        When True the underlying write opens the file exclusively, so a
        ``FileExistsError`` is raised if the key already exists.

    Raises
    ------
    TypeError
        If *value* is not a ``Buffer`` instance.
    FileExistsError
        If *exclusive* is True and the key already exists.
    """
    if not self._is_open:
        await self._open()
    self._check_writable()
    assert isinstance(key, str)
    if not isinstance(value, Buffer):
        # Fixed typo in the error message (was: "must a Buffer instance").
        raise TypeError("LocalStore.set(): `value` must be a Buffer instance")
    path = self.root / key
    await to_thread(_put, path, value, start=None, exclusive=exclusive)
|
||
async def set_partial_values( | ||
self, key_start_values: Iterable[tuple[str, int, bytes | bytearray | memoryview]] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is actually tested anywhere. I'm not 100% sure, but I think all of the Group / Array metadata creation methods will be using
`StorePath`
as their `ByteSetter`
.