-
Notifications
You must be signed in to change notification settings - Fork 13
/
apiclient.py
287 lines (228 loc) · 11 KB
/
apiclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""
To use this example, install the `requests` package. All other dependencies are part of the Python 3 standard
library.
# pip install --upgrade requests
Note: this script was written for Python 3.6.X or greater.
Insert your BHE API credentials in the BHE_* constants and set the PRINT_* constants to choose which data is printed.
"""
import hmac
import hashlib
import base64
import requests
import datetime
from typing import Optional
# Location of the BHE API endpoint the client talks to.
BHE_SCHEME = "http"
BHE_DOMAIN = "localhost"
BHE_PORT = 8080

# API token pair used to sign every request. These are placeholder values; replace them with your own.
BHE_TOKEN_ID = "2a5da457-e3bf-4c8d-b51f-a220cb36971b"
BHE_TOKEN_KEY = "Jpg4NpTqzBfQM345xeRzLkMGCojr1SAyw+iNIyENfAicxc3kg9ArXg=="

# Output toggles consulted by main(): each one enables fetching and printing one kind of data.
PRINT_PRINCIPALS = False
PRINT_ATTACK_PATH_TIMELINE_DATA = False
PRINT_POSTURE_DATA = False

# Time window for timeline/posture queries, as UTC timestamps with millisecond precision and a trailing "Z".
DATA_START = "1970-01-01T00:00:00.000Z"
# "Now", captured once at import time.
DATA_END = (
    datetime.datetime.now(datetime.timezone.utc)
    .isoformat(timespec="milliseconds")
    .replace("+00:00", "Z")
)
class Credentials:
    """Plain holder for an API token pair.

    Attributes:
        token_id: Identifier of the API token.
        token_key: Secret key belonging to that token.
    """

    def __init__(self, token_id: str, token_key: str) -> None:
        self.token_id, self.token_key = token_id, token_key
class APIVersion:
    """Version information reported by the server.

    Attributes:
        api_version: Current API version string.
        server_version: Server build/version string.
    """

    def __init__(self, api_version: str, server_version: str) -> None:
        self.api_version, self.server_version = api_version, server_version
class Domain:
    """One domain entry as returned by the available-domains endpoint.

    Attributes:
        name: Domain name.
        id: Domain identifier, used when building per-domain API paths.
        type: Domain type string (constructor argument ``domain_type``).
        collected: Whether the domain is flagged as collected.
        impact_value: Impact value reported for the domain.
    """

    def __init__(self, name: str, id: str, collected: bool, domain_type: str, impact_value: int) -> None:
        # Attribute creation order is kept stable: name, id, type, collected, impact_value.
        self.name, self.id = name, id
        self.type = domain_type
        self.collected, self.impact_value = collected, impact_value
class AttackPath(object):
    """An attack path (finding) tied to the domain it was discovered in.

    Attributes:
        id: Identifier of the attack path / finding.
        title: Human-readable title of the attack path.
        domain_id: ``id`` of the domain passed to the constructor.
        domain_name: ``name`` of that domain with surrounding whitespace stripped.
        exposure: Value used for ordering. It defaults to 0 until a caller assigns one.
    """

    # Class-level default so that comparing or sorting paths works even when no per-instance exposure
    # has been assigned yet (previously this raised AttributeError). Keeping it on the class rather than
    # the instance leaves the instance ``__dict__`` unchanged.
    exposure = 0

    def __init__(self, id: str, title: str, domain: "Domain") -> None:
        self.id = id
        self.title = title
        self.domain_id = domain.id
        self.domain_name = domain.name.strip()

    def __lt__(self, other):
        """Order attack paths by ascending ``exposure``."""
        if not isinstance(other, AttackPath):
            # Let Python try the reflected operation / raise TypeError for unrelated types.
            return NotImplemented
        return self.exposure < other.exposure
class Client(object):
    """Small HTTP client that signs each request with a chained HMAC-SHA-256 signature.

    Annotations that name types defined outside this class are written as strings so they are not
    evaluated at definition time. In particular ``list[Domain]`` cannot be subscripted before
    Python 3.9, while the module header targets Python 3.6+.
    """

    def __init__(self, scheme: str, host: str, port: int, credentials: "Credentials") -> None:
        """Store the endpoint location and the token pair used for signing."""
        self._scheme = scheme
        self._host = host
        self._port = port
        self._credentials = credentials

    def _format_url(self, uri: str) -> str:
        """Return the absolute URL for ``uri``, dropping a single leading slash if present."""
        formatted_uri = uri[1:] if uri.startswith("/") else uri
        return f"{self._scheme}://{self._host}:{self._port}/{formatted_uri}"

    def _sign(self, method: str, uri: str, request_date: str, body: Optional[bytes] = None) -> bytes:
        """Compute the base64-encoded signature for one request.

        Args:
            method: HTTP method, e.g. ``"GET"``.
            uri: Request URI exactly as it will be sent (path plus query string).
            request_date: Datetime string sent in the ``RequestDate`` header.
            body: Raw request body, or ``None`` when there is none.

        Returns:
            The base64-encoded final HMAC digest, as bytes.
        """
        # Digester is initialized with HMAC-SHA-256 using the token key as the HMAC digest key.
        digester = hmac.new(self._credentials.token_key.encode(), None, hashlib.sha256)

        # OperationKey is the first HMAC digest link in the signature chain. This prevents replay attacks that
        # seek to modify the request method or URI. It is composed of concatenating the request method and the
        # request URI with no delimiter and computing the HMAC digest using the token key as the digest secret.
        #
        # Example: GET /api/v1/test/resource HTTP/1.1
        # Signature Component: GET/api/v1/test/resource
        digester.update(f"{method}{uri}".encode())

        # Update the digester for further chaining
        digester = hmac.new(digester.digest(), None, hashlib.sha256)

        # DateKey is the next HMAC digest link in the signature chain. This encodes the RFC3339 formatted
        # datetime value as part of the signature to the hour to prevent replay attacks that are older than max
        # two hours. Only the first 13 characters (date plus hour) take part in the signature:
        #
        # Example: 2020-12-01T23:59:60Z
        # Signature Component: 2020-12-01T23
        digester.update(request_date[:13].encode())

        # Update the digester for further chaining
        digester = hmac.new(digester.digest(), None, hashlib.sha256)

        # Body signing is the last HMAC digest link in the signature chain. This encodes the request body as
        # part of the signature to prevent replay attacks that seek to modify the payload of a signed request.
        # When there is no body the digest is still computed, simply with nothing written to the digester.
        if body is not None:
            digester.update(body)

        return base64.b64encode(digester.digest())

    def _request(self, method: str, uri: str, body: Optional[bytes] = None) -> "requests.Response":
        """Send a signed request and return the raw ``requests`` response.

        Args:
            method: HTTP method.
            uri: Path (and optional query string) relative to the configured host.
            body: Optional raw request body.
        """
        # Local time with UTC offset in ISO 8601 form; the same string is signed and sent as RequestDate.
        datetime_formatted = datetime.datetime.now().astimezone().isoformat("T")

        # Perform the request with the signed and expected headers
        return requests.request(
            method=method,
            url=self._format_url(uri),
            headers={
                "User-Agent": "bhe-python-sdk 0001",
                "Authorization": f"bhesignature {self._credentials.token_id}",
                "RequestDate": datetime_formatted,
                "Signature": self._sign(method, uri, datetime_formatted, body),
                "Content-Type": "application/json",
            },
            data=body,
        )

    def get_version(self) -> "APIVersion":
        """Fetch the API and server version from ``/api/version``."""
        payload = self._request("GET", "/api/version").json()
        data = payload["data"]
        return APIVersion(api_version=data["API"]["current_version"], server_version=data["server_version"])

    def get_domains(self) -> "list[Domain]":
        """Fetch all entries from ``/api/v2/available-domains`` as ``Domain`` objects."""
        payload = self._request("GET", "/api/v2/available-domains").json()["data"]
        return [
            Domain(entry["name"], entry["id"], entry["collected"], entry["type"], entry["impactValue"])
            for entry in payload
        ]

    def get_paths(self, domain: "Domain") -> list:
        """Return one ``AttackPath`` per finding type available for ``domain``.

        Issues one extra request per finding type to fetch its title.
        """
        response = self._request("GET", f"/api/v2/domains/{domain.id}/available-types")
        path_ids = response.json()["data"]

        paths = []
        for path_id in path_ids:
            # Get nice title from API and strip surrounding whitespace/newline
            title_response = self._request("GET", f"/ui/findings/{path_id}/title.md")
            paths.append(AttackPath(path_id, title_response.text.strip(), domain))
        return paths

    def get_path_principals(self, path: "AttackPath") -> "AttackPath":
        """Attach the impacted principals of ``path`` to it and return the same object.

        Sets ``path.impacted_principals`` (list of dicts) and ``path.principal_count``. When the response
        payload has no ``count`` key, both are still set (empty list and 0) so callers can rely on them.
        """
        # Get path details from API
        uri = f"/api/v2/domains/{path.domain_id}/details?finding={path.id}&skip=0&limit=0&Accepted=eq:False"
        payload = self._request("GET", uri).json()

        if "count" not in payload:
            path.impacted_principals = []
            path.principal_count = 0
            return path

        # Build list of impacted principals
        path.impacted_principals = []
        for path_data in payload["data"]:
            if path.id.startswith("LargeDefault"):
                principals = {
                    "Group": path_data["FromPrincipalProps"]["name"],
                    "Principal": path_data["ToPrincipalProps"]["name"],
                }
            # Check for both From and To to determine whether relational or configuration path
            elif "FromPrincipalProps" in path_data and "ToPrincipalProps" in path_data:
                principals = {
                    "Non Tier Zero Principal": path_data["FromPrincipalProps"]["name"],
                    "Tier Zero Principal": path_data["ToPrincipalProps"]["name"],
                }
            else:
                principals = {"User": path_data["Props"]["name"]}
            path.impacted_principals.append(principals)

        path.principal_count = payload["count"]
        return path

    def get_path_timeline(self, path: "AttackPath", from_timestamp: str, to_timestamp: str) -> list:
        """Return the sparkline events of ``path`` between the two timestamps as a list of dicts.

        Each dict carries the identifying fields of the path, the raw event fields and a ``severity``
        label derived from the event's exposure.
        """
        # Sparkline data
        uri = (
            f"/api/v2/domains/{path.domain_id}/sparkline"
            f"?finding={path.id}&from={from_timestamp}&to={to_timestamp}"
        )
        exposure_data = self._request("GET", uri).json()["data"]

        events = []
        for event in exposure_data:
            exposure = event["CompositeRisk"]
            events.append({
                "finding_id": path.id,
                "domain_id": path.domain_id,
                "path_title": path.title,
                "exposure": exposure,
                "finding_count": event["FindingCount"],
                "principal_count": event["ImpactedAssetCount"],
                "id": event["id"],
                "created_at": event["created_at"],
                "updated_at": event["updated_at"],
                "deleted_at": event["deleted_at"],
                # Determine severity from exposure
                "severity": self.get_severity(exposure),
            })
        return events

    def get_posture(self, from_timestamp: str, to_timestamp: str) -> list:
        """Return the ``data`` member of the posture-stats response for the given time window."""
        response = self._request("GET", f"/api/v2/posture-stats?from={from_timestamp}&to={to_timestamp}")
        return response.json()["data"]

    def get_severity(self, exposure: int) -> str:
        """Map an exposure value to a label: >95 Critical, >80 High, >40 Moderate, otherwise Low."""
        if exposure > 95:
            return "Critical"
        if exposure > 80:
            return "High"
        if exposure > 40:
            return "Moderate"
        return "Low"
def main() -> None:
    """Run the example: print version and domain info, then the data enabled by the PRINT_* toggles."""
    # This might be best loaded from a file
    credentials = Credentials(token_id=BHE_TOKEN_ID, token_key=BHE_TOKEN_KEY)

    # Create the client and perform an example call using token request signing
    client = Client(scheme=BHE_SCHEME, host=BHE_DOMAIN, port=BHE_PORT, credentials=credentials)
    version = client.get_version()

    print("BHE Python API Client Example")
    print(f"API version: {version.api_version} - Server version: {version.server_version}\n")

    domains = client.get_domains()

    print("Available Domains")
    for domain in domains:
        print(f"* {domain.name} (id: {domain.id}, collected: {domain.collected}, type: {domain.type}, exposure: {domain.impact_value})")

    for domain in domains:
        # Only collected domains are processed.
        if not domain.collected:
            continue

        # Get paths for domain
        paths = client.get_paths(domain)
        print(("\nProcessing %s attack paths for domain %s" % (len(paths), domain.name)))

        for current_path in paths:
            print("Processing attack path %s" % current_path.id)

            # Get attack path principals
            if PRINT_PRINCIPALS:
                with_principals = client.get_path_principals(current_path)
                print(with_principals.__dict__)

            # Get attack path timeline
            if PRINT_ATTACK_PATH_TIMELINE_DATA:
                timeline = client.get_path_timeline(
                    path=current_path,
                    from_timestamp=DATA_START,
                    to_timestamp=DATA_END,
                )
                print(timeline)

    # Get posture data
    # NOTE(review): indentation was lost in this copy of the file; this section is placed after the
    # domain loop because it uses no per-domain data. Confirm against the upstream file.
    if PRINT_POSTURE_DATA:
        posture = client.get_posture(from_timestamp=DATA_START, to_timestamp=DATA_END)
        print("%s events of posture data" % len(posture))
        print(posture)


if __name__ == "__main__":
    main()