* Basic working FsspecStore
* upath to be optional
* fill out methods
* add fsspec to deps (I believe we want this)
* fixes
* importable
* exceptions
* Add simple test
* Add to test env
* fix typing
* Update src/zarr/store/remote.py
  Co-authored-by: Norman Rzepka <[email protected]>
* BufferPrototype
* set up testing infrastructure for remote store
* broken tests but get and set are implemented correctly for TestRemoteStoreS3
* remove implementation of test_get, and make s3 fixture autoused, to reveal multiple event loop error
* Update tests/v3/test_store/test_remote.py
  Co-authored-by: Martin Durant <[email protected]>
* don't use fsmap, and don't use os.path.join
* scope s3 fixture to session, mark test_store_supports_partial_writes as xfail
* Update src/zarr/store/remote.py
  Co-authored-by: Davis Bennett <[email protected]>
* Fix most
* fixed more
* fix rest
* Massage old v2 tests
* just skip them..
* Attribute rename to allowed_exceptions

---------

Co-authored-by: Davis Bennett <[email protected]>
Co-authored-by: Joe Hamman <[email protected]>
Co-authored-by: Norman Rzepka <[email protected]>
1 parent ee30347, commit 7ded5d6
Showing 8 changed files with 317 additions and 95 deletions.
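For orientation, here is a minimal usage sketch of the reworked store, based on the constructor and `get`/`set` signatures in the diff below. It is illustrative only and not part of the commit: the bucket name and `endpoint_url` are placeholders, `s3fs` (or another async-capable fsspec backend) is assumed to be installed, and `mode="w"` is assumed to be an accepted `OpenMode`.

```python
import asyncio

from zarr.buffer import default_buffer_prototype
from zarr.store.remote import RemoteStore


async def main() -> None:
    # Extra keyword arguments are forwarded to fsspec as storage_options;
    # the bucket and endpoint below are placeholders.
    store = RemoteStore(
        "s3://example-bucket/data.zarr",
        mode="w",
        endpoint_url="http://localhost:9000",
    )

    # Values are Buffers; prototype.buffer.from_bytes builds one from raw bytes.
    buf = default_buffer_prototype.buffer.from_bytes(b"hello")
    await store.set("zarr.json", buf)

    # Missing keys return None rather than raising, per allowed_exceptions.
    out = await store.get("zarr.json", prototype=default_buffer_prototype)
    print(out.to_bytes() if out is not None else None)


asyncio.run(main())
```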
src/zarr/store/remote.py

```diff
@@ -1,103 +1,190 @@
 from __future__ import annotations

 from collections.abc import AsyncGenerator
 from typing import TYPE_CHECKING, Any

+import fsspec
+
 from zarr.abc.store import Store
-from zarr.buffer import Buffer, BufferPrototype
+from zarr.buffer import Buffer, BufferPrototype, default_buffer_prototype
 from zarr.common import OpenMode
 from zarr.store.core import _dereference_path

 if TYPE_CHECKING:
     from fsspec.asyn import AsyncFileSystem
     from upath import UPath

     from zarr.buffer import Buffer
     from zarr.common import BytesLike


 class RemoteStore(Store):
     # based on FSSpec
     supports_writes: bool = True
     supports_partial_writes: bool = False
     supports_listing: bool = True

-    root: UPath
+    _fs: AsyncFileSystem
+    path: str
+    allowed_exceptions: tuple[type[Exception], ...]

     def __init__(
-        self, url: UPath | str, *, mode: OpenMode = "r", **storage_options: dict[str, Any]
+        self,
+        url: UPath | str,
+        mode: OpenMode = "r",
+        allowed_exceptions: tuple[type[Exception], ...] = (
+            FileNotFoundError,
+            IsADirectoryError,
+            NotADirectoryError,
+        ),
+        **storage_options: Any,
     ):
-        import fsspec
-        from upath import UPath
+        """
+        Parameters
+        ----------
+        url: root of the datastore. In fsspec notation, this is usually like "protocol://path/to".
+            Can also be a upath.UPath instance/
+        allowed_exceptions: when fetching data, these cases will be deemed to correspond to missing
+            keys, rather than some other IO failure
+        storage_options: passed on to fsspec to make the filesystem instance. If url is a UPath,
+            this must not be used.
+        """

         super().__init__(mode=mode)

         if isinstance(url, str):
-            self.root = UPath(url, **storage_options)
+            self._fs, self.path = fsspec.url_to_fs(url, **storage_options)
+        elif hasattr(url, "protocol") and hasattr(url, "fs"):
+            # is UPath-like - but without importing
+            if storage_options:
+                raise ValueError(
+                    "If constructed with a UPath object, no additional "
+                    "storage_options are allowed"
+                )
+            self.path = url.path
+            self._fs = url._fs
         else:
-            assert (
-                len(storage_options) == 0
-            ), "If constructed with a UPath object, no additional storage_options are allowed."
-            self.root = url.rstrip("/")
-
+            raise ValueError("URL not understood, %s", url)
+        self.allowed_exceptions = allowed_exceptions
         # test instantiate file system
-        fs, _ = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs)
-        assert fs.__class__.async_impl, "FileSystem needs to support async operations."
+        if not self._fs.async_impl:
+            raise TypeError("FileSystem needs to support async operations")

     def __str__(self) -> str:
-        return str(self.root)
+        return f"Remote fsspec store: {type(self._fs).__name__} , {self.path}"

     def __repr__(self) -> str:
-        return f"RemoteStore({str(self)!r})"
-
-    def _make_fs(self) -> tuple[AsyncFileSystem, str]:
-        import fsspec
-
-        storage_options = self.root._kwargs.copy()
-        storage_options.pop("_url", None)
-        fs, root = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs)
-        assert fs.__class__.async_impl, "FileSystem needs to support async operations."
-        return fs, root
+        return f"<RemoteStore({type(self._fs).__name__} , {self.path})>"

     async def get(
         self,
         key: str,
-        prototype: BufferPrototype,
+        prototype: BufferPrototype = default_buffer_prototype,
         byte_range: tuple[int | None, int | None] | None = None,
     ) -> Buffer | None:
-        assert isinstance(key, str)
-        fs, root = self._make_fs()
-        path = _dereference_path(root, key)
+        path = _dereference_path(self.path, key)

         try:
-            value: Buffer | None = await (
-                fs._cat_file(path, start=byte_range[0], end=byte_range[1])
-                if byte_range
-                else fs._cat_file(path)
+            if byte_range:
+                # fsspec uses start/end, not start/length
+                start, length = byte_range
+                if start is not None and length is not None:
+                    end = start + length
+                elif length is not None:
+                    end = length
+                else:
+                    end = None
+            value: Buffer = prototype.buffer.from_bytes(
+                await (
+                    self._fs._cat_file(path, start=byte_range[0], end=end)
+                    if byte_range
+                    else self._fs._cat_file(path)
+                )
             )
-        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
-            return None
-        return value
+
+            return value
+        except self.allowed_exceptions:
+            return None
+        except OSError as e:
+            if "not satisfiable" in str(e):
+                # this is an s3-specific condition we probably don't want to leak
+                return prototype.buffer.from_bytes(b"")
+            raise

-    async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None = None) -> None:
+    async def set(
+        self,
+        key: str,
+        value: Buffer,
+        byte_range: tuple[int, int] | None = None,
+    ) -> None:
         self._check_writable()
         assert isinstance(key, str)
-        fs, root = self._make_fs()
-        path = _dereference_path(root, key)
-
+        path = _dereference_path(self.path, key)
         # write data
         if byte_range:
-            with fs._open(path, "r+b") as f:
-                f.seek(byte_range[0])
-                f.write(value)
-        else:
-            await fs._pipe_file(path, value)
+            raise NotImplementedError
+        await self._fs._pipe_file(path, value.to_bytes())

     async def delete(self, key: str) -> None:
         self._check_writable()
-        fs, root = self._make_fs()
-        path = _dereference_path(root, key)
-        if await fs._exists(path):
-            await fs._rm(path)
+        path = _dereference_path(self.path, key)
+        try:
+            await self._fs._rm(path)
+        except FileNotFoundError:
+            pass
+        except self.allowed_exceptions:
+            pass

     async def exists(self, key: str) -> bool:
-        fs, root = self._make_fs()
-        path = _dereference_path(root, key)
-        exists: bool = await fs._exists(path)
+        path = _dereference_path(self.path, key)
+        exists: bool = await self._fs._exists(path)
         return exists
+
+    async def get_partial_values(
+        self,
+        prototype: BufferPrototype,
+        key_ranges: list[tuple[str, tuple[int | None, int | None]]],
+    ) -> list[Buffer | None]:
+        if key_ranges:
+            paths, starts, stops = zip(
+                *(
+                    (
+                        _dereference_path(self.path, k[0]),
+                        k[1][0],
+                        ((k[1][0] or 0) + k[1][1]) if k[1][1] is not None else None,
+                    )
+                    for k in key_ranges
+                ),
+                strict=False,
+            )
+        else:
+            return []
+        # TODO: expectations for exceptions or missing keys?
+        res = await self._fs._cat_ranges(list(paths), starts, stops, on_error="return")
+        # the following is an s3-specific condition we probably don't want to leak
+        res = [b"" if (isinstance(r, OSError) and "not satisfiable" in str(r)) else r for r in res]
+        for r in res:
+            if isinstance(r, Exception) and not isinstance(r, self.allowed_exceptions):
+                raise r
+
+        return [None if isinstance(r, Exception) else prototype.buffer.from_bytes(r) for r in res]
+
+    async def set_partial_values(self, key_start_values: list[tuple[str, int, BytesLike]]) -> None:
+        raise NotImplementedError
+
+    async def list(self) -> AsyncGenerator[str, None]:
+        allfiles = await self._fs._find(self.path, detail=False, withdirs=False)
+        for onefile in (a.replace(self.path + "/", "") for a in allfiles):
+            yield onefile
+
+    async def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
+        prefix = f"{self.path}/{prefix.rstrip('/')}"
+        try:
+            allfiles = await self._fs._ls(prefix, detail=False)
+        except FileNotFoundError:
+            return
+        for onefile in (a.replace(prefix + "/", "") for a in allfiles):
+            yield onefile
+
+    async def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
+        for onefile in await self._fs._ls(prefix, detail=False):
+            yield onefile
```
tests/v3/test_store.py → tests/v3/test_store/test_local.py (36 changes: 0 additions & 36 deletions)