-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathsearch.py
187 lines (162 loc) · 6.64 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import re
from typing import Generator, List, Tuple
import apsw
import orjson
from vdb.lib import db6, utils
from vdb.lib.cve_model import CVE, CVE1
from vdb.lib.utils import load_json
# Matches strings that begin with 1 to 7 uppercase ASCII letters followed by a
# hyphen. search_by_any uses it to route such strings to search_by_cve.
IS_ADVISORY = re.compile("^[A-Z]{1,7}-")
def filter_hits(raw_hits: List, compare_ver: str) -> List:
    """Keep only the index rows whose version range accepts compare_ver

    Args:
        raw_hits: Index rows laid out as (cve_id, type, namespace, name, vers, ..., purl_prefix)
        compare_ver: Version handed to utils.vers_compare together with each row's vers

    Returns:
        list: One dict per accepted row with the keys cve_id, type, namespace,
        name, vers and purl_prefix
    """
    return [
        {
            "cve_id": row[0],
            "type": row[1],
            "namespace": row[2],
            "name": row[3],
            "vers": row[4],
            # The purl prefix is always read from the last column of the row.
            "purl_prefix": row[-1],
        }
        for row in raw_hits
        if utils.vers_compare(compare_ver, row[4])
    ]
def get_cve_data(db_conn: apsw.Connection | None, index_hits: List, search_str: str) -> Generator:
    """Yield the stored CVE record for every index hit

    Args:
        db_conn: DB Connection; when falsy a read-only one is obtained from db6.get
        index_hits: Hit dicts carrying at least cve_id, purl_prefix and vers
        search_str: Original search string, echoed back as matched_by

    Yields:
        dict: cve_id, type, namespace, name, matching_vers, matched_by,
        source_data_hash, source_data (CVE model or None), override_data
        (decoded JSON or None) and purl_prefix
    """
    if not db_conn:
        db_conn, _ = db6.get(read_only=True)
    query = "SELECT DISTINCT cve_id, type, namespace, name, source_data_hash, json(source_data), json(override_data), purl_prefix FROM cve_data WHERE cve_id = ? AND purl_prefix = ? GROUP BY purl_prefix ORDER BY cve_id DESC;"
    for hit in index_hits:
        params = (hit["cve_id"], hit["purl_prefix"])
        for row in exec_query(db_conn, query, params):
            record = {
                "cve_id": row[0],
                "type": row[1],
                "namespace": row[2],
                "name": row[3],
                "matching_vers": hit["vers"],
                "matched_by": search_str,
                "source_data_hash": row[4],
            }
            source_json = row[5]
            if source_json:
                # Stored JSON is validated non-strictly into the CVE model.
                record["source_data"] = CVE(
                    root=CVE1.model_validate(orjson.loads(source_json), strict=False)
                )
            else:
                record["source_data"] = None
            override_json = row[6]
            record["override_data"] = orjson.loads(override_json) if override_json else None
            record["purl_prefix"] = row[7]
            yield record
def search_by_any(any_str: str, with_data: bool = False) -> List:
    """Dispatch a free-form search string to the matching search method

    Strings starting with "pkg:" are searched as purls, strings matching
    IS_ADVISORY as CVE ids, strings starting with "http" as URLs, and
    everything else as CPE-like strings. The checks run in that order.
    """
    if any_str.startswith("pkg:"):
        handler = search_by_purl_like
    elif IS_ADVISORY.search(any_str):
        handler = search_by_cve
    elif any_str.startswith("http"):
        handler = search_by_url
    else:
        handler = search_by_cpe_like
    return handler(any_str, with_data)
def search_by_cpe_like(cpe: str, with_data: bool = False) -> List:
    """Search the index by a CPE string or a vendor:package:version triple

    Args:
        cpe: String starting with "cpe:" or containing exactly two colons
        with_data: When true, return full CVE data instead of index hits

    Returns:
        list: Filtered index hits, or CVE data records when with_data is set.
        Empty list when the input has neither accepted shape.
    """
    db_conn, index_conn = db6.get(read_only=True)
    is_full_cpe = cpe.startswith("cpe:")
    if not is_full_cpe and cpe.count(":") != 2:
        return []
    if is_full_cpe:
        vendor, package, version, _ = utils.parse_cpe(cpe)
    else:
        vendor, package, version = cpe.split(":")
    # The vendor may be recorded either as the namespace or as the type.
    query = "SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where (namespace = ? OR type = ?) AND name = ?;"
    index_rows = exec_query(index_conn, query, (vendor, vendor, package))
    matches = filter_hits(index_rows, version)
    return list(get_cve_data(db_conn, matches, cpe)) if with_data else matches
def search_by_purl_like(purl: str, with_data: bool = False) -> List:
    """Search the index by a purl-like string

    Args:
        purl: Package URL string parsed with utils.parse_purl
        with_data: When true, return full CVE data instead of index hits

    Returns:
        list: Filtered index hits, or CVE data records when with_data is set.
        Empty list when the purl could not be parsed.
    """
    db_conn, index_conn = db6.get(read_only=True)
    purl_obj = utils.parse_purl(purl)
    if not purl_obj:
        return []
    ptype = purl_obj.get("type")
    namespace = purl_obj.get("namespace")
    name = purl_obj.get("name")
    version = purl_obj.get("version", "*")
    # Linux OS purls: the distro name qualifier is prefixed to the package name.
    qualifiers = purl_obj.get("qualifiers")
    distro_name = qualifiers.get("distro_name") if qualifiers else None
    if distro_name:
        name = f"{distro_name}/{name}"
    namespace_part = f"{namespace}/" if namespace else ""
    purl_prefix = f"pkg:{ptype}/{namespace_part}{name}"
    index_rows = exec_query(
        index_conn,
        "SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where purl_prefix = ?;",
        (purl_prefix,),
    )
    matches = filter_hits(index_rows, version)
    return list(get_cve_data(db_conn, matches, purl)) if with_data else matches
def search_by_cve(cve_id: str, with_data: bool = False, with_limit: int | None = None) -> List:
    """Search the index by CVE id

    Args:
        cve_id: Exact id, or a LIKE pattern when it contains "%"
        with_data: When true, return full CVE data instead of index hits
        with_limit: Optional row limit; applied only when it is a truthy int

    Returns:
        list: Index hits ordered by cve_id descending, or CVE data records
        when with_data is set
    """
    db_conn, index_conn = db6.get(read_only=True)
    comparison = "LIKE" if "%" in cve_id else "="
    where_clause = f"cve_id {comparison} ? ORDER BY cve_id DESC"
    params = (cve_id,)
    if isinstance(with_limit, int) and with_limit:
        where_clause = f"{where_clause} LIMIT ?"
        params = params + (with_limit,)
    index_rows = exec_query(
        index_conn,
        f"SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where {where_clause}",
        params,
    )
    # "*" is passed as the version so that filtering is left to utils.vers_compare.
    matches = filter_hits(index_rows, "*")
    if not with_data:
        return matches
    return list(get_cve_data(db_conn, matches, cve_id))
def search_by_url(url: str, with_data: bool = False) -> List:
    """Search by URL

    The URL is converted with utils.url_to_purl, rebuilt into a purl string
    (type, optional namespace, name, optional version) and passed on to
    search_by_purl_like. Returns an empty list when the conversion yields
    nothing.
    """
    purl_obj = utils.url_to_purl(url)
    if not purl_obj:
        return []
    name = purl_obj["name"]
    namespace = purl_obj["namespace"]
    purl_str = f"pkg:{purl_obj['type']}/"
    if namespace:
        purl_str += f"{namespace}/"
    purl_str += f"{name}"
    version = purl_obj["version"]
    if version:
        purl_str += f"@{version}"
    return search_by_purl_like(purl_str, with_data)
def search_by_cdx_bom(bom_file: str, with_data: bool = False) -> Generator:
    """Search by CycloneDX BOM file

    Args:
        bom_file: Path handed to load_json
        with_data: Forwarded to the purl and CPE searches

    Yields:
        list: For each entry under "components", the purl search result when
        the component has a purl, then the CPE search result when it has a cpe
    """
    components = load_json(bom_file).get("components", [])
    for comp in components:
        purl = comp.get("purl")
        if purl:
            yield search_by_purl_like(purl, with_data)
        cpe = comp.get("cpe")
        if cpe:
            yield search_by_cpe_like(cpe, with_data)
def latest_malware(with_limit=20, with_data=False) -> Generator:
    """Search for the latest entries whose CVE ID begins with MAL-

    Yields exactly one item: the complete list returned by search_by_cve for
    the pattern "MAL-%", limited to with_limit rows.
    """
    malware_hits = search_by_cve("MAL-%", with_limit=with_limit, with_data=with_data)
    yield malware_hits
def exec_query(conn, query: str, args: Tuple[str, ...]) -> list:
res = conn.execute(query, args)
return res.fetchall()