From 45c59344883db46bb8e26dbb776f9cdb478b4d62 Mon Sep 17 00:00:00 2001 From: skshetry <18718008+skshetry@users.noreply.github.com> Date: Wed, 10 Jan 2024 20:40:47 +0545 Subject: [PATCH] callbacks: expose DEFAULT_CALLBACK (#1495) --- fsspec/asyn.py | 10 +++++----- fsspec/callbacks.py | 8 ++++---- fsspec/generic.py | 4 ++-- fsspec/implementations/cached.py | 4 ++-- fsspec/implementations/http.py | 6 +++--- fsspec/implementations/reference.py | 4 ++-- fsspec/spec.py | 12 +++++------- 7 files changed, 23 insertions(+), 25 deletions(-) diff --git a/fsspec/asyn.py b/fsspec/asyn.py index 00fecef90..fb4e05e74 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -11,7 +11,7 @@ from glob import has_magic from typing import TYPE_CHECKING, Iterable -from .callbacks import _DEFAULT_CALLBACK +from .callbacks import DEFAULT_CALLBACK from .exceptions import FSTimeoutError from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep from .spec import AbstractBufferedFile, AbstractFileSystem @@ -205,7 +205,7 @@ def running_async() -> bool: async def _run_coros_in_chunks( coros, batch_size=None, - callback=_DEFAULT_CALLBACK, + callback=DEFAULT_CALLBACK, timeout=None, return_exceptions=False, nofiles=False, @@ -245,7 +245,7 @@ async def _run_coros_in_chunks( asyncio.Task(asyncio.wait_for(c, timeout=timeout)) for c in coros[start : start + batch_size] ] - if callback is not _DEFAULT_CALLBACK: + if callback is not DEFAULT_CALLBACK: [ t.add_done_callback(lambda *_, **__: callback.relative_update(1)) for t in chunk @@ -506,7 +506,7 @@ async def _put( lpath, rpath, recursive=False, - callback=_DEFAULT_CALLBACK, + callback=DEFAULT_CALLBACK, batch_size=None, maxdepth=None, **kwargs, @@ -583,7 +583,7 @@ async def _get( rpath, lpath, recursive=False, - callback=_DEFAULT_CALLBACK, + callback=DEFAULT_CALLBACK, maxdepth=None, **kwargs, ): diff --git a/fsspec/callbacks.py b/fsspec/callbacks.py index 5d34f4d19..e29fe1117 100644 --- a/fsspec/callbacks.py +++ b/fsspec/callbacks.py @@ -68,7 +68,7 @@ def branched(self, path_1, path_2, kwargs): """ self.branch(path_1, path_2, kwargs) # mutate kwargs so that we can force the caller to pass "callback=" explicitly - return kwargs.pop("callback", _DEFAULT_CALLBACK) + return kwargs.pop("callback", DEFAULT_CALLBACK) def branch_coro(self, fn): """ @@ -197,10 +197,10 @@ def as_callback(cls, maybe_callback=None): For the special value of ``None``, return the global instance of ``NoOpCallback``. This is an alternative to including - ``callback=_DEFAULT_CALLBACK`` directly in a method signature. + ``callback=DEFAULT_CALLBACK`` directly in a method signature. """ if maybe_callback is None: - return _DEFAULT_CALLBACK + return DEFAULT_CALLBACK return maybe_callback @@ -297,4 +297,4 @@ def __del__(self): return self.close() -_DEFAULT_CALLBACK = NoOpCallback() +DEFAULT_CALLBACK = _DEFAULT_CALLBACK = NoOpCallback() diff --git a/fsspec/generic.py b/fsspec/generic.py index 20534cf40..ddd093aa1 100644 --- a/fsspec/generic.py +++ b/fsspec/generic.py @@ -8,7 +8,7 @@ from typing import Optional from .asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper -from .callbacks import _DEFAULT_CALLBACK +from .callbacks import DEFAULT_CALLBACK from .core import filesystem, get_filesystem_class, split_protocol, url_to_fs _generic_fs = {} @@ -279,7 +279,7 @@ async def _cp_file( url, url2, blocksize=2**20, - callback=_DEFAULT_CALLBACK, + callback=DEFAULT_CALLBACK, **kwargs, ): fs = _resolve_fs(url, self.method) diff --git a/fsspec/implementations/cached.py b/fsspec/implementations/cached.py index 001fe87a1..b3c43fa69 100644 --- a/fsspec/implementations/cached.py +++ b/fsspec/implementations/cached.py @@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, Callable, ClassVar from fsspec import AbstractFileSystem, filesystem -from fsspec.callbacks import _DEFAULT_CALLBACK +from fsspec.callbacks import DEFAULT_CALLBACK from fsspec.compression import compr from fsspec.core import BaseCache, MMapCache from fsspec.exceptions import BlocksizeMismatchError @@ -607,7 +607,7 @@ def cat( path, recursive=False, on_error="raise", - callback=_DEFAULT_CALLBACK, + callback=DEFAULT_CALLBACK, **kwargs, ): paths = self.expand_path( diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py index 87ed3376c..d1bb013b6 100644 --- a/fsspec/implementations/http.py +++ b/fsspec/implementations/http.py @@ -10,7 +10,7 @@ import yarl from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem, sync, sync_wrapper -from fsspec.callbacks import _DEFAULT_CALLBACK +from fsspec.callbacks import DEFAULT_CALLBACK from fsspec.exceptions import FSTimeoutError from fsspec.spec import AbstractBufferedFile from fsspec.utils import ( @@ -234,7 +234,7 @@ async def _cat_file(self, url, start=None, end=None, **kwargs): return out async def _get_file( - self, rpath, lpath, chunk_size=5 * 2**20, callback=_DEFAULT_CALLBACK, **kwargs + self, rpath, lpath, chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, **kwargs ): kw = self.kwargs.copy() kw.update(kwargs) @@ -268,7 +268,7 @@ async def _put_file( lpath, rpath, chunk_size=5 * 2**20, - callback=_DEFAULT_CALLBACK, + callback=DEFAULT_CALLBACK, method="post", **kwargs, ): diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index cae354dd7..e1f9e9501 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -17,7 +17,7 @@ import json from ..asyn import AsyncFileSystem -from ..callbacks import _DEFAULT_CALLBACK +from ..callbacks import DEFAULT_CALLBACK from ..core import filesystem, open, split_protocol from ..utils import isfilelike, merge_offset_ranges, other_paths @@ -784,7 +784,7 @@ async def _get_file(self, rpath, lpath, **kwargs): with open(lpath, "wb") as f: f.write(data) - def get_file(self, rpath, lpath, callback=_DEFAULT_CALLBACK, **kwargs): + def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, **kwargs): if self.isdir(rpath): return os.makedirs(lpath, exist_ok=True) data = self.cat_file(rpath, **kwargs) diff --git a/fsspec/spec.py b/fsspec/spec.py index cfe1f796d..87fdf395c 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -11,7 +11,7 @@ from hashlib import sha256 from typing import ClassVar -from .callbacks import _DEFAULT_CALLBACK +from .callbacks import DEFAULT_CALLBACK from .config import apply_config, conf from .dircache import DirCache from .transaction import Transaction @@ -876,9 +876,7 @@ def cat(self, path, recursive=False, on_error="raise", **kwargs): else: return self.cat_file(paths[0], **kwargs) - def get_file( - self, rpath, lpath, callback=_DEFAULT_CALLBACK, outfile=None, **kwargs - ): + def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, outfile=None, **kwargs): """Copy single remote file to local""" from .implementations.local import LocalFileSystem @@ -913,7 +911,7 @@ def get( rpath, lpath, recursive=False, - callback=_DEFAULT_CALLBACK, + callback=DEFAULT_CALLBACK, maxdepth=None, **kwargs, ): @@ -970,7 +968,7 @@ def get( with callback.branched(rpath, lpath, kwargs) as child: self.get_file(rpath, lpath, callback=child, **kwargs) - def put_file(self, lpath, rpath, callback=_DEFAULT_CALLBACK, **kwargs): + def put_file(self, lpath, rpath, callback=DEFAULT_CALLBACK, **kwargs): """Copy single file to remote""" if os.path.isdir(lpath): self.makedirs(rpath, exist_ok=True) @@ -995,7 +993,7 @@ def put( lpath, rpath, recursive=False, - callback=_DEFAULT_CALLBACK, + callback=DEFAULT_CALLBACK, maxdepth=None, **kwargs, ):