diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d50eaa7ae..bd10f82cb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -30,4 +30,4 @@ repos: rev: v0.770 hooks: - id: mypy - exclude: (docs/.*|src/bandersnatch/tests/.*|src/bandersnatch_filter_plugins/.*|src/bandersnatch_storage_plugins/.*) + exclude: (docs/.*|src/bandersnatch/tests/.*) diff --git a/src/bandersnatch_filter_plugins/blacklist_name.py b/src/bandersnatch_filter_plugins/blacklist_name.py index 556b0e248..060079249 100644 --- a/src/bandersnatch_filter_plugins/blacklist_name.py +++ b/src/bandersnatch_filter_plugins/blacklist_name.py @@ -1,5 +1,5 @@ import logging -from typing import List +from typing import Any, Dict, List, Set from packaging.requirements import Requirement from packaging.version import InvalidVersion, Version @@ -14,7 +14,7 @@ class BlacklistProject(FilterProjectPlugin): # Requires iterable default blacklist_package_names: List[str] = [] - def initialize_plugin(self): + def initialize_plugin(self) -> None: """ Initialize the plugin """ @@ -28,7 +28,7 @@ def initialize_plugin(self): + f"{self.blacklist_package_names}" ) - def _determine_filtered_package_names(self): + def _determine_filtered_package_names(self) -> List[str]: """ Return a list of package names to be filtered base on the configuration file. @@ -37,7 +37,7 @@ def _determine_filtered_package_names(self): # configuration contains a PEP440 specifier it will be processed by the # blacklist release filter. So we need to remove any packages that # are not applicable for this plugin. - filtered_packages = set() + filtered_packages: Set[str] = set() try: lines = self.configuration["blacklist"]["packages"] package_lines = lines.split("\n") @@ -61,10 +61,10 @@ def _determine_filtered_package_names(self): logger.debug("Project blacklist is %r", list(filtered_packages)) return list(filtered_packages) - def filter(self, metadata): + def filter(self, metadata: Dict) -> bool: return not self.check_match(name=metadata["info"]["name"]) - def check_match(self, **kwargs): + def check_match(self, **kwargs: Any) -> bool: """ Check if the package name matches against a project that is blacklisted in the configuration. @@ -95,7 +95,7 @@ class BlacklistRelease(FilterReleasePlugin): # Requires iterable default blacklist_package_names: List[Requirement] = [] - def initialize_plugin(self): + def initialize_plugin(self) -> None: """ Initialize the plugin """ @@ -111,7 +111,7 @@ def initialize_plugin(self): + f"{self.blacklist_release_requirements}" ) - def _determine_filtered_package_requirements(self): + def _determine_filtered_package_requirements(self) -> List[Requirement]: """ Parse the configuration file for [blacklist]packages @@ -120,7 +120,7 @@ def _determine_filtered_package_requirements(self): list of packaging.requirements.Requirement For all PEP440 package specifiers """ - filtered_requirements = set() + filtered_requirements: Set[Requirement] = set() try: lines = self.configuration["blacklist"]["packages"] package_lines = lines.split("\n") @@ -133,7 +133,7 @@ def _determine_filtered_package_requirements(self): filtered_requirements.add(Requirement(package_line)) return list(filtered_requirements) - def filter(self, metadata): + def filter(self, metadata: Dict) -> bool: name = metadata["info"]["name"] releases = metadata["releases"] for version in list(releases.keys()): @@ -144,7 +144,7 @@ def filter(self, metadata): else: return True - def _check_match(self, name, version_string) -> bool: + def _check_match(self, name: str, version_string: str) -> bool: """ Check if the package name and version matches against a blacklisted package version specifier. diff --git a/src/bandersnatch_filter_plugins/filename_name.py b/src/bandersnatch_filter_plugins/filename_name.py index 058dd76ee..b94dbd5d0 100644 --- a/src/bandersnatch_filter_plugins/filename_name.py +++ b/src/bandersnatch_filter_plugins/filename_name.py @@ -1,5 +1,5 @@ import logging -from typing import List +from typing import Dict, List from bandersnatch.filter import FilterReleasePlugin @@ -29,7 +29,7 @@ class ExcludePlatformFilter(FilterReleasePlugin): "manylinux2010_x86_64", # PEP 571 ] - def initialize_plugin(self): + def initialize_plugin(self) -> None: """ Initialize the plugin reading patterns from the config. """ @@ -77,7 +77,7 @@ def initialize_plugin(self): logger.info(f"Initialized {self.name} plugin with {self._patterns!r}") - def filter(self, metadata): + def filter(self, metadata: Dict) -> bool: releases = metadata["releases"] """ Remove files from `releases` that match any pattern. @@ -104,13 +104,13 @@ def filter(self, metadata): else: return True - def _check_match(self, file_desc) -> bool: + def _check_match(self, file_desc: Dict) -> bool: """ Check if a release version matches any of the specified patterns. Parameters ========== - name: file_desc + file_desc: Dict file description entry Returns diff --git a/src/bandersnatch_filter_plugins/latest_name.py b/src/bandersnatch_filter_plugins/latest_name.py index 6612c357b..9b8d079b9 100644 --- a/src/bandersnatch_filter_plugins/latest_name.py +++ b/src/bandersnatch_filter_plugins/latest_name.py @@ -1,5 +1,6 @@ import logging from operator import itemgetter +from typing import Dict, List, Optional, Set, Union from packaging.version import parse @@ -16,7 +17,7 @@ class LatestReleaseFilter(FilterReleasePlugin): name = "latest_release" keep = 0 # by default, keep 'em all - def initialize_plugin(self): + def initialize_plugin(self) -> None: """ Initialize the plugin reading patterns from the config. """ @@ -32,22 +33,25 @@ def initialize_plugin(self): if self.keep > 0: logger.info(f"Initialized latest releases plugin with keep={self.keep}") - def filter(self, metadata): + def filter( # type: ignore[override] + self, metadata: Dict + ) -> Optional[bool]: """ Keep the latest releases """ + latest: Union[List, Set] info = metadata["info"] releases = metadata["releases"] if self.keep == 0: - return + return None versions = list(releases.keys()) before = len(versions) if before <= self.keep: # not enough releases: do nothing - return + return None versions_pair = map(lambda v: (parse(v), v), versions) latest = sorted(versions_pair)[-self.keep :] # noqa: E203 diff --git a/src/bandersnatch_filter_plugins/metadata_filter.py b/src/bandersnatch_filter_plugins/metadata_filter.py index 8b4ad21f6..e87207c8a 100644 --- a/src/bandersnatch_filter_plugins/metadata_filter.py +++ b/src/bandersnatch_filter_plugins/metadata_filter.py @@ -1,6 +1,7 @@ import logging import re -from typing import Dict, List +from configparser import SectionProxy +from typing import Any, Dict, List from packaging.specifiers import SpecifierSet from packaging.version import parse @@ -25,12 +26,12 @@ class RegexFilter(Filter): initilized = False patterns: Dict = {} - def initialize_plugin(self): + def initialize_plugin(self) -> None: """ Initialize the plugin reading patterns from the config. """ try: - config = self.configuration[self.name] + config: SectionProxy = self.configuration[self.name] except KeyError: return else: @@ -57,7 +58,7 @@ def filter(self, metadata: Dict) -> bool: # Walk through keys of patterns dict and return True iff all match return all(self._match_node_at_path(k, metadata) for k in self.patterns) - def _match_node_at_path(self, key: str, metadata): + def _match_node_at_path(self, key: str, metadata: Dict) -> bool: # Grab any tags prepended to key tags = key.split(":") @@ -94,21 +95,23 @@ def _match_node_at_path(self, key: str, metadata): else: return self._match_any_patterns(key, node, nulls_match=nulls_match) - def _find_element_by_dotted_path(self, path, metadata): + def _find_element_by_dotted_path(self, path: str, metadata: Dict) -> List: # Walk our metadata structure following dotted path. - path = path.split(".") + split_path = path.split(".") node = metadata - for p in path: + for p in split_path: if p in node and node[p] is not None: node = node[p] else: return [] - if isinstance(node, list): - return node + if isinstance(node, list): # type: ignore + return node # type: ignore else: return [node] - def _match_any_patterns(self, key: str, values: List, nulls_match=True): + def _match_any_patterns( + self, key: str, values: List[str], nulls_match: bool = True + ) -> bool: results = [] for pattern in self.patterns[key]: if nulls_match and not values: @@ -118,7 +121,9 @@ def _match_any_patterns(self, key: str, values: List, nulls_match=True): results.append(pattern.match(value)) return any(results) - def _match_all_patterns(self, key: str, values: List, nulls_match=True): + def _match_all_patterns( + self, key: str, values: List[str], nulls_match: bool = True + ) -> bool: results = [] for pattern in self.patterns[key]: if nulls_match and not values: @@ -127,14 +132,16 @@ def _match_all_patterns(self, key: str, values: List, nulls_match=True): results.append(any(pattern.match(v) for v in values)) return all(results) - def _match_none_patterns(self, key: str, values: List, nulls_match=True): + def _match_none_patterns( + self, key: str, values: List[str], nulls_match: bool = True + ) -> bool: return not self._match_any_patterns(key, values) class RegexProjectMetadataFilter(FilterMetadataPlugin, RegexFilter): """ Plugin to download only packages having metadata matching - at least one of the specified patterns. + at least one of the specified patterns. """ name = "regex_project_metadata" @@ -143,10 +150,10 @@ class RegexProjectMetadataFilter(FilterMetadataPlugin, RegexFilter): initilized = False patterns: Dict = {} - def initilize_plugin(self): + def initilize_plugin(self) -> None: RegexFilter.initialize_plugin(self) - def filter(self, metadata: dict) -> bool: + def filter(self, metadata: Dict) -> bool: return RegexFilter.filter(self, metadata) @@ -162,10 +169,10 @@ class RegexReleaseFileMetadataFilter(FilterReleaseFilePlugin, RegexFilter): initilized = False patterns: Dict = {} - def initilize_plugin(self): + def initilize_plugin(self) -> None: RegexFilter.initialize_plugin(self) - def filter(self, metadata: dict) -> bool: + def filter(self, metadata: Dict) -> bool: return RegexFilter.filter(self, metadata) @@ -180,12 +187,14 @@ class VersionRangeFilter(Filter): specifiers: Dict = {} nulls_match = True - def initialize_plugin(self): + def initialize_plugin(self) -> None: """ Initialize the plugin reading version ranges from the config. """ try: - config = self.configuration["version_range_release_file_metadata"] + config: SectionProxy = self.configuration[ + "version_range_release_file_metadata" + ] except KeyError: return else: @@ -212,11 +221,11 @@ def filter(self, metadata: Dict) -> bool: return all(self._match_node_at_path(k, metadata) for k in self.specifiers) - def _find_element_by_dotted_path(self, path, metadata): + def _find_element_by_dotted_path(self, path: str, metadata: Dict) -> Any: # Walk our metadata structure following dotted path. - path = path.split(".") + split_path = path.split(".") node = metadata - for p in path: + for p in split_path: if p in node and node[p] is not None: node = node[p] else: @@ -224,7 +233,7 @@ def _find_element_by_dotted_path(self, path, metadata): return node - def _match_node_at_path(self, key: str, metadata): + def _match_node_at_path(self, key: str, metadata: Dict) -> bool: # Grab any tags prepended to key tags = key.split(":") @@ -252,7 +261,7 @@ def _match_node_at_path(self, key: str, metadata): # Check if SpeciferSet matches target versions # TODO: Figure out proper intersection of SpecifierSets - ospecs = SpecifierSet(node) + ospecs: SpecifierSet = SpecifierSet(node) ispecs = self.specifiers[key] if any(ospecs.contains(ispec, prereleases=True) for ispec in ispecs): return True @@ -274,7 +283,7 @@ class VersionRangeProjectMetadataFilter(FilterMetadataPlugin, VersionRangeFilter specifiers: Dict = {} nulls_match = True - def initialize_plugin(self): + def initialize_plugin(self) -> None: VersionRangeFilter.initialize_plugin(self) def filter(self, metadata: dict) -> bool: @@ -294,7 +303,7 @@ class VersionRangeReleaseFileMetadataFilter( specifiers: Dict = {} nulls_match = True - def initialize_plugin(self): + def initialize_plugin(self) -> None: VersionRangeFilter.initialize_plugin(self) def filter(self, metadata: dict) -> bool: diff --git a/src/bandersnatch_filter_plugins/prerelease_name.py b/src/bandersnatch_filter_plugins/prerelease_name.py index fa1a9b5cc..a7abe6f20 100644 --- a/src/bandersnatch_filter_plugins/prerelease_name.py +++ b/src/bandersnatch_filter_plugins/prerelease_name.py @@ -1,5 +1,6 @@ import logging import re +from typing import Dict from bandersnatch.filter import FilterReleasePlugin @@ -20,7 +21,7 @@ class PreReleaseFilter(FilterReleasePlugin): ) patterns = None - def initialize_plugin(self): + def initialize_plugin(self) -> None: """ Initialize the plugin reading patterns from the config. """ @@ -31,13 +32,13 @@ def initialize_plugin(self): ] logger.info(f"Initialized prerelease plugin with {self.patterns}") - def filter(self, metadata): + def filter(self, metadata: Dict) -> bool: """ Remove all release versions that match any of the specified patterns. """ releases = metadata["releases"] for version in list(releases.keys()): - if any(pattern.match(version) for pattern in self.patterns): + if any(pattern.match(version) for pattern in self.patterns): # type: ignore del releases[version] if not releases: return False diff --git a/src/bandersnatch_filter_plugins/regex_name.py b/src/bandersnatch_filter_plugins/regex_name.py index 7b24c1073..1de01e038 100644 --- a/src/bandersnatch_filter_plugins/regex_name.py +++ b/src/bandersnatch_filter_plugins/regex_name.py @@ -1,6 +1,6 @@ import logging import re -from typing import List, Pattern +from typing import Dict, List, Pattern from bandersnatch.filter import FilterProjectPlugin, FilterReleasePlugin @@ -16,7 +16,7 @@ class RegexReleaseFilter(FilterReleasePlugin): # Has to be iterable to ensure it works with any() patterns: List[Pattern] = [] - def initialize_plugin(self): + def initialize_plugin(self) -> None: """ Initialize the plugin reading patterns from the config. """ @@ -34,7 +34,7 @@ def initialize_plugin(self): logger.info(f"Initialized regex release plugin with {self.patterns}") - def filter(self, metadata): + def filter(self, metadata: Dict) -> bool: """ Remove all release versions that match any of the specified patterns. """ @@ -58,7 +58,7 @@ class RegexProjectFilter(FilterProjectPlugin): # Has to be iterable to ensure it works with any() patterns: List[Pattern] = [] - def initialize_plugin(self): + def initialize_plugin(self) -> None: """ Initialize the plugin reading patterns from the config. """ @@ -75,10 +75,10 @@ def initialize_plugin(self): logger.info(f"Initialized regex release plugin with {self.patterns}") - def filter(self, metadata): + def filter(self, metadata: Dict) -> bool: return not self.check_match(name=metadata["info"]["name"]) - def check_match(self, name): + def check_match(self, name: str) -> bool: # type: ignore[override] """ Check if a release version matches any of the specified patterns. diff --git a/src/bandersnatch_filter_plugins/whitelist_name.py b/src/bandersnatch_filter_plugins/whitelist_name.py index 39b7aef8b..4a8905cd6 100644 --- a/src/bandersnatch_filter_plugins/whitelist_name.py +++ b/src/bandersnatch_filter_plugins/whitelist_name.py @@ -1,5 +1,5 @@ import logging -from typing import List +from typing import Any, Dict, List, Set from bandersnatch.filter import FilterProjectPlugin @@ -11,7 +11,7 @@ class WhitelistProject(FilterProjectPlugin): # Requires iterable default whitelist_package_names: List[str] = [] - def initialize_plugin(self): + def initialize_plugin(self) -> None: """ Initialize the plugin """ @@ -25,7 +25,7 @@ def initialize_plugin(self): + f"{self.whitelist_package_names}" ) - def _determine_unfiltered_package_names(self): + def _determine_unfiltered_package_names(self) -> List[str]: """ Return a list of package names to be filtered base on the configuration file. @@ -34,7 +34,7 @@ def _determine_unfiltered_package_names(self): # configuration contains a PEP440 specifier it will be processed by the # blacklist release filter. So we need to remove any packages that # are not applicable for this plugin. - unfiltered_packages = set() + unfiltered_packages: Set[str] = set() try: lines = self.configuration["whitelist"]["packages"] package_lines = lines.split("\n") @@ -47,10 +47,10 @@ def _determine_unfiltered_package_names(self): unfiltered_packages.add(package_line) return list(unfiltered_packages) - def filter(self, metadata): + def filter(self, metadata: Dict) -> bool: return not self.check_match(name=metadata["info"]["name"]) - def check_match(self, **kwargs): + def check_match(self, **kwargs: Any) -> bool: """ Check if the package name matches against a project that is blacklisted in the configuration. diff --git a/src/bandersnatch_storage_plugins/filesystem.py b/src/bandersnatch_storage_plugins/filesystem.py index 9a51fd831..f9b95dcbd 100644 --- a/src/bandersnatch_storage_plugins/filesystem.py +++ b/src/bandersnatch_storage_plugins/filesystem.py @@ -19,10 +19,10 @@ class FilesystemStorage(StoragePlugin): name = "filesystem" PATH_BACKEND: Type[pathlib.Path] = pathlib.Path - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) - def get_lock(self, path: str = None) -> filelock.FileLock: + def get_lock(self, path: Optional[str] = None) -> filelock.FileLock: """ Retrieve the appropriate `FileLock` backend for this storage plugin @@ -164,14 +164,14 @@ def open_file( # noqa def read_file( self, path: PATH_TYPES, - text=True, + text: bool = True, encoding: str = "utf-8", errors: Optional[str] = None, ) -> Union[str, bytes]: - """Return the contents of the requested file, either a a bytestring or a unicode + """Return the contents of the requested file, either a bytestring or a unicode string depending on whether **text** is True""" with self.open_file(path, text=text, encoding=encoding) as fh: - contents = fh.read() + contents: Union[str, bytes] = fh.read() return contents def delete_file(self, path: PATH_TYPES, dry_run: bool = False) -> int: @@ -261,4 +261,4 @@ def get_hash(self, path: PATH_TYPES, function: str = "sha256") -> str: h.update(chunk) digest = h.hexdigest() logger.debug(f"Calculated digest: {digest!s}") - return h.hexdigest() + return str(h.hexdigest()) diff --git a/src/bandersnatch_storage_plugins/swift.py b/src/bandersnatch_storage_plugins/swift.py index 1c7585a32..ee497e1f4 100644 --- a/src/bandersnatch_storage_plugins/swift.py +++ b/src/bandersnatch_storage_plugins/swift.py @@ -11,7 +11,19 @@ import re import sys import tempfile -from typing import IO, Any, Dict, Generator, List, Optional, Sequence, Type, Union +from typing import ( + IO, + Any, + Dict, + Generator, + List, + NoReturn, + Optional, + Sequence, + Tuple, + Type, + Union, +) import filelock import keystoneauth1 @@ -41,7 +53,7 @@ def __init__( backend: Optional["SwiftStorage"] = None, ) -> None: # The path to the lock file. - self.backend = backend + self.backend: Optional["SwiftStorage"] = backend self._lock_file_fd: Optional["SwiftPath"] super().__init__(lock_file, timeout=timeout) @@ -84,7 +96,7 @@ def is_locked(self) -> bool: # TODO: Refactor this out into reusable base class? -class _SwiftAccessor: # type: ignore +class _SwiftAccessor: BACKEND: "SwiftStorage" @classmethod @@ -93,16 +105,16 @@ def register_backend(cls, backend: "SwiftStorage") -> None: return @staticmethod - def stat(target): + def stat(target: str) -> NoReturn: raise NotImplementedError("stat() not available on this system") @staticmethod - def lstat(target): + def lstat(target: str) -> NoReturn: raise NotImplementedError("lstat() not available on this system") @staticmethod - def open(*args, **kwargs) -> IO: - context = contextlib.ExitStack() + def open(*args: Any, **kwargs: Any) -> IO: + context: contextlib.ExitStack = contextlib.ExitStack() fh: IO = context.enter_context( _SwiftAccessor.BACKEND.open_file(*args, **kwargs) ) @@ -129,22 +141,22 @@ def listdir(target: str) -> List[str]: return results @staticmethod - def scandir(target): + def scandir(target: str) -> NoReturn: raise NotImplementedError("scandir() is not available on this platform") @staticmethod - def chmod(target): + def chmod(target: str) -> NoReturn: raise NotImplementedError("chmod() is not available on this platform") - def lchmod(self, pathobj, mode): + def lchmod(self, pathobj: str, mode: int) -> NoReturn: raise NotImplementedError("lchmod() not available on this system") @staticmethod - def mkdir(*args, **kwargs) -> None: + def mkdir(*args: Any, **kwargs: Any) -> None: return _SwiftAccessor.BACKEND.mkdir(*args, **kwargs) @staticmethod - def unlink(*args, **kwargs) -> None: + def unlink(*args: Any, **kwargs: Any) -> None: missing_ok = kwargs.pop("missing_ok", False) try: _SwiftAccessor.BACKEND.delete_file(*args, **kwargs) @@ -155,11 +167,11 @@ def unlink(*args, **kwargs) -> None: return None @staticmethod - def link(*args, **kwargs) -> None: + def link(*args: Any, **kwargs: Any) -> None: return _SwiftAccessor.BACKEND.copy_file(*args, **kwargs) @staticmethod - def rmdir(*args, **kwargs) -> None: + def rmdir(*args: Any, **kwargs: Any) -> None: try: _SwiftAccessor.BACKEND.rmdir(*args, **kwargs) except OSError: @@ -167,26 +179,32 @@ def rmdir(*args, **kwargs) -> None: return None @staticmethod - def rename(*args, **kwargs) -> None: + def rename(*args: Any, **kwargs: Any) -> None: return _SwiftAccessor.BACKEND.copy_file(*args, **kwargs) @staticmethod - def replace(*args, **kwargs) -> None: + def replace(*args: Any, **kwargs: Any) -> None: return _SwiftAccessor.BACKEND.copy_file(*args, **kwargs) @staticmethod - def symlink(a, b, target_is_directory=False, src_container=None, src_account=None): + def symlink( + a: PATH_TYPES, + b: PATH_TYPES, + target_is_directory: bool = False, + src_container: Optional[str] = None, + src_account: Optional[str] = None, + ) -> None: return _SwiftAccessor.BACKEND.symlink( a, b, src_container=src_container, src_account=src_account ) @staticmethod - def utime(target) -> None: + def utime(target: str) -> None: return _SwiftAccessor.BACKEND.update_timestamp(target) # Helper for resolve() - def readlink(self, path): - return path + def readlink(self, path: str) -> str: + return str(path) _swift_accessor: Type[_SwiftAccessor] @@ -208,7 +226,7 @@ class SwiftPath(pathlib.Path): "_closed", ) - def __new__(cls, *args) -> "SwiftPath": + def __new__(cls, *args: Any) -> "SwiftPath": self = cls._from_parts(args, init=False) self._init() return self @@ -229,7 +247,7 @@ def __str__(self) -> str: self._parts, # type: ignore ) or "." - ) # type: ignore + ) return self._str def __fspath__(self) -> str: @@ -238,7 +256,7 @@ def __fspath__(self) -> str: def __bytes__(self) -> bytes: """Return the bytes representation of the path. This is only recommended to use under Unix.""" - return os.fsencode(self) + return os.fsencode(str(self)) def __repr__(self) -> str: return f"{self.__class__.__name__}({self.as_posix()!r})" @@ -250,10 +268,10 @@ def _make_child_relpath(self, part: str) -> "SwiftPath": return self._from_parsed_parts(self._drv, self._root, parts) # type: ignore @classmethod - def _parse_args(cls, args: Sequence[str]) -> "SwiftPath": + def _parse_args(cls, args: Sequence[str]) -> Tuple[Optional[str], str, List[str]]: # This is useful when you don't want to create an instance, just # canonicalize some constructor arguments. - parts = [] + parts: List[str] = [] for a in args: a = os.fspath(a) if isinstance(a, str): @@ -267,10 +285,10 @@ def _parse_args(cls, args: Sequence[str]) -> "SwiftPath": # Modification to prevent us from starting swift paths with "/" if parts[0].startswith("/"): parts[0] = parts[0].lstrip("/") - return cls._flavour.parse_parts(parts) + return cls._flavour.parse_parts(parts) # type: ignore @classmethod - def _from_parts(cls, args, init=True): + def _from_parts(cls, args: Sequence[str], init: bool = True) -> "SwiftPath": # We need to call _parse_args on the instance, so as to get the # right flavour. self = object.__new__(cls) @@ -280,19 +298,19 @@ def _from_parts(cls, args, init=True): self._parts = parts if init: self._init() - return self + return self # type: ignore @classmethod def _from_parsed_parts( cls, drv: Optional[str], root: str, parts: List[str], init: bool = True - ): + ) -> "SwiftPath": self = object.__new__(cls) self._drv = drv self._root = root self._parts = parts if init: self._init() - return self + return self # type: ignore @classmethod def register_backend(cls, backend: "SwiftStorage") -> None: @@ -307,8 +325,8 @@ def backend(self) -> "SwiftStorage": def absolute(self) -> "SwiftPath": return self - def touch(self): - return self.write_bytes(b"") + def touch(self) -> None: # type: ignore[override] + self.write_bytes(b"") def is_dir(self) -> bool: target_path = str(self) @@ -318,7 +336,7 @@ def is_dir(self) -> bool: and not target_path.endswith(self._flavour.sep) ): target_path = f"{target_path}{self._flavour.sep}" - files = [] + files: List[str] = [] with self.backend.connection() as conn: try: _, files = conn.get_container( @@ -367,9 +385,9 @@ def write_text( ) return 0 - def symlink_to( # type: ignore + def symlink_to( self, - src: str, + src: PATH_TYPES, target_is_directory: bool = False, src_container: Optional[str] = None, src_account: Optional[str] = None, @@ -380,7 +398,7 @@ def symlink_to( # type: ignore """ self._accessor.symlink( src, - self, + str(self), target_is_directory=target_is_directory, src_account=src_account, src_container=src_container, @@ -402,8 +420,8 @@ def read_bytes(self) -> bytes: assert isinstance(result, bytes) return result - def unlink(self, missing_ok=False) -> None: - self._accessor.unlink(self, missing_ok=missing_ok) + def unlink(self, missing_ok: bool = False) -> None: + self._accessor.unlink(str(self), missing_ok=missing_ok) def iterdir( self, @@ -431,7 +449,7 @@ class SwiftStorage(StoragePlugin): PATH_BACKEND = SwiftPath @property - def directory(self): + def directory(self) -> str: try: return self.configuration.get("mirror", "directory") except configparser.NoOptionError: @@ -464,7 +482,7 @@ def initialize_plugin(self) -> None: ), "password": self.get_config_value("password", "OS_PASSWORD"), } - os_options = {} + os_options: Dict[str, Any] = {} user_id = self.get_config_value("username", "OS_USER_ID", "OS_USERNAME") project = self.get_config_value( "project_name", "OS_PROJECT_NAME", "OS_TENANT_NAME" @@ -498,7 +516,7 @@ def initialize_plugin(self) -> None: global _swift_accessor _swift_accessor = _SwiftAccessor - def get_lock(self, path: str = None) -> SwiftFileLock: + def get_lock(self, path: Optional[str] = None) -> SwiftFileLock: """ Retrieve the appropriate `FileLock` backend for this storage plugin @@ -541,7 +559,7 @@ def connection(self) -> Generator[swiftclient.client.Connection, None, None]: ) as swift_conn: yield swift_conn - def get_container(self, container: str = None) -> List[Dict[str, str]]: + def get_container(self, container: Optional[str] = None) -> List[Dict[str, str]]: """ Given the name of a container, return its contents. @@ -575,7 +593,7 @@ def get_container(self, container: str = None) -> List[Dict[str, str]]: container = self.default_container with self.connection() as conn: _, container_instance = conn.get_container(container) - return container_instance + return container_instance # type: ignore def get_object(self, container_name: str, file_path: str) -> bytes: """Retrieve an object from swift, base64 decoding the contents.""" @@ -587,7 +605,7 @@ def get_object(self, container_name: str, file_path: str) -> bytes: else: if len(file_contents) % 4 == 0 and BASE64_RE.fullmatch(file_contents): return base64.b64decode(file_contents) - return file_contents + return bytes(file_contents) def walk( self, @@ -601,7 +619,6 @@ def walk( if conn is None: conn = stack.enter_context(self.connection()) _, paths = conn.get_container(self.default_container, prefix=str(root)) - results = [] for p in paths: if "subdir" in p and dirs: results.append(self.PATH_BACKEND(p["subdir"])) @@ -692,7 +709,7 @@ def copy_local_file(self, source: PATH_TYPES, dest: PATH_TYPES) -> None: return def copy_file( - self, source: PATH_TYPES, dest: PATH_TYPES, dest_container: str = None + self, source: PATH_TYPES, dest: PATH_TYPES, dest_container: Optional[str] = None ) -> None: """Copy a file from **source** to **dest**""" if dest_container is None: @@ -727,7 +744,9 @@ def write_file( return @contextlib.contextmanager - def open_file(self, path: PATH_TYPES, text=True) -> Generator[IO, None, None]: + def open_file( + self, path: PATH_TYPES, text: bool = True + ) -> Generator[IO, None, None]: """Yield a file context to iterate over. If text is false, open the file with 'rb' mode specified.""" wrapper = io.StringIO if text else io.BytesIO @@ -737,7 +756,7 @@ def open_file(self, path: PATH_TYPES, text=True) -> Generator[IO, None, None]: def read_file( self, path: PATH_TYPES, - text=True, + text: bool = True, encoding: str = "utf-8", errors: Optional[str] = None, ) -> Union[str, bytes]: @@ -749,7 +768,7 @@ def read_file( errors = sys.getfilesystemencodeerrors() # type: ignore except AttributeError: errors = "surrogateescape" - kwargs = {} + kwargs: Dict[str, Any] = {} if errors: kwargs["errors"] = errors content = self.get_object(self.default_container, str(path)) @@ -784,7 +803,7 @@ def mkdir( ) if not isinstance(path, self.PATH_BACKEND): path = self.PATH_BACKEND(path) - return path.joinpath(".swiftkeep").touch() + path.joinpath(".swiftkeep").touch() def rmdir( self, @@ -839,7 +858,7 @@ def is_dir(self, path: PATH_TYPES) -> bool: target_path = "" if target_path and not target_path.endswith("/"): target_path = f"{target_path}/" - files = [] + files: List[str] = [] with self.connection() as conn: try: _, files = conn.get_container( @@ -863,7 +882,6 @@ def is_file(self, path: PATH_TYPES) -> bool: return False else: return True - return False def is_symlink(self, path: PATH_TYPES) -> bool: """Check whether the provided path is a symlink""" @@ -875,7 +893,7 @@ def is_symlink(self, path: PATH_TYPES) -> bool: except swiftclient.exceptions.ClientException: return False if headers: - return headers.get("content-type", "") == "application/symlink" + return bool(headers.get("content-type", "") == "application/symlink") return False def update_timestamp(self, path: PATH_TYPES) -> None: @@ -888,7 +906,7 @@ def update_timestamp(self, path: PATH_TYPES) -> None: def get_hash(self, path: str, function: str = "sha256") -> str: h = getattr(hashlib, function)(self.read_file(path, text=False)) - return h.hexdigest() + return str(h.hexdigest()) def symlink( self, @@ -908,7 +926,7 @@ def symlink( } if src_account is not None: headers["X-Symlink-Target-Account"] = src_account - return conn.put_object( + conn.put_object( self.default_container, str(dest), b"",