-
Notifications
You must be signed in to change notification settings - Fork 31
/
build-cve-cache.py
152 lines (129 loc) · 5.19 KB
/
build-cve-cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import os
import json
import urllib.request
import ssl
import hashlib
import time
# Constants
SOFA_REPO_PATH = "./v1" # Adjust this path to where your v1 folder is located
VULNCHECK_API_URL = (
"https://api.vulncheck.com/v3/index/nist-nvd2" # VulnCheck API endpoint
)
API_KEY = os.getenv('API_KEY')
HEADERS = {"accept": "application/json", "User-Agent": "sofa-cve-fetch/1.0"}
def compute_hash(data):
"""
Compute a SHA-256 hash of the given data.
:param data: The data to hash, typically a dictionary.
:return: A hexadecimal string representing the hash of the data.
"""
json_str = json.dumps(data, sort_keys=True).encode()
return hashlib.sha256(json_str).hexdigest()
def get_cve_details(cve_id):
vulncheck_headers = HEADERS
vulncheck_headers["Authorization"] = f"Bearer {API_KEY}"
api_url = f"{VULNCHECK_API_URL}?cve={cve_id}"
# Create an SSL context that does not verify SSL certificates
context = ssl._create_unverified_context()
req = urllib.request.Request(url=api_url, headers=vulncheck_headers, method="GET")
start_time = time.time()
try:
response = urllib.request.urlopen(req, context=context)
data = json.loads(response.read().decode("utf-8"))
elapsed_time = time.time() - start_time
if "cvssMetricV31" in data["data"][0]["metrics"]:
baseScore = data["data"][0]["metrics"]["cvssMetricV31"][0]["cvssData"][
"baseScore"
]
exploitabilityScore = data["data"][0]["metrics"]["cvssMetricV31"][0][
"exploitabilityScore"
]
impactScore = data["data"][0]["metrics"]["cvssMetricV31"][0]["impactScore"]
severity = data["data"][0]["metrics"]["cvssMetricV31"][0]["cvssData"].get(
"baseSeverity", None
)
hash_value = compute_hash(
{
"baseScore": baseScore,
"exploitabilityScore": exploitabilityScore,
"impactScore": impactScore,
"severity": severity,
}
)
cve_details = {
"id": cve_id,
"baseScore": baseScore,
"exploitabilityScore": exploitabilityScore,
"impactScore": impactScore,
"severity": severity,
"hash": hash_value,
}
return cve_details, elapsed_time
else:
return {
"id": cve_id,
"error_code": "NO_CVSS_METRIC",
"error_message": "No CVSS metric found",
}, elapsed_time
except urllib.error.HTTPError as e:
elapsed_time = time.time() - start_time
return {
"id": cve_id,
"error_code": "HTTP_ERROR",
"error_message": str(e),
}, elapsed_time
except Exception as e:
elapsed_time = time.time() - start_time
return {
"id": cve_id,
"error_code": "GENERAL_ERROR",
"error_message": str(e),
}, elapsed_time
def collect_cve_ids(directory_path):
cve_ids = set()
# Iterate over files in the directory
for root, dirs, files in os.walk(directory_path):
for file in files:
if file.endswith(".json"):
file_path = os.path.join(root, file)
print(f"Processing file: {file_path}")
with open(file_path, "r") as f:
try:
data = json.load(f)
os_versions = data.get("OSVersions", [])
for os_version in os_versions:
security_releases = os_version.get("SecurityReleases", [])
for release in security_releases:
cves = release.get("CVEs", {})
for cve_id in cves.keys():
if cve_id:
cve_ids.add(cve_id)
except json.JSONDecodeError as e:
print(f"Error decoding JSON from file: {file_path}, error: {e}")
print(f"Collected {len(cve_ids)} unique CVE IDs.")
return list(cve_ids)
def fetch_and_save_cve_details(cve_ids):
cve_details_list = []
total_fetch_time = 0
for cve_id in cve_ids:
print(f"Fetching details for {cve_id}")
print(f"API Request: {VULNCHECK_API_URL}?cve={cve_id}")
cve_details, fetch_time = get_cve_details(cve_id)
print(f"Details for {cve_id}: {json.dumps(cve_details, indent=4)}")
cve_details_list.append(cve_details)
total_fetch_time += fetch_time
output_data = {
"TotalFetchTime": total_fetch_time,
"UpdateHash": compute_hash(cve_details_list),
"CVE_Details": cve_details_list,
}
# Save the gathered CVE details to a new JSON file
output_file = "cache/cve_details.json"
with open(output_file, "w") as outfile:
json.dump(output_data, outfile, indent=4)
print(f"Saved CVE details to {output_file}")
# Run the script
if __name__ == "__main__":
cve_ids = collect_cve_ids(SOFA_REPO_PATH)
print(f"Collected CVE IDs: {cve_ids}")
fetch_and_save_cve_details(cve_ids)