Skip to content

Commit

Permalink
Merge pull request #1628 from aboutcode-org/1627-migrate-pysec
Browse files Browse the repository at this point in the history
Migrate pysec importer to aboutcode pipeline
  • Loading branch information
TG1999 authored Oct 28, 2024
2 parents 45070e8 + a02e211 commit 590c91a
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 61 deletions.
4 changes: 2 additions & 2 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from vulnerabilities.importers import oss_fuzz
from vulnerabilities.importers import postgresql
from vulnerabilities.importers import project_kb_msr2019
from vulnerabilities.importers import pysec
from vulnerabilities.importers import redhat
from vulnerabilities.importers import retiredotnet
from vulnerabilities.importers import ruby
Expand All @@ -42,9 +41,9 @@
from vulnerabilities.pipelines import npm_importer
from vulnerabilities.pipelines import nvd_importer
from vulnerabilities.pipelines import pypa_importer
from vulnerabilities.pipelines import pysec_importer

IMPORTERS_REGISTRY = [
pysec.PyPIImporter,
alpine_linux.AlpineImporter,
openssl.OpensslImporter,
redhat.RedhatImporter,
Expand Down Expand Up @@ -78,6 +77,7 @@
gitlab_importer.GitLabImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
nvd_importer.NVDImporterPipeline,
pysec_importer.PyPIImporterPipeline,
]

IMPORTERS_REGISTRY = {
Expand Down
44 changes: 0 additions & 44 deletions vulnerabilities/importers/pysec.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Generated by Django 4.2.16 on 2024-10-24 13:51

from django.db import migrations

"""
Update the created_by field on Advisory from the old qualified_name
to the new pipeline_id.
"""


def update_created_by(apps, schema_editor):
    """
    Re-attribute advisories created by the legacy pysec importer to the
    new pipeline id.

    ``apps`` is the historical app registry supplied by ``RunPython`` and
    ``schema_editor`` is unused.
    """
    # The identifiers are spelled out as literals instead of being read from
    # the live pipeline class: a migration must keep producing the same result
    # even if that class is later renamed, moved or deleted.
    Advisory = apps.get_model("vulnerabilities", "Advisory")
    Advisory.objects.filter(
        created_by="vulnerabilities.importers.pysec.PyPIImporter",
    ).update(created_by="pysec_importer")


def reverse_update_created_by(apps, schema_editor):
    """
    Restore the legacy importer qualified name on advisories currently
    attributed to the pysec pipeline id.

    ``apps`` is the historical app registry supplied by ``RunPython`` and
    ``schema_editor`` is unused.
    """
    # Literals are used on purpose so that rolling back does not depend on the
    # pipeline module still being importable or keeping the same pipeline id.
    Advisory = apps.get_model("vulnerabilities", "Advisory")
    Advisory.objects.filter(
        created_by="pysec_importer",
    ).update(created_by="vulnerabilities.importers.pysec.PyPIImporter")


class Migration(migrations.Migration):
    """Data migration that rewrites ``created_by`` on pysec advisories."""

    dependencies = [("vulnerabilities", "0073_delete_packagerelatedvulnerability")]

    operations = [
        migrations.RunPython(
            code=update_created_by,
            reverse_code=reverse_update_created_by,
        ),
    ]
2 changes: 1 addition & 1 deletion vulnerabilities/pipelines/pypa_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import logging

from pathlib import Path
from typing import Iterable

Expand Down
66 changes: 66 additions & 0 deletions vulnerabilities/pipelines/pysec_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import json
import logging
from io import BytesIO
from typing import Iterable
from zipfile import ZipFile

import requests

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline


class PyPIImporterPipeline(VulnerableCodeBaseImporterPipeline):
    """
    Collect advisories from PyPI.

    The zip archive published at ``url`` is downloaded into memory and each
    ``PYSEC-`` member is parsed as one advisory.
    """

    pipeline_id = "pysec_importer"

    license_url = "https://github.com/pypa/advisory-database/blob/main/LICENSE"
    url = "https://osv-vulnerabilities.storage.googleapis.com/PyPI/all.zip"
    spdx_license_expression = "CC-BY-4.0"
    importer_name = "PyPI Importer"

    @classmethod
    def steps(cls):
        """Return the pipeline steps in the order they are listed."""
        return (
            cls.fetch_zip,
            cls.collect_and_store_advisories,
            cls.import_new_advisories,
        )

    def fetch_zip(self):
        """
        Download the advisory archive and keep its raw bytes in
        ``self.advisory_zip``.

        Raise ``requests.HTTPError`` when the server answers with an error
        status.
        """
        self.log(f"Fetching `{self.url}`")
        # A timeout keeps the pipeline from hanging forever on a stalled
        # connection; it bounds each socket wait, not the whole download.
        response = requests.get(self.url, timeout=60)
        # Fail here on an HTTP error rather than later with an opaque
        # BadZipFile when the error page is handed to ZipFile.
        response.raise_for_status()
        self.advisory_zip = response.content

    def advisories_count(self) -> int:
        """Return the number of ``PYSEC-`` members in the downloaded archive."""
        with ZipFile(BytesIO(self.advisory_zip)) as archive:
            return sum(1 for name in archive.namelist() if name.startswith("PYSEC-"))

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """Yield AdvisoryData using a zipped data dump of OSV data"""
        from vulnerabilities.importers.osv import parse_advisory_data

        with ZipFile(BytesIO(self.advisory_zip)) as archive:
            for file_name in archive.namelist():
                # Only PYSEC advisories are handled; anything else is logged
                # and skipped.
                if not file_name.startswith("PYSEC-"):
                    self.log(
                        f"Unsupported PyPI advisory data file: {file_name}",
                        level=logging.ERROR,
                    )
                    continue
                with archive.open(file_name) as member:
                    vul_info = json.load(member)
                yield parse_advisory_data(
                    raw_data=vul_info,
                    supported_ecosystems=["pypi"],
                    advisory_url=self.url,
                )
Original file line number Diff line number Diff line change
Expand Up @@ -7,52 +7,51 @@
# See https://aboutcode.org for more information about nexB OSS projects.
#
import json
import os
from pathlib import Path
from unittest import TestCase

from vulnerabilities.importers.osv import parse_advisory_data
from vulnerabilities.tests.util_tests import VULNERABLECODE_REGEN_TEST_FIXTURES as REGEN
from vulnerabilities.tests.util_tests import check_results_against_json

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA = os.path.join(BASE_DIR, "test_data/pysec")
TEST_DATA = Path(__file__).parent.parent / "test_data" / "pysec"


class TestPyPIImporter(TestCase):
def test_to_advisories_with_summary(self):
with open(os.path.join(TEST_DATA, "pysec-advisories_with_summary.json")) as f:
with open(TEST_DATA / "pysec-advisories_with_summary.json") as f:
mock_response = json.load(f)
results = parse_advisory_data(mock_response, ["pypi"], "https://test.com").to_dict()

expected_file = os.path.join(TEST_DATA, "pysec-advisories_with_summary-expected.json")
expected_file = TEST_DATA / "pysec-advisories_with_summary-expected.json"
check_results_against_json(
results=results,
expected_file=expected_file,
regen=REGEN,
)

def test_to_advisories_without_summary(self):
with open(os.path.join(TEST_DATA, "pysec-advisories_without_summary.json")) as f:
with open(TEST_DATA / "pysec-advisories_without_summary.json") as f:
mock_response = json.load(f)

results = parse_advisory_data(mock_response, ["pypi"], "https://test.com").to_dict()

expected_file = os.path.join(TEST_DATA, "pysec-advisories_without_summary-expected.json")
expected_file = TEST_DATA / "pysec-advisories_without_summary-expected.json"
check_results_against_json(
results=results,
expected_file=expected_file,
regen=REGEN,
)

def test_to_advisories_with_cwe(self):
with open(os.path.join(TEST_DATA, "pysec-advisory_with_cwe.json")) as f:
with open(TEST_DATA / "pysec-advisory_with_cwe.json") as f:
mock_response = json.load(f)

results = parse_advisory_data(
raw_data=mock_response, supported_ecosystems=["pypi"], advisory_url="https://tes.com"
).to_dict()

expected_file = os.path.join(TEST_DATA, "pysec-advisories_with_cwe-expected.json")
expected_file = TEST_DATA / "pysec-advisories_with_cwe-expected.json"
check_results_against_json(
results=results,
expected_file=expected_file,
Expand Down
49 changes: 44 additions & 5 deletions vulnerabilities/tests/test_data_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ def setUpBeforeMigration(self, apps):
date_collected=timezone.now(),
)

def test_removal_of_duped_purls(self):
def test_update_npm_pypa_created_by_field(self):
Advisory = apps.get_model("vulnerabilities", "Advisory")
adv = Advisory.objects.all()

Expand Down Expand Up @@ -714,7 +714,7 @@ def setUpBeforeMigration(self, apps):
date_collected=timezone.now(),
)

def test_removal_of_duped_purls(self):
def test_update_nginx_created_by_field(self):
Advisory = apps.get_model("vulnerabilities", "Advisory")
adv = Advisory.objects.all()

Expand Down Expand Up @@ -753,7 +753,7 @@ def setUpBeforeMigration(self, apps):
date_collected=timezone.now(),
)

def test_removal_of_duped_purls(self):
def test_update_gitlab_created_by_field(self):
Advisory = apps.get_model("vulnerabilities", "Advisory")
adv = Advisory.objects.all()

Expand Down Expand Up @@ -794,7 +794,7 @@ def setUpBeforeMigration(self, apps):
date_collected=timezone.now(),
)

def test_removal_of_duped_purls(self):
def test_update_github_created_by_field(self):
Advisory = apps.get_model("vulnerabilities", "Advisory")
adv = Advisory.objects.all()

Expand Down Expand Up @@ -835,9 +835,48 @@ def setUpBeforeMigration(self, apps):
date_collected=timezone.now(),
)

def test_removal_of_duped_purls(self):
def test_update_nvd_created_by_field(self):
Advisory = apps.get_model("vulnerabilities", "Advisory")
adv = Advisory.objects.all()

assert adv.filter(created_by="vulnerabilities.importers.nvd.NVDImporter").count() == 0
assert adv.filter(created_by="nvd_importer").count() == 1


class TestUpdatePysecAdvisoryCreatedByField(TestMigrations):
    """Check that migration 0074 renames ``created_by`` on pysec advisories."""

    app_name = "vulnerabilities"
    migrate_from = "0073_delete_packagerelatedvulnerability"
    migrate_to = "0074_update_pysec_advisory_created_by"

    advisory_data1 = AdvisoryData(
        aliases=["CVE-2020-13371337"],
        summary="vulnerability description here",
        affected_packages=[
            AffectedPackage(
                package=PackageURL(type="pypi", name="foobar"),
                affected_version_range=VersionRange.from_string("vers:pypi/>=1.0.0|<=2.0.0"),
            )
        ],
        references=[Reference(url="https://example.com/with/more/info/CVE-2020-13371337")],
        date_published=timezone.now(),
        url="https://test.com",
    )

    def setUpBeforeMigration(self, apps):
        """Create one advisory attributed to the legacy pysec importer name."""
        seed = self.advisory_data1
        Advisory = apps.get_model("vulnerabilities", "Advisory")
        Advisory.objects.create(
            aliases=seed.aliases,
            summary=seed.summary,
            affected_packages=[package.to_dict() for package in seed.affected_packages],
            references=[reference.to_dict() for reference in seed.references],
            url=seed.url,
            created_by="vulnerabilities.importers.pysec.PyPIImporter",
            date_collected=timezone.now(),
        )

    def test_update_pysec_created_by_field(self):
        """The legacy name must be gone and the pipeline id present once."""
        Advisory = apps.get_model("vulnerabilities", "Advisory")
        legacy = Advisory.objects.filter(
            created_by="vulnerabilities.importers.pysec.PyPIImporter"
        )
        migrated = Advisory.objects.filter(created_by="pysec_importer")

        assert legacy.count() == 0
        assert migrated.count() == 1

0 comments on commit 590c91a

Please sign in to comment.