Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cat with start/stop ranges #744

Merged
merged 13 commits into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,26 @@ async def _cat(self, path, recursive=False, on_error="raise", **kwargs):
else:
return out[0]

async def _cat_ranges(self, paths, starts, ends, max_gap=None, **kwargs):
# TODO: on_error
if max_gap is not None:
# to be implemented in utils
raise NotImplementedError
if not isinstance(paths, list):
raise TypeError
if not isinstance(starts, list):
starts = [starts] * len(paths)
if not isinstance(ends, list):
ends = [starts] * len(paths)
if len(starts) != len(paths) or len(ends) != len(paths):
raise ValueError
return await asyncio.gather(
*[
self._cat_file(p, start=s, end=e, **kwargs)
for p, s, e in zip(paths, starts, ends)
]
)

async def _put(
self, lpath, rpath, recursive=False, callback=_DEFAULT_CALLBACK, **kwargs
):
Expand Down
25 changes: 25 additions & 0 deletions fsspec/caching.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,30 @@ def _fetch(self, start, end):
return self.data[start:end]


class KnownPartsOfAFile(BaseCache):
    """Cache that serves reads from byte ranges supplied up-front by the caller.

    ``data`` maps ``(start, stop)`` offset pairs to the bytes held for that
    range.  Contiguous ranges are merged at construction time; a read that is
    not fully contained in any known range falls back to ``fetcher``.
    """

    name = "parts"

    def __init__(self, blocksize, fetcher, size, data=None, **_):
        # NOTE: the previous default of ``data={}`` was a shared mutable
        # default argument; use None and treat it as an empty mapping.
        super(KnownPartsOfAFile, self).__init__(blocksize, fetcher, size)

        # Consolidate contiguous blocks by folding over the ranges sorted by
        # start offset.  The previous pairwise pop-and-merge loop iterated a
        # stale copy of the dict and raised KeyError when three or more
        # ranges were contiguous.  This version also leaves the caller's
        # dict unmodified.
        merged = {}
        if data:
            items = sorted(data.items())
            (cur_start, cur_stop), cur_bytes = items[0]
            for (start, stop), blob in items[1:]:
                if start == cur_stop:
                    # adjacent to the current run: extend it
                    cur_bytes += blob
                    cur_stop = stop
                else:
                    merged[(cur_start, cur_stop)] = cur_bytes
                    cur_start, cur_stop, cur_bytes = start, stop, blob
            merged[(cur_start, cur_stop)] = cur_bytes
        self.data = merged

    def _fetch(self, start, stop):
        """Return bytes for [start, stop) from a known block, else fetch."""
        for (loc0, loc1), data in self.data.items():
            # the requested span must lie entirely within one known block
            if loc0 <= start < loc1 and loc0 <= stop <= loc1:
                off = start - loc0
                return data[off : off + stop - start]
        # Safety valve if we miss cache - but this should never happen
        return self.fetcher(start, stop)


caches = {
"none": BaseCache,
"mmap": MMapCache,
Expand All @@ -430,4 +454,5 @@ def _fetch(self, start, end):
"block": BlockCache,
"first": FirstChunkCache,
"all": AllBytes,
"parts": KnownPartsOfAFile,
}
5 changes: 3 additions & 2 deletions fsspec/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ class SnappyFile(AbstractBufferedFile):
def __init__(self, infile, mode, **kwargs):
import snappy

self.details = {"size": 999999999} # not true, but OK if we don't seek
super().__init__(fs=None, path="snappy", mode=mode.strip("b") + "b", **kwargs)
super().__init__(
fs=None, path="snappy", mode=mode.strip("b") + "b", size=999999999, **kwargs
)
self.infile = infile
if "r" in mode:
self.codec = snappy.StreamDecompressor()
Expand Down
5 changes: 5 additions & 0 deletions fsspec/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .compression import compr
from .registry import filesystem, get_filesystem_class
from .utils import (
_unstrip_protocol,
build_name_function,
infer_compression,
stringify_path,
Expand Down Expand Up @@ -124,6 +125,10 @@ def __del__(self):
if hasattr(self, "fobjects"):
self.fobjects.clear() # may cause cleanup of objects and close files

@property
def full_name(self):
    """Path re-qualified with the filesystem's protocol (via ``_unstrip_protocol``)."""
    return _unstrip_protocol(self.path, self.fs)

def open(self):
"""Materialise this as a real open file without context

Expand Down
7 changes: 5 additions & 2 deletions fsspec/implementations/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,14 @@ def info(self, path, **kwargs):
"type": "directory",
}
elif path in self.store:
filelike = self.store[path]
return {
"name": path,
"size": self.store[path].getbuffer().nbytes,
"size": filelike.size
if hasattr(filelike, "size")
else filelike.getbuffer().nbytes,
"type": "file",
"created": self.store[path].created,
"created": getattr(filelike, "created", None),
}
else:
raise FileNotFoundError(path)
Expand Down
4 changes: 4 additions & 0 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ def cat_file(self, path, start=None, end=None, **kwargs):
return part_or_url[start:end]
return self.fs.cat_file(part_or_url, start=start0, end=end0)[start:end]

def pipe_file(self, path, value, **_):
    """Temporarily add binary data or reference as a file"""
    # Stored only in the in-memory references mapping; extra kwargs ignored.
    self.references[path] = value

async def _get_file(self, rpath, lpath, **kwargs):
data = await self._cat_file(rpath)
with open(lpath, "wb") as f:
Expand Down
54 changes: 48 additions & 6 deletions fsspec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .dircache import DirCache
from .transaction import Transaction
from .utils import (
_unstrip_protocol,
get_package_version_without_import,
other_paths,
read_block,
Expand Down Expand Up @@ -695,18 +696,39 @@ def pipe(self, path, value=None, **kwargs):
else:
raise ValueError("path must be str or dict")

def cat_ranges(self, paths, starts, ends, max_gap=None, **kwargs):
    """Fetch byte ranges from one or more paths.

    Parameters
    ----------
    paths: list of str
        One path per requested range.
    starts, ends: int or list of int
        Byte offsets delimiting each range; a scalar is broadcast to
        every path.
    max_gap: None
        Coalescing of nearby ranges is not yet implemented; must be None.
    kwargs: passed to ``cat_file``

    Returns
    -------
    list of bytes, in the same order as ``paths``.
    """
    if max_gap is not None:
        raise NotImplementedError
    if not isinstance(paths, list):
        raise TypeError("paths must be a list")
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        # broadcast the scalar end; the previous code copied `starts` here
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError("starts/ends must match the number of paths")
    # forward kwargs to cat_file, matching the async _cat_ranges twin
    return [self.cat_file(p, s, e, **kwargs) for p, s, e in zip(paths, starts, ends)]

def cat(self, path, recursive=False, on_error="raise", **kwargs):
"""Fetch (potentially multiple) paths' contents

Returns a dict of {path: contents} if there are multiple paths
or the path has been otherwise expanded

Parameters
----------
recursive: bool
If True, assume the path(s) are directories, and get all the
contained files
on_error : "raise", "omit", "return"
If raise, an underlying exception will be raised (converted to KeyError
if the type is in self.missing_exceptions); if omit, keys with exception
will simply not be included in the output; if "return", all keys are
included in the output, but the value will be bytes or an exception
instance.
kwargs: passed to cat_file

Returns
-------
dict of {path: contents} if there are multiple paths
or the path has been otherwise expanded
"""
paths = self.expand_path(path, recursive=recursive)
if (
Expand Down Expand Up @@ -1236,6 +1258,7 @@ class AbstractBufferedFile(io.IOBase):
"""

DEFAULT_BLOCK_SIZE = 5 * 2 ** 20
_details = None

def __init__(
self,
Expand All @@ -1246,6 +1269,7 @@ def __init__(
autocommit=True,
cache_type="readahead",
cache_options=None,
size=None,
**kwargs,
):
"""
Expand All @@ -1269,6 +1293,8 @@ def __init__(
cache_options : dict
Additional options passed to the constructor for the cache specified
by `cache_type`.
size: int
If given and in read mode, suppressed having to look up the file size
kwargs:
Gets stored as self.kwargs
"""
Expand Down Expand Up @@ -1302,9 +1328,10 @@ def __init__(
if mode not in {"ab", "rb", "wb"}:
raise NotImplementedError("File mode not supported")
if mode == "rb":
if not hasattr(self, "details"):
self.details = fs.info(path)
self.size = self.details["size"]
if size is not None:
self.size = size
else:
self.size = self.details["size"]
self.cache = caches[cache_type](
self.blocksize, self._fetch_range, self.size, **cache_options
)
Expand All @@ -1314,6 +1341,21 @@ def __init__(
self.forced = False
self.location = None

@property
def details(self):
    """File info dict, looked up lazily from the filesystem and then cached."""
    cached = self._details
    if cached is None:
        cached = self.fs.info(self.path)
        self._details = cached
    return cached

@details.setter
def details(self, value):
    # Assigning the info dict also refreshes the cached file size.
    self._details = value
    self.size = value["size"]

@property
def full_name(self):
    """Path re-qualified with the filesystem's protocol (via ``_unstrip_protocol``)."""
    return _unstrip_protocol(self.path, self.fs)

@property
def closed(self):
# get around this attr being read-only in IOBase
Expand Down
9 changes: 9 additions & 0 deletions fsspec/tests/test_caches.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,12 @@ def test_cache_basic(Cache_imp, blocksize, size_requests):
result = cache._fetch(start, end)
expected = string.ascii_letters[start:end].encode()
assert result == expected


def test_known():
    """KnownPartsOfAFile consolidates adjacent ranges and serves reads from them."""
    parts = {(10, 20): b"1" * 10, (0, 10): b"0" * 10}
    cache = caches["parts"](None, None, 100, parts)
    # the two adjacent ranges were merged into a single block
    assert (0, 20) in cache.data
    assert cache._fetch(5, 15) == b"0" * 5 + b"1" * 5
    with pytest.raises(TypeError):
        # a cache miss falls through to the fetcher, which is None here
        cache._fetch(25, 35)
11 changes: 11 additions & 0 deletions fsspec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,14 @@ def setup_logging(logger=None, logger_name=None, level="DEBUG", clear=True):
logger.addHandler(handle)
logger.setLevel(level)
return logger


def _unstrip_protocol(name, fs):
if isinstance(fs.protocol, str):
if name.startswith(fs.protocol):
return name
return fs.protocol + "://" + name
else:
if name.startswith(tuple(fs.protocol)):
return name
return fs.protocol[0] + "://" + name