Skip to content

Commit

Permalink
Add logs and fix encode
Browse files Browse the repository at this point in the history
Add a doctest

Fix gsd aliases , add gsd test

Add gsd importer initial config
Signed-off-by: Ziad <[email protected]>
  • Loading branch information
ziadhany committed Feb 5, 2023
1 parent 80da375 commit fc5623c
Show file tree
Hide file tree
Showing 11 changed files with 1,273 additions and 0 deletions.
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from vulnerabilities.importers import gentoo
from vulnerabilities.importers import github
from vulnerabilities.importers import gitlab
from vulnerabilities.importers import gsd
from vulnerabilities.importers import istio
from vulnerabilities.importers import mozilla
from vulnerabilities.importers import nginx
Expand Down Expand Up @@ -59,6 +60,7 @@
elixir_security.ElixirSecurityImporter,
apache_tomcat.ApacheTomcatImporter,
xen.XenImporter,
gsd.GSDImporter,
]

IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}
222 changes: 222 additions & 0 deletions vulnerabilities/importers/gsd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import json
import logging
from io import BytesIO
from typing import Iterable
from typing import List
from typing import Set
from zipfile import ZipFile

import dateparser
import requests

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.utils import build_description
from vulnerabilities.utils import dedupe

logger = logging.getLogger(__name__)


class GSDImporter(Importer):
spdx_license_expression = "CC0-1.0"
license_url = "https://github.com/cloudsecurityalliance/gsd-database/blob/main/LICENSE"
url = "https://codeload.github.com/cloudsecurityalliance/gsd-database/zip/refs/heads/main"

def advisory_data(self) -> Iterable[AdvisoryData]:
response = requests.get(self.url).content
with ZipFile(BytesIO(response)) as zip_file:
for file_name in zip_file.namelist():
if file_name == "gsd-database-main/allowlist.json" or not file_name.endswith(
".json"
):
continue

with zip_file.open(file_name) as f:
try:
yield parse_advisory_data(f.read())
except Exception as e:
logger.error(f"Invalid GSD advisory data file: {file_name} - {e}")


def parse_advisory_data(file):
"""
Parse a GSD advisory file and return an AdvisoryData.
Each advisory file contains the advisory information in JSON format.
"""
raw_data = json.loads(file)

namespaces = raw_data.get("namespaces") or {}
cve_org = namespaces.get("cve.org") or {}
nvd_nist_gov = namespaces.get("nvd.nist.gov") or {}

gsd = raw_data.get("GSD") or {}
gsd_alias = gsd.get("alias")
gsd_id = gsd.get("id")
gsd_description = gsd.get("description")

gsd_reference_data = gsd.get("") or []
gsd_references = [Reference(url=ref) for ref in gsd_reference_data]

details = gsd_description or "".join(get_description(cve_org))

aliases_cve_org = get_aliases(cve_org)
aliases_nvd_nist_gov = get_aliases(nvd_nist_gov)

aliases = [gsd_alias, gsd_id] + aliases_cve_org + aliases_nvd_nist_gov
aliases = [alias for alias in aliases if alias is not None]

summary = build_description(summary=get_summary(cve_org), description=details)

# severities = get_severities(cve_org)
# configurations = nvd_nist_gov.get("configurations") or {}
# nodes = configurations.get("nodes") or []
# cpes = get_cpe(nodes)

references = get_references(cve_org) + gsd_references

date_published = get_published_date_nvd_nist_gov(nvd_nist_gov)

return AdvisoryData(
aliases=dedupe(aliases),
summary=summary,
references=references,
date_published=date_published,
)


def get_summary(cve) -> str:
"""
Returns a title of CVE_data_meta
>> get_summary {"CVE_data_meta": {"TITLE": "DoS vulnerability: Invalid Accent Colors"}
'DoS vulnerability: Invalid Accent Colors'
"""
cve_data_meta = cve.get("CVE_data_meta") or {}
return cve_data_meta.get("TITLE") or ""


def get_severities(cve) -> List:
"""
Return a list of CVSS vectorString
>>> get_severities({"impact": {"cvss": {"vectorString": "CVSS:3.1/AV:N/AC:L/PR:L/UI:R/S:U/C:N/I:N/A:H"}}})
['CVSS:3.1/AV:N/AC:L/PR:L/UI:R/S:U/C:N/I:N/A:H']
"""
severities = []
impact = cve.get("impact") or {}

base_metric_2 = impact.get("baseMetricV2") or {}
if base_metric_2:
cvss_v2 = base_metric_2.get("cvssV2") or {}
cvss_vector = cvss_v2.get("vectorString")
if cvss_vector:
severities.append(cvss_vector)

base_metric_v3 = impact.get("baseMetricV3") or {}
if base_metric_v3:
cvss_v3 = base_metric_v3.get("cvssV3") or {}
cvss_vector = cvss_v3.get("vectorString")
if cvss_vector:
severities.append(cvss_vector)

cvss = impact.get("cvss") or {}
if isinstance(cvss, List):
for cvss_v in cvss:
if isinstance(cvss_v, dict):
cvss_vector = cvss_v.get("vectorString") or {}
if cvss_vector:
severities.append(cvss_vector)
else:
cvss_vector = cvss.get("vectorString")
if cvss_vector:
severities.append(cvss_vector)
return severities


def get_description(cve) -> [str]:
"""
Get a list description value from description object
>>> get_description({"description": {"description_data": [{"lang": "eng","value": "the description"}]}})
['the description']
"""
description = cve.get("description") or {}
description_data = description.get("description_data") or []
return [desc["value"] for desc in description_data if desc["value"] and desc["lang"] == "eng"]


def get_references(cve):
"""
Returns a list of Reference assigned with url
>>> get_references({"references": {
... "reference_data": [{
... "name": "https://kc.mcafee.com/corporate/index?page=content&id=SB10198",
... "refsource": "CONFIRM",
... "tags": ["Vendor Advisory"],
... "url": "https://kc.mcafee.com/corporate/index?page=content&id=SB10198"}]}})
[Reference(reference_id='', url='https://kc.mcafee.com/corporate/index?page=content&id=SB10198', severities=[])]
"""
references = cve.get("references") or {}
reference_data = references.get("reference_data") or []
return [Reference(url=ref["url"]) for ref in reference_data if ref["url"]]


def get_aliases(cve) -> [str]:
"""
Returns a list of aliases
>>> get_aliases({"CVE_data_meta": {"ID": "CVE-2017-4017"},"source": {"advisory": "GHSA-v8x6-59g4-5g3w"}})
['CVE-2017-4017', 'GHSA-v8x6-59g4-5g3w']
"""
cve_data_meta = cve.get("CVE_data_meta") or {}
alias = cve_data_meta.get("ID")

source = cve.get("source") or {}
advisory = source.get("advisory")

aliases = []
if alias:
aliases.append(alias)
if advisory:
aliases.append(advisory)
return aliases


def get_published_date_nvd_nist_gov(nvd_nist_gov):
"""
Returns a published datetime
>>> get_published_date_nvd_nist_gov({"publishedDate": "2022-06-23T07:15Z"})
datetime.datetime(2022, 6, 23, 7, 15, tzinfo=<StaticTzInfo 'Z'>)
"""
published_date = nvd_nist_gov.get("publishedDate")
return published_date and dateparser.parse(published_date)


def get_cpe(nodes) -> List:
"""
>>> get_cpe([{"children": [], "cpe_match": [{
... "cpe23Uri": "cpe:2.3:a:mutt:mutt:*:*:*:*:*:*:*:*",
... "cpe_name": [],
... "versionEndIncluding": "1.2.5.1",
... "vulnerable": True
... },{
... "cpe23Uri": "cpe:2.3:a:mutt:mutt:*:*:*:*:*:*:*:*",
... "cpe_name": [],
... "versionEndIncluding": "1.3.25",
... "vulnerable": True
... }],"operator": "OR"}])
['cpe:2.3:a:mutt:mutt:*:*:*:*:*:*:*:*', 'cpe:2.3:a:mutt:mutt:*:*:*:*:*:*:*:*']
"""
cpe_list = []
for node in nodes:
cpe_match = node.get("cpe_match") or []
for cpe23Uri in cpe_match:
cpe_uri = cpe23Uri.get("cpe23Uri")
if cpe_uri:
cpe_list.append(cpe_uri)
return cpe_list
93 changes: 93 additions & 0 deletions vulnerabilities/tests/test_data/gsd/GSD-2002-0001-expected.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
{
"aliases": [
"CVE-2002-0001",
"GSD-2002-0001"
],
"summary": "Vulnerability in RFC822 address parser in mutt before 1.2.5.1 and mutt 1.3.x before 1.3.25 allows remote attackers to execute arbitrary commands via an improperly terminated comment or phrase in the address list.",
"affected_packages": [

],
"references": [
{
"reference_id": "",
"url": "http://online.securityfocus.com/advisories/3778",
"severities": [

]
},
{
"reference_id": "",
"url": "ftp://ftp.freebsd.org/pub/FreeBSD/CERT/advisories/FreeBSD-SA-02:04.mutt.asc",
"severities": [

]
},
{
"reference_id": "",
"url": "http://www.debian.org/security/2002/dsa-096",
"severities": [

]
},
{
"reference_id": "",
"url": "http://www.mutt.org/announce/mutt-1.2.5.1-1.3.25.html",
"severities": [

]
},
{
"reference_id": "",
"url": "http://distro.conectiva.com.br/atualizacoes/?id=a&anuncio=000449",
"severities": [

]
},
{
"reference_id": "",
"url": "http://www.iss.net/security_center/static/7759.php",
"severities": [

]
},
{
"reference_id": "",
"url": "http://www.securityfocus.com/bid/3774",
"severities": [

]
},
{
"reference_id": "",
"url": "ftp://ftp.caldera.com/pub/security/OpenLinux/CSSA-2002-002.0.txt",
"severities": [

]
},
{
"reference_id": "",
"url": "http://www.novell.com/linux/security/advisories/2002_001_mutt_txt.html",
"severities": [

]
},
{
"reference_id": "",
"url": "http://marc.info/?l=bugtraq&m=100994648918287&w=2",
"severities": [

]
},
{
"reference_id": "",
"url": "http://www.redhat.com/support/errata/RHSA-2002-003.html",
"severities": [

]
}
],
"date_published": "2002-02-27T05:00:00+00:00",
"weaknesses": [

]
}
Loading

0 comments on commit fc5623c

Please sign in to comment.