diff --git a/fsspec/asyn.py b/fsspec/asyn.py
index acc3955af..850a2a627 100644
--- a/fsspec/asyn.py
+++ b/fsspec/asyn.py
@@ -358,6 +358,26 @@ async def _cat(self, path, recursive=False, on_error="raise", **kwargs):
         else:
             return out[0]
 
+    async def _cat_ranges(self, paths, starts, ends, max_gap=None, **kwargs):
+        # TODO: on_error
+        if max_gap is not None:
+            # to be implemented in utils
+            raise NotImplementedError
+        if not isinstance(paths, list):
+            raise TypeError
+        if not isinstance(starts, list):
+            starts = [starts] * len(paths)
+        if not isinstance(ends, list):
+            ends = [ends] * len(paths)
+        if len(starts) != len(paths) or len(ends) != len(paths):
+            raise ValueError
+        return await asyncio.gather(
+            *[
+                self._cat_file(p, start=s, end=e, **kwargs)
+                for p, s, e in zip(paths, starts, ends)
+            ]
+        )
+
     async def _put(
         self, lpath, rpath, recursive=False, callback=_DEFAULT_CALLBACK, **kwargs
     ):
diff --git a/fsspec/caching.py b/fsspec/caching.py
index 207b1f01c..020e54621 100644
--- a/fsspec/caching.py
+++ b/fsspec/caching.py
@@ -422,6 +422,32 @@ def _fetch(self, start, end):
         return self.data[start:end]
 
 
+class KnownPartsOfAFile(BaseCache):
+    name = "parts"
+
+    def __init__(self, blocksize, fetcher, size, data={}, **_):
+        super(KnownPartsOfAFile, self).__init__(blocksize, fetcher, size)
+
+        # simple consolidation of contiguous blocks
+        for start0, stop in data.copy():
+            for start, stop1 in data.copy():
+                if stop == start:
+                    data[(start0, stop1)] = data.pop((start0, stop)) + data.pop(
+                        (start, stop1)
+                    )
+        self.data = data
+
+    def _fetch(self, start, stop):
+        for (loc0, loc1), data in self.data.items():
+            if loc0 <= start < loc1 and loc0 <= stop <= loc1:
+                off = start - loc0
+                out = data[off : off + stop - start]
+                # reads beyond buffer are padded with zero
+                out += b"\x00" * (stop - start - len(out))
+                return out
+        raise ValueError("Read outside of known parts of file")
+
+
 caches = {
     "none": BaseCache,
     "mmap": MMapCache,
@@ -430,4 +456,5 @@ def _fetch(self, start, end):
     "block": BlockCache,
     "first": FirstChunkCache,
     "all": AllBytes,
+    "parts": KnownPartsOfAFile,
 }
diff --git a/fsspec/compression.py b/fsspec/compression.py
index a07473238..7f200d963 100644
--- a/fsspec/compression.py
+++ b/fsspec/compression.py
@@ -107,8 +107,9 @@ class SnappyFile(AbstractBufferedFile):
     def __init__(self, infile, mode, **kwargs):
         import snappy
 
-        self.details = {"size": 999999999}  # not true, but OK if we don't seek
-        super().__init__(fs=None, path="snappy", mode=mode.strip("b") + "b", **kwargs)
+        super().__init__(
+            fs=None, path="snappy", mode=mode.strip("b") + "b", size=999999999, **kwargs
+        )
         self.infile = infile
         if "r" in mode:
             self.codec = snappy.StreamDecompressor()
diff --git a/fsspec/core.py b/fsspec/core.py
index 720ebd988..5512c479b 100644
--- a/fsspec/core.py
+++ b/fsspec/core.py
@@ -18,6 +18,7 @@
 from .compression import compr
 from .registry import filesystem, get_filesystem_class
 from .utils import (
+    _unstrip_protocol,
     build_name_function,
     infer_compression,
     stringify_path,
@@ -124,6 +125,10 @@ def __del__(self):
         if hasattr(self, "fobjects"):
             self.fobjects.clear()  # may cause cleanup of objects and close files
 
+    @property
+    def full_name(self):
+        return _unstrip_protocol(self.path, self.fs)
+
     def open(self):
         """Materialise this as a real open file without context
diff --git a/fsspec/implementations/memory.py b/fsspec/implementations/memory.py
index 27e8b0a28..ba7b0612b 100644
--- a/fsspec/implementations/memory.py
+++ b/fsspec/implementations/memory.py
@@ -139,11 +139,14 @@ def info(self, path, **kwargs):
                 "type": "directory",
             }
         elif path in self.store:
+            filelike = self.store[path]
             return {
                 "name": path,
-                "size": self.store[path].getbuffer().nbytes,
+                "size": filelike.size
+                if hasattr(filelike, "size")
+                else filelike.getbuffer().nbytes,
                 "type": "file",
-                "created": self.store[path].created,
+                "created": getattr(filelike, "created", None),
             }
         else:
             raise FileNotFoundError(path)
diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py
index 21d017fe6..23d85c384 100644
--- a/fsspec/implementations/reference.py
+++ b/fsspec/implementations/reference.py
@@ -194,6 +194,10 @@ def cat_file(self, path, start=None, end=None, **kwargs):
             return part_or_url[start:end]
         return self.fs.cat_file(part_or_url, start=start0, end=end0)[start:end]
 
+    def pipe_file(self, path, value, **_):
+        """Temporarily add binary data or reference as a file"""
+        self.references[path] = value
+
     async def _get_file(self, rpath, lpath, **kwargs):
         data = await self._cat_file(rpath)
         with open(lpath, "wb") as f:
diff --git a/fsspec/spec.py b/fsspec/spec.py
index 80562ee80..82149566f 100644
--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -14,6 +14,7 @@
 from .dircache import DirCache
 from .transaction import Transaction
 from .utils import (
+    _unstrip_protocol,
     get_package_version_without_import,
     other_paths,
     read_block,
@@ -700,18 +701,39 @@ def pipe(self, path, value=None, **kwargs):
         else:
             raise ValueError("path must be str or dict")
 
+    def cat_ranges(self, paths, starts, ends, max_gap=None, **kwargs):
+        if max_gap is not None:
+            raise NotImplementedError
+        if not isinstance(paths, list):
+            raise TypeError
+        if not isinstance(starts, list):
+            starts = [starts] * len(paths)
+        if not isinstance(ends, list):
+            ends = [ends] * len(paths)
+        if len(starts) != len(paths) or len(ends) != len(paths):
+            raise ValueError
+        return [self.cat_file(p, s, e) for p, s, e in zip(paths, starts, ends)]
+
     def cat(self, path, recursive=False, on_error="raise", **kwargs):
         """Fetch (potentially multiple) paths' contents
 
-        Returns a dict of {path: contents} if there are multiple paths
-        or the path has been otherwise expanded
-
+        Parameters
+        ----------
+        recursive: bool
+            If True, assume the path(s) are directories, and get all the
+            contained files
         on_error : "raise", "omit", "return"
             If raise, an underlying exception will be raised (converted to KeyError
             if the type is in self.missing_exceptions); if omit, keys with exception
             will simply not be included in the output; if "return", all keys are
             included in the output, but the value will be bytes or an exception
             instance.
+        kwargs: passed to cat_file
+
+        Returns
+        -------
+        dict of {path: contents} if there are multiple paths
+        or the path has been otherwise expanded
         """
         paths = self.expand_path(path, recursive=recursive)
         if (
@@ -1241,6 +1263,7 @@ class AbstractBufferedFile(io.IOBase):
     """
 
     DEFAULT_BLOCK_SIZE = 5 * 2 ** 20
+    _details = None
 
     def __init__(
         self,
@@ -1251,6 +1274,7 @@ def __init__(
         autocommit=True,
         cache_type="readahead",
         cache_options=None,
+        size=None,
         **kwargs,
     ):
         """
@@ -1274,6 +1298,8 @@ def __init__(
         cache_options : dict
             Additional options passed to the constructor for the cache specified
             by `cache_type`.
+        size: int
+            If given and in read mode, suppresses having to look up the file size
         kwargs:
             Gets stored as self.kwargs
         """
@@ -1307,9 +1333,10 @@ def __init__(
         if mode not in {"ab", "rb", "wb"}:
             raise NotImplementedError("File mode not supported")
         if mode == "rb":
-            if not hasattr(self, "details"):
-                self.details = fs.info(path)
-            self.size = self.details["size"]
+            if size is not None:
+                self.size = size
+            else:
+                self.size = self.details["size"]
             self.cache = caches[cache_type](
                 self.blocksize, self._fetch_range, self.size, **cache_options
             )
@@ -1319,6 +1346,21 @@ def __init__(
         self.forced = False
         self.location = None
 
+    @property
+    def details(self):
+        if self._details is None:
+            self._details = self.fs.info(self.path)
+        return self._details
+
+    @details.setter
+    def details(self, value):
+        self._details = value
+        self.size = value["size"]
+
+    @property
+    def full_name(self):
+        return _unstrip_protocol(self.path, self.fs)
+
     @property
     def closed(self):
         # get around this attr being read-only in IOBase
diff --git a/fsspec/tests/test_caches.py b/fsspec/tests/test_caches.py
index 99ffeccc6..9dac33056 100644
--- a/fsspec/tests/test_caches.py
+++ b/fsspec/tests/test_caches.py
@@ -44,7 +44,10 @@ def letters_fetcher(start, end):
     return string.ascii_letters[start:end].encode()
 
 
-@pytest.fixture(params=caches.values(), ids=list(caches.keys()))
+not_parts_caches = {k: v for k, v in caches.items() if k != "parts"}
+
+
+@pytest.fixture(params=not_parts_caches.values(), ids=list(not_parts_caches))
 def Cache_imp(request):
     return request.param
 
@@ -91,3 +94,12 @@ def test_cache_basic(Cache_imp, blocksize, size_requests):
     result = cache._fetch(start, end)
     expected = string.ascii_letters[start:end].encode()
     assert result == expected
+
+
+def test_known():
+    c = caches["parts"](None, None, 100, {(10, 20): b"1" * 10, (0, 10): b"0" * 10})
+    assert (0, 20) in c.data  # got consolidated
+    assert c._fetch(5, 15) == b"0" * 5 + b"1" * 5
+    with pytest.raises(ValueError):
+        # read outside any known part; the None fetcher is never reached
+        c._fetch(25, 35)
diff --git a/fsspec/utils.py b/fsspec/utils.py
index 235c8799b..1f939fc65 100644
--- a/fsspec/utils.py
+++ b/fsspec/utils.py
@@ -450,6 +450,17 @@ def setup_logging(logger=None, logger_name=None, level="DEBUG", clear=True):
    return logger
 
 
+def _unstrip_protocol(name, fs):
+    if isinstance(fs.protocol, str):
+        if name.startswith(fs.protocol):
+            return name
+        return fs.protocol + "://" + name
+    else:
+        if name.startswith(tuple(fs.protocol)):
+            return name
+        return fs.protocol[0] + "://" + name
+
+
 def mirror_from(origin_name, methods):
     """Mirror attributes and methods from the given origin_name attribute of the
     instance to the
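
Usage sketch of the new batch-range API (the paths and byte contents here are illustrative only, shown against the in-memory filesystem):

```python
import fsspec

fs = fsspec.filesystem("memory")
fs.pipe("/data/a.bin", b"0123456789")
fs.pipe("/data/b.bin", b"abcdefghij")

# starts/ends may be lists, or scalars that broadcast across all paths
out = fs.cat_ranges(["/data/a.bin", "/data/b.bin"], starts=0, ends=5)
assert out == [b"01234", b"abcde"]
```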
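The new `"parts"` cache can also be driven directly, as `test_known` above does: contiguous entries are merged at construction, and reads must land inside a known block, so the `None` fetcher is never called:

```python
from fsspec.caching import caches

# two adjacent blocks covering bytes 0-20 of a notional 100-byte file
c = caches["parts"](None, None, 100, data={(0, 10): b"0" * 10, (10, 20): b"1" * 10})
assert (0, 20) in c.data  # consolidated at construction
assert c._fetch(5, 15) == b"0" * 5 + b"1" * 5
```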
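Finally, a quick illustration of the `_unstrip_protocol` helper behind the two new `full_name` properties (tuple-valued protocols fall back to their first entry):

```python
import fsspec
from fsspec.utils import _unstrip_protocol

fs = fsspec.filesystem("memory")
assert _unstrip_protocol("/a/b", fs) == "memory:///a/b"
assert _unstrip_protocol("memory:///a/b", fs) == "memory:///a/b"  # already qualified
```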