Skip to content

Commit

Permalink
Merge branch 'main' into migrate/debian_oval
Browse files Browse the repository at this point in the history
  • Loading branch information
pombredanne authored Oct 19, 2022
2 parents b4f38bc + 92eb01a commit 57bd8a7
Show file tree
Hide file tree
Showing 11 changed files with 346 additions and 134 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ toml==0.10.2
tomli==2.0.1
traitlets==5.1.1
typing_extensions==4.1.1
univers==30.7.0
univers==30.9.0
urllib3==1.26.9
wcwidth==0.2.5
websocket-client==0.59.0
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ install_requires =

#essentials
packageurl-python>=0.9.4
univers>=30.3.1
univers>=30.9.0
license-expression>=21.6.14

# file and data formats
Expand Down
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#

from vulnerabilities.importers import alpine_linux
from vulnerabilities.importers import archlinux
from vulnerabilities.importers import debian
from vulnerabilities.importers import debian_oval
from vulnerabilities.importers import github
Expand All @@ -33,6 +34,7 @@
pypa.PyPaImporter,
ubuntu.UbuntuImporter,
debian_oval.DebianOvalImporter,
archlinux.ArchlinuxImporter,
]

IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}
11 changes: 1 addition & 10 deletions vulnerabilities/importers/alpine_linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from vulnerabilities.references import WireSharkReference
from vulnerabilities.references import XsaReference
from vulnerabilities.references import ZbxReference
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import is_cve

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,16 +59,6 @@ def advisory_data(self) -> Iterable[AdvisoryData]:
yield from process_record(record)


def fetch_response(url):
"""
Fetch and return `response` from the `url`
"""
response = requests.get(url)
if response.status_code == 200:
return response
raise Exception(f"Failed to fetch data from {url!r} with status code: {response.status_code!r}")


def fetch_advisory_directory_links(page_response_content: str) -> List[str]:
"""
Return a list of advisory directory links present in `page_response_content` html string
Expand Down
76 changes: 37 additions & 39 deletions vulnerabilities/importers/archlinux.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,66 +6,64 @@
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import dataclasses
import json

from typing import Iterable
from typing import List
from typing import Mapping
from typing import Set
from urllib.request import urlopen

from packageurl import PackageURL
from univers.version_range import ArchLinuxVersionRange
from univers.versions import ArchLinuxVersion

from vulnerabilities import severity_systems
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.utils import nearest_patched_package
from vulnerabilities.utils import fetch_response


class ArchlinuxImporter(Importer):
def __enter__(self):
self._api_response = self._fetch()

def updated_advisories(self) -> Set[AdvisoryData]:
advisories = []
url = "https://security.archlinux.org/json"
spdx_license_expression = "MIT"
license_url = "https://github.com/archlinux/arch-security-tracker/blob/master/LICENSE"

for record in self._api_response:
advisories.extend(self._parse(record))
def fetch(self) -> Iterable[Mapping]:
response = fetch_response(self.url)
return response.json()

return self.batch_advisories(advisories)
def advisory_data(self) -> Iterable[AdvisoryData]:
for record in self.fetch():
yield from self.parse_advisory(record)

def _fetch(self) -> Iterable[Mapping]:
with urlopen(self.config.archlinux_tracker_url) as response:
return json.load(response)

def _parse(self, record) -> List[AdvisoryData]:
def parse_advisory(self, record) -> List[AdvisoryData]:
advisories = []

for cve_id in record["issues"]:
aliases = record.get("issues") or []
for alias in aliases:
affected_packages = []
for name in record["packages"]:
impacted_purls, resolved_purls = [], []
impacted_purls.append(
PackageURL(
summary = record.get("type") or ""
if summary == "unknown":
summary = ""
affected = record.get("affected") or ""
affected_version_range = (
ArchLinuxVersionRange.from_versions([affected]) if affected else None
)
fixed = record.get("fixed") or ""
fixed_version = ArchLinuxVersion(fixed) if fixed else None
affected_packages = []
affected_package = AffectedPackage(
package=PackageURL(
name=name,
type="pacman",
type="alpm",
namespace="archlinux",
version=record["affected"],
)
),
affected_version_range=affected_version_range,
fixed_version=fixed_version,
)

if record["fixed"]:
resolved_purls.append(
PackageURL(
name=name,
type="pacman",
namespace="archlinux",
version=record["fixed"],
)
)
affected_packages.extend(nearest_patched_package(impacted_purls, resolved_purls))
affected_packages.append(affected_package)

references = []
references.append(
Expand All @@ -89,9 +87,9 @@ def _parse(self, record) -> List[AdvisoryData]:
)

advisories.append(
Advisory(
vulnerability_id=cve_id,
summary="",
AdvisoryData(
aliases=[alias],
summary=summary,
affected_packages=affected_packages,
references=references,
)
Expand Down
1 change: 0 additions & 1 deletion vulnerabilities/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def no_rmtree(monkeypatch):
"test_apache_kafka.py",
"test_apache_tomcat.py",
"test_api.py",
"test_archlinux.py",
"test_elixir_security.py",
"test_gentoo.py",
"test_importer_yielder.py",
Expand Down
115 changes: 33 additions & 82 deletions vulnerabilities/tests/test_archlinux.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,87 +16,38 @@

from vulnerabilities import models
from vulnerabilities.import_runner import ImportRunner
from vulnerabilities.importers import archlinux
from vulnerabilities.tests import util_tests

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA = os.path.join(BASE_DIR, "test_data/")


class ArchlinuxImportTest(TestCase):
@classmethod
def setUpClass(cls) -> None:
fixture_path = os.path.join(TEST_DATA, "archlinux.json")
with open(fixture_path) as f:
cls.mock_response = json.load(f)

cls.importer = models.Importer.objects.create(
name="archlinux_unittests",
license="",
last_run=None,
data_source="ArchlinuxImporter",
data_source_cfg={
"archlinux_tracker_url": "https://security.example.com/json",
},
)

@classmethod
def tearDownClass(cls) -> None:
pass

def test_import(self):
runner = ImportRunner(self.importer, 5)

with patch(
"vulnerabilities.importers.ArchlinuxImporter._fetch", return_value=self.mock_response
):
runner.run()
assert models.Vulnerability.objects.count() == 6
assert models.VulnerabilityReference.objects.count() == 10
assert models.PackageRelatedVulnerability.objects.all().count() == 12
assert (
models.PackageRelatedVulnerability.objects.filter(patched_package__isnull=False).count()
== 8
)
assert models.Package.objects.count() == 10

self.assert_for_package(
"squid",
"4.10-2",
cve_ids={"CVE-2020-11945", "CVE-2019-12521", "CVE-2019-12519"},
)
self.assert_for_package("openconnect", "1:8.05-1", cve_ids={"CVE-2020-12823"})
self.assert_for_package(
"wireshark-common",
"2.6.0-1",
cve_ids={"CVE-2018-11362", "CVE-2018-11361"},
)
self.assert_for_package(
"wireshark-gtk",
"2.6.0-1",
cve_ids={"CVE-2018-11362", "CVE-2018-11361"},
)
self.assert_for_package(
"wireshark-cli",
"2.6.0-1",
cve_ids={"CVE-2018-11362", "CVE-2018-11361"},
)
self.assert_for_package(
"wireshark-qt",
"2.6.0-1",
cve_ids={"CVE-2018-11362", "CVE-2018-11361"},
)
self.assert_for_package("wireshark-common", "2.6.1-1")
self.assert_for_package("wireshark-gtk", "2.6.1-1")
self.assert_for_package("wireshark-cli", "2.6.1-1")
self.assert_for_package("wireshark-qt", "2.6.1-1")

def assert_for_package(self, name, version, cve_ids=None):
qs = models.Package.objects.filter(
name=name,
version=version,
type="pacman",
namespace="archlinux",
)
assert qs

if cve_ids:
assert cve_ids == {v.vulnerability_id for v in qs[0].vulnerabilities.all()}
TEST_DATA = os.path.join(BASE_DIR, "test_data/archlinux")


def test_parse_advisory_single():
record = {
"name": "AVG-2781",
"packages": ["python-pyjwt"],
"status": "Unknown",
"severity": "Unknown",
"type": "unknown",
"affected": "2.3.0-1",
"fixed": "2.4.0-1",
"ticket": None,
"issues": ["CVE-2022-29217"],
"advisories": [],
}

advisory_data = archlinux.ArchlinuxImporter().parse_advisory(record)
result = [data.to_dict() for data in advisory_data]
expected_file = os.path.join(TEST_DATA, f"parse-advisory-archlinux-expected.json")
util_tests.check_results_against_json(result, expected_file)


@patch("vulnerabilities.importers.archlinux.ArchlinuxImporter.fetch")
def test_archlinux_importer(mock_response):
with open(os.path.join(TEST_DATA, "archlinux-multi.json")) as f:
mock_response.return_value = json.load(f)

expected_file = os.path.join(TEST_DATA, f"archlinux-multi-expected.json")
result = [data.to_dict() for data in list(archlinux.ArchlinuxImporter().advisory_data())]
util_tests.check_results_against_json(result, expected_file)
Loading

0 comments on commit 57bd8a7

Please sign in to comment.