diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 468bcd8..5c10a14 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -20,7 +20,7 @@ repos:
   - id: sort-simple-yaml
   - id: trailing-whitespace
 - repo: https://github.com/astral-sh/ruff-pre-commit
-  rev: 'v0.1.7'
+  rev: 'v0.2.2'
   hooks:
   - id: ruff
     args: [--fix, --exit-non-zero-on-fix]
@@ -30,8 +30,3 @@ repos:
   hooks:
   - id: codespell
     additional_dependencies: ["tomli"]
-- repo: https://github.com/asottile/pyupgrade
-  rev: v3.15.0
-  hooks:
-  - id: pyupgrade
-    args: [--py38-plus]
diff --git a/pyproject.toml b/pyproject.toml
index 8b782a7..35d9aa4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -103,6 +103,10 @@ module = [
 ignore-words-list = "cachable,"
 
 [tool.ruff]
+output-format = "full"
+show-fixes = true
+
+[tool.ruff.lint]
 ignore = [
     "ISC001", # single-line-implicit-string-concatenation
     "PLR2004", # magic-value-comparison
@@ -155,8 +159,6 @@ select = [
     "W", # pycodestyle - Warning
     "YTT", # flake8-2020
 ]
-show-source = true
-show-fixes = true
 
 [tool.ruff.lint.flake8-pytest-style]
 fixture-parentheses = false
diff --git a/src/dvc_objects/db.py b/src/dvc_objects/db.py
index 8cf4476..ae1d667 100644
--- a/src/dvc_objects/db.py
+++ b/src/dvc_objects/db.py
@@ -259,14 +259,11 @@ def _oids_with_limit(
         prefixes: Optional[Iterable[str]] = None,
         jobs: Optional[int] = None,
     ) -> Iterator[str]:
-        count = 0
-        for oid in self._list_oids(prefixes=prefixes, jobs=jobs):
+        for i, oid in enumerate(self._list_oids(prefixes=prefixes, jobs=jobs), start=1):
             yield oid
-            count += 1
-            if count > limit:
+            if i > limit:
                 logger.debug(
-                    "`_list_oids()` returned max %r oids, "
-                    "skipping remaining results",
+                    "`_list_oids()` returned max %r oids, skipping remaining results",
                     limit,
                 )
                 return
@@ -348,7 +345,7 @@ def _list_oids_traverse(self, remote_size, remote_oids, jobs=None):
 
         yield from self._list_oids(prefixes=traverse_prefixes, jobs=jobs)
 
-    def all(self, jobs=None):  # noqa: A003
+    def all(self, jobs=None):
         """Iterate over all oids in this fs.
 
         Hashes will be fetched in parallel threads according to prefix
diff --git a/src/dvc_objects/executors.py b/src/dvc_objects/executors.py
index 3208f49..529f162 100644
--- a/src/dvc_objects/executors.py
+++ b/src/dvc_objects/executors.py
@@ -1,15 +1,8 @@
 import asyncio
-import queue
-import sys
 from collections.abc import Coroutine, Iterable, Iterator, Sequence
 from concurrent import futures
 from itertools import islice
-from typing import (
-    Any,
-    Callable,
-    Optional,
-    TypeVar,
-)
+from typing import Any, Callable, Optional, TypeVar
 
 from fsspec import Callback
 
@@ -17,18 +10,12 @@
 
 
 class ThreadPoolExecutor(futures.ThreadPoolExecutor):
-    _max_workers: int
-
     def __init__(
         self, max_workers: Optional[int] = None, cancel_on_error: bool = False, **kwargs
     ):
         super().__init__(max_workers=max_workers, **kwargs)
         self._cancel_on_error = cancel_on_error
 
-    @property
-    def max_workers(self) -> int:
-        return self._max_workers
-
     def imap_unordered(
         self, fn: Callable[..., _T], *iterables: Iterable[Any]
     ) -> Iterator[_T]:
@@ -36,9 +23,8 @@ def imap_unordered(
 
         It does not create all the futures at once to reduce memory usage.
         """
-
         it = zip(*iterables)
-        if self.max_workers == 1:
+        if self._max_workers == 1:
             for args in it:
                 yield fn(*args)
             return
@@ -46,42 +32,16 @@
         def create_taskset(n: int) -> set[futures.Future]:
             return {self.submit(fn, *args) for args in islice(it, n)}
 
-        tasks = create_taskset(self.max_workers * 5)
+        tasks = create_taskset(self._max_workers * 5)
         while tasks:
             done, tasks = futures.wait(tasks, return_when=futures.FIRST_COMPLETED)
             for fut in done:
                 yield fut.result()
             tasks.update(create_taskset(len(done)))
 
-    def shutdown(self, wait=True, *, cancel_futures=False):
-        if sys.version_info > (3, 9):
-            return super().shutdown(wait=wait, cancel_futures=cancel_futures)
-        else:  # noqa: RET505
-            with self._shutdown_lock:
-                self._shutdown = True
-                if cancel_futures:
-                    # Drain all work items from the queue, and then cancel their
-                    # associated futures.
-                    while True:
-                        try:
-                            work_item = self._work_queue.get_nowait()
-                        except queue.Empty:
-                            break
-                        if work_item is not None:
-                            work_item.future.cancel()
-
-                # Send a wake-up to prevent threads calling
-                # _work_queue.get(block=True) from permanently blocking.
-                self._work_queue.put(None)  # type: ignore[arg-type]
-            if wait:
-                for t in self._threads:
-                    t.join()
-
     def __exit__(self, exc_type, exc_val, exc_tb):
-        if self._cancel_on_error:
-            self.shutdown(wait=True, cancel_futures=exc_val is not None)
-        else:
-            self.shutdown(wait=True)
+        cancel_futures = self._cancel_on_error and exc_val is not None
+        self.shutdown(wait=True, cancel_futures=cancel_futures)
         return False
diff --git a/src/dvc_objects/fs/base.py b/src/dvc_objects/fs/base.py
index 919cf0e..cfdf278 100644
--- a/src/dvc_objects/fs/base.py
+++ b/src/dvc_objects/fs/base.py
@@ -296,7 +296,7 @@ def is_empty(self, path: AnyFSPath) -> bool:
         return entry["size"] == 0
 
     @overload
-    def open(  # noqa: A003
+    def open(
         self,
         path: AnyFSPath,
         mode: Literal["rb", "br", "wb"],
@@ -305,7 +305,7 @@ def open(  # noqa: A003
         return self.open(path, mode, **kwargs)
 
     @overload
-    def open(  # noqa: A003
+    def open(
         self,
         path: AnyFSPath,
         mode: Literal["r", "rt", "w"] = "r",
@@ -313,7 +313,7 @@ def open(  # noqa: A003
     ) -> "TextIO":
         ...
 
-    def open(  # noqa: A003
+    def open(
         self,
         path: AnyFSPath,
         mode: str = "r",
diff --git a/src/dvc_objects/fs/local.py b/src/dvc_objects/fs/local.py
index 50d860f..9c55fc7 100644
--- a/src/dvc_objects/fs/local.py
+++ b/src/dvc_objects/fs/local.py
@@ -127,7 +127,7 @@ def copy(self, path1, path2, recursive=False, on_error=None, **kwargs):
             self.rm_file(tmp_info)
             raise
 
-    def open(self, path, mode="r", encoding=None, **kwargs):  # noqa: A003
+    def open(self, path, mode="r", encoding=None, **kwargs):
        return open(path, mode=mode, encoding=encoding)  # noqa: SIM115
 
     def symlink(self, path1, path2):