Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type annotate bandersnatch plugins #561

Merged
merged 10 commits into from
Jun 8, 2020
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ repos:
rev: v0.770
hooks:
- id: mypy
exclude: (docs/.*|src/bandersnatch/tests/.*|src/bandersnatch_filter_plugins/.*|src/bandersnatch_storage_plugins/.*)
exclude: (docs/.*|src/bandersnatch/tests/.*)
22 changes: 11 additions & 11 deletions src/bandersnatch_filter_plugins/blacklist_name.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import List
from typing import Any, Dict, List, Set

from packaging.requirements import Requirement
from packaging.version import InvalidVersion, Version
Expand All @@ -14,7 +14,7 @@ class BlacklistProject(FilterProjectPlugin):
# Requires iterable default
blacklist_package_names: List[str] = []

def initialize_plugin(self):
def initialize_plugin(self) -> None:
"""
Initialize the plugin
"""
Expand All @@ -28,7 +28,7 @@ def initialize_plugin(self):
+ f"{self.blacklist_package_names}"
)

def _determine_filtered_package_names(self):
def _determine_filtered_package_names(self) -> List[str]:
"""
Return a list of package names to be filtered base on the configuration
file.
Expand All @@ -37,7 +37,7 @@ def _determine_filtered_package_names(self):
# configuration contains a PEP440 specifier it will be processed by the
# blacklist release filter. So we need to remove any packages that
# are not applicable for this plugin.
filtered_packages = set()
filtered_packages: Set[str] = set()
try:
lines = self.configuration["blacklist"]["packages"]
package_lines = lines.split("\n")
Expand All @@ -61,10 +61,10 @@ def _determine_filtered_package_names(self):
logger.debug("Project blacklist is %r", list(filtered_packages))
return list(filtered_packages)

def filter(self, metadata):
def filter(self, metadata: Dict) -> bool:
return not self.check_match(name=metadata["info"]["name"])

def check_match(self, **kwargs):
def check_match(self, **kwargs: Any) -> bool:
"""
Check if the package name matches against a project that is blacklisted
in the configuration.
Expand Down Expand Up @@ -95,7 +95,7 @@ class BlacklistRelease(FilterReleasePlugin):
# Requires iterable default
blacklist_package_names: List[Requirement] = []

def initialize_plugin(self):
def initialize_plugin(self) -> None:
"""
Initialize the plugin
"""
Expand All @@ -111,7 +111,7 @@ def initialize_plugin(self):
+ f"{self.blacklist_release_requirements}"
)

def _determine_filtered_package_requirements(self):
def _determine_filtered_package_requirements(self) -> List[Requirement]:
"""
Parse the configuration file for [blacklist]packages

Expand All @@ -120,7 +120,7 @@ def _determine_filtered_package_requirements(self):
list of packaging.requirements.Requirement
For all PEP440 package specifiers
"""
filtered_requirements = set()
filtered_requirements: Set[Requirement] = set()
try:
lines = self.configuration["blacklist"]["packages"]
package_lines = lines.split("\n")
Expand All @@ -133,7 +133,7 @@ def _determine_filtered_package_requirements(self):
filtered_requirements.add(Requirement(package_line))
return list(filtered_requirements)

def filter(self, metadata):
def filter(self, metadata: Dict) -> bool:
name = metadata["info"]["name"]
releases = metadata["releases"]
for version in list(releases.keys()):
Expand All @@ -144,7 +144,7 @@ def filter(self, metadata):
else:
return True

def _check_match(self, name, version_string) -> bool:
def _check_match(self, name: str, version_string: str) -> bool:
"""
Check if the package name and version matches against a blacklisted
package version specifier.
Expand Down
10 changes: 5 additions & 5 deletions src/bandersnatch_filter_plugins/filename_name.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import List
from typing import Dict, List

from bandersnatch.filter import FilterReleasePlugin

Expand Down Expand Up @@ -29,7 +29,7 @@ class ExcludePlatformFilter(FilterReleasePlugin):
"manylinux2010_x86_64", # PEP 571
]

def initialize_plugin(self):
def initialize_plugin(self) -> None:
"""
Initialize the plugin reading patterns from the config.
"""
Expand Down Expand Up @@ -77,7 +77,7 @@ def initialize_plugin(self):

logger.info(f"Initialized {self.name} plugin with {self._patterns!r}")

def filter(self, metadata):
def filter(self, metadata: Dict) -> bool:
releases = metadata["releases"]
"""
Remove files from `releases` that match any pattern.
Expand All @@ -104,13 +104,13 @@ def filter(self, metadata):
else:
return True

def _check_match(self, file_desc) -> bool:
def _check_match(self, file_desc: Dict) -> bool:
"""
Check if a release version matches any of the specified patterns.

Parameters
==========
name: file_desc
file_desc: Dict
file description entry

Returns
Expand Down
12 changes: 8 additions & 4 deletions src/bandersnatch_filter_plugins/latest_name.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from operator import itemgetter
from typing import Dict, List, Optional, Set, Union

from packaging.version import parse

Expand All @@ -16,7 +17,7 @@ class LatestReleaseFilter(FilterReleasePlugin):
name = "latest_release"
keep = 0 # by default, keep 'em all

def initialize_plugin(self):
def initialize_plugin(self) -> None:
"""
Initialize the plugin reading patterns from the config.
"""
Expand All @@ -32,22 +33,25 @@ def initialize_plugin(self):
if self.keep > 0:
logger.info(f"Initialized latest releases plugin with keep={self.keep}")

def filter(self, metadata):
def filter( # type: ignore[override]
self, metadata: Dict
) -> Optional[bool]:
"""
Keep the latest releases
"""
latest: Union[List, Set]
info = metadata["info"]
releases = metadata["releases"]

if self.keep == 0:
return
return None

versions = list(releases.keys())
before = len(versions)

if before <= self.keep:
# not enough releases: do nothing
return
return None

versions_pair = map(lambda v: (parse(v), v), versions)
latest = sorted(versions_pair)[-self.keep :] # noqa: E203
Expand Down
61 changes: 35 additions & 26 deletions src/bandersnatch_filter_plugins/metadata_filter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import re
from typing import Dict, List
from configparser import SectionProxy
from typing import Any, Dict, List

from packaging.specifiers import SpecifierSet
from packaging.version import parse
Expand All @@ -25,12 +26,12 @@ class RegexFilter(Filter):
initilized = False
patterns: Dict = {}

def initialize_plugin(self):
def initialize_plugin(self) -> None:
"""
Initialize the plugin reading patterns from the config.
"""
try:
config = self.configuration[self.name]
config: SectionProxy = self.configuration[self.name]
except KeyError:
return
else:
Expand All @@ -57,7 +58,7 @@ def filter(self, metadata: Dict) -> bool:
# Walk through keys of patterns dict and return True iff all match
return all(self._match_node_at_path(k, metadata) for k in self.patterns)

def _match_node_at_path(self, key: str, metadata):
def _match_node_at_path(self, key: str, metadata: Dict) -> bool:

# Grab any tags prepended to key
tags = key.split(":")
Expand Down Expand Up @@ -94,21 +95,23 @@ def _match_node_at_path(self, key: str, metadata):
else:
return self._match_any_patterns(key, node, nulls_match=nulls_match)

def _find_element_by_dotted_path(self, path, metadata):
def _find_element_by_dotted_path(self, path: str, metadata: Dict) -> List:
# Walk our metadata structure following dotted path.
path = path.split(".")
split_path = path.split(".")
node = metadata
for p in path:
for p in split_path:
if p in node and node[p] is not None:
node = node[p]
else:
return []
if isinstance(node, list):
return node
if isinstance(node, list): # type: ignore
return node # type: ignore
else:
return [node]

def _match_any_patterns(self, key: str, values: List, nulls_match=True):
def _match_any_patterns(
self, key: str, values: List[str], nulls_match: bool = True
) -> bool:
results = []
for pattern in self.patterns[key]:
if nulls_match and not values:
Expand All @@ -118,7 +121,9 @@ def _match_any_patterns(self, key: str, values: List, nulls_match=True):
results.append(pattern.match(value))
return any(results)

def _match_all_patterns(self, key: str, values: List, nulls_match=True):
def _match_all_patterns(
self, key: str, values: List[str], nulls_match: bool = True
) -> bool:
results = []
for pattern in self.patterns[key]:
if nulls_match and not values:
Expand All @@ -127,14 +132,16 @@ def _match_all_patterns(self, key: str, values: List, nulls_match=True):
results.append(any(pattern.match(v) for v in values))
return all(results)

def _match_none_patterns(self, key: str, values: List, nulls_match=True):
def _match_none_patterns(
self, key: str, values: List[str], nulls_match: bool = True
) -> bool:
return not self._match_any_patterns(key, values)


class RegexProjectMetadataFilter(FilterMetadataPlugin, RegexFilter):
"""
Plugin to download only packages having metadata matching
at least one of the specified patterns.
at least one of the specified patterns.
"""

name = "regex_project_metadata"
Expand All @@ -143,10 +150,10 @@ class RegexProjectMetadataFilter(FilterMetadataPlugin, RegexFilter):
initilized = False
patterns: Dict = {}

def initilize_plugin(self):
def initilize_plugin(self) -> None:
RegexFilter.initialize_plugin(self)

def filter(self, metadata: dict) -> bool:
def filter(self, metadata: Dict) -> bool:
return RegexFilter.filter(self, metadata)


Expand All @@ -162,10 +169,10 @@ class RegexReleaseFileMetadataFilter(FilterReleaseFilePlugin, RegexFilter):
initilized = False
patterns: Dict = {}

def initilize_plugin(self):
def initilize_plugin(self) -> None:
RegexFilter.initialize_plugin(self)

def filter(self, metadata: dict) -> bool:
def filter(self, metadata: Dict) -> bool:
return RegexFilter.filter(self, metadata)


Expand All @@ -180,12 +187,14 @@ class VersionRangeFilter(Filter):
specifiers: Dict = {}
nulls_match = True

def initialize_plugin(self):
def initialize_plugin(self) -> None:
"""
Initialize the plugin reading version ranges from the config.
"""
try:
config = self.configuration["version_range_release_file_metadata"]
config: SectionProxy = self.configuration[
"version_range_release_file_metadata"
]
except KeyError:
return
else:
Expand All @@ -212,19 +221,19 @@ def filter(self, metadata: Dict) -> bool:

return all(self._match_node_at_path(k, metadata) for k in self.specifiers)

def _find_element_by_dotted_path(self, path, metadata):
def _find_element_by_dotted_path(self, path: str, metadata: Dict) -> Any:
# Walk our metadata structure following dotted path.
path = path.split(".")
split_path = path.split(".")
node = metadata
for p in path:
for p in split_path:
if p in node and node[p] is not None:
node = node[p]
else:
return None

return node

def _match_node_at_path(self, key: str, metadata):
def _match_node_at_path(self, key: str, metadata: Dict) -> bool:

# Grab any tags prepended to key
tags = key.split(":")
Expand Down Expand Up @@ -252,7 +261,7 @@ def _match_node_at_path(self, key: str, metadata):

# Check if SpeciferSet matches target versions
# TODO: Figure out proper intersection of SpecifierSets
ospecs = SpecifierSet(node)
ospecs: SpecifierSet = SpecifierSet(node)
ispecs = self.specifiers[key]
if any(ospecs.contains(ispec, prereleases=True) for ispec in ispecs):
return True
Expand All @@ -274,7 +283,7 @@ class VersionRangeProjectMetadataFilter(FilterMetadataPlugin, VersionRangeFilter
specifiers: Dict = {}
nulls_match = True

def initialize_plugin(self):
def initialize_plugin(self) -> None:
VersionRangeFilter.initialize_plugin(self)

def filter(self, metadata: dict) -> bool:
Expand All @@ -294,7 +303,7 @@ class VersionRangeReleaseFileMetadataFilter(
specifiers: Dict = {}
nulls_match = True

def initialize_plugin(self):
def initialize_plugin(self) -> None:
VersionRangeFilter.initialize_plugin(self)

def filter(self, metadata: dict) -> bool:
Expand Down
Loading