-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathCertificateReputation.py
323 lines (255 loc) · 13.5 KB
/
CertificateReputation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import demistomock as demisto
from CommonServerPython import * # noqa # pylint: disable=unused-wildcard-import
from CommonServerUserPython import * # noqa # pylint: disable=unused-wildcard-import
import dateparser
from datetime import timedelta
from enum import Enum
from typing import Any
# Threshold defining a "long expiration": when the certificate validity window
# (ValidityNotAfter - ValidityNotBefore) is greater than this value, the
# certificate is tagged as LONG_EXPIRATION.
# 398 days is used here
# (see https://blog.mozilla.org/security/2020/07/09/reducing-tls-certificate-lifespans-to-398-days/).
LONG_EXPIRATION_DT = timedelta(days=398)
''' STANDALONE FUNCTION '''
class CertificateValidationTag(Enum):
    """Tags produced by the certificate validation checks.

    Every member's value is the same text as its name; the value is what gets
    written to the indicator and what prefixes the human-readable comments.
    """

    # current time is later than ValidityNotAfter
    EXPIRED = 'EXPIRED'
    # current time is earlier than ValidityNotBefore
    NOT_VALID_YET = 'NOT_VALID_YET'
    # ValidityNotBefore is not strictly earlier than ValidityNotAfter
    INVALID_VALIDITY_WINDOW = 'INVALID_VALIDITY_WINDOW'
    # validity window is longer than LONG_EXPIRATION_DT
    LONG_EXPIRATION = 'LONG_EXPIRATION'
    # at least one certificate name starts with "*."
    WILDCARD_CERTIFICATE = 'WILDCARD_CERTIFICATE'
    # subject DN equals issuer DN
    SELF_ISSUED = 'SELF_ISSUED'
    # subject DN is a lone CN, or carries OU=Domain Control Validated
    DOMAIN_CONTROL_VALIDATED = 'DOMAIN_CONTROL_VALIDATED'
    # subject key identifier matches the authority key identifier
    SELF_SIGNED = 'SELF_SIGNED'
    # tagged self-signed, yet subject DN and issuer DN differ
    INVALID_DISTINGUISHED_NAMES = 'INVALID_DISTINGUISHED_NAMES'
def get_indicator_from_value(indicator_value: str):
    """Find the Certificate indicator whose value is *indicator_value*.

    Runs the ``findIndicators`` command and returns the first item of the
    ``Contents`` of the first result entry.

    Args:
        indicator_value: Value of the indicator to look up.

    Returns:
        The first matching indicator, or ``None`` when the command fails or
        the result does not contain any indicator.
    """
    query = f'value:"{indicator_value}" and type:Certificate'
    try:
        res = demisto.executeCommand("findIndicators", {'query': query})
        return res[0]['Contents'][0]
    except Exception as exc:
        # Best-effort lookup: a failed command or an empty/malformed result
        # means "no indicator". Only Exception is caught so that
        # KeyboardInterrupt/SystemExit are not silently swallowed.
        demisto.debug(f'findIndicators lookup failed for {indicator_value!r}: {exc!r}')
        return None
def indicator_set_validation_checks(ivalue: str, checks: list[CertificateValidationTag]) -> None:
    """Record each validation tag on the Certificate indicator *ivalue*.

    Args:
        ivalue: Value of the indicator to update.
        checks: Tags to store in the ``certificatevalidationchecks`` field.
    """
    # One setIndicator call per tag: passing the whole list in a single call
    # would store the stringified list in the field instead of the items.
    for tag in checks:
        command_args = {
            "value": ivalue,
            "type": "Certificate",
            "certificatevalidationchecks": tag.value,
        }
        demisto.executeCommand('setIndicator', command_args)
def certificate_fields_to_context(certindicator_fields: dict[str, Any]) -> dict[str, Any] | None:
    """Build the standard certificate context from an indicator's fields.

    The ``pem`` field is handed to the ``CertificateExtract`` command and the
    entry context of its first result is reduced, in place, to the single key
    ``Common.Certificate.CONTEXT_PATH``.

    Args:
        certindicator_fields: Fields of the certificate indicator.

    Returns:
        The pruned entry context, or ``None`` when there is no ``pem`` field,
        the command returns no entries, or the first entry has no
        ``EntryContext``.
    """
    pem: str | None = certindicator_fields.get('pem')
    if pem is None:
        return None

    entries = demisto.executeCommand('CertificateExtract', {'pem': pem})
    if len(entries) == 0:
        return None

    entry_context = entries[0].get('EntryContext')
    if entry_context is None:
        return None

    # Collect the keys first so the dict is not modified while being iterated.
    unwanted_keys = [key for key in entry_context.keys() if key != Common.Certificate.CONTEXT_PATH]
    for key in unwanted_keys:
        del entry_context[key]

    demisto.debug(f"{entry_context!r}")
    return entry_context
def dbot_context(value: str, certificate_context: dict[str, Any]
                 ) -> tuple[list[CertificateValidationTag], list[str], dict[str, Any]]:
    """Run the validation checks on a certificate and derive its DBot score.

    The keys read from *certificate_context* are ``ValidityNotAfter``,
    ``ValidityNotBefore``, ``Name``, ``SubjectDN``, ``IssuerDN`` and
    ``Extension``. Each finding adds a tag and a comment and raises the score
    to SUSPICIOUS; an inconsistent validity window, or a self-signed
    certificate whose subject and issuer DNs differ, sets the score to BAD.
    The score is raised to at least GOOD only when no check had to be skipped.

    Args:
        value: Indicator value reported in the resulting ``DBotScore`` entry.
        certificate_context: Certificate data to inspect.

    Returns:
        A tuple ``(tags, comments, context)`` where *context* holds the
        ``DBotScore`` dictionary.

    Raises:
        RuntimeError: If dateparser cannot parse the current time.
    """
    notes: list[str] = []
    found: list[CertificateValidationTag] = []
    skipped_checks: bool = False
    score = Common.DBotScore.NONE

    # ---- validity over time ----
    now = dateparser.parse('now Z')
    if now is None:
        raise RuntimeError("Dateparser failed parsing 'now Z'")

    not_after_raw = certificate_context.get('ValidityNotAfter')
    not_after = dateparser.parse(not_after_raw) if not_after_raw is not None else None

    not_before_raw = certificate_context.get('ValidityNotBefore')
    not_before = dateparser.parse(not_before_raw) if not_before_raw is not None else None

    if not_after is None:
        notes.append('ValidityNotAfter not specified, expiration check not performed')
        skipped_checks = True
    elif now > not_after:
        notes.append(f'{CertificateValidationTag.EXPIRED.value} '
                     f'Certificate expired: {now.strftime("%Y-%m-%dT%H:%M:%S.000Z")} > '
                     f'{not_after_raw}')
        found.append(CertificateValidationTag.EXPIRED)
        score = max(score, Common.DBotScore.SUSPICIOUS)

    if not_before is None:
        notes.append('ValidityNotBefore not specified, validation check not performed')
        skipped_checks = True
    elif now < not_before:
        notes.append(f'{CertificateValidationTag.NOT_VALID_YET.value} '
                     f'Certificate not valid yet: {now.strftime("%Y-%m-%dT%H:%M:%S.000Z")} < '
                     f'{not_before_raw}')
        found.append(CertificateValidationTag.NOT_VALID_YET)
        score = max(score, Common.DBotScore.SUSPICIOUS)

    # ---- blatant issues with the validity dates, and long expiration ----
    both_dates_known = not_after is not None and not_before is not None
    if both_dates_known and not_before >= not_after:
        notes.append(f'{CertificateValidationTag.INVALID_VALIDITY_WINDOW.value} '
                     f'Certificate is invalid: Validity not before {not_before_raw} > '
                     f'Validity not after {not_after_raw}')
        found.append(CertificateValidationTag.INVALID_VALIDITY_WINDOW)
        score = Common.DBotScore.BAD
    elif both_dates_known and (not_after - not_before) > LONG_EXPIRATION_DT:
        notes.append(
            f'{CertificateValidationTag.LONG_EXPIRATION.value} Certificate has long expiration (> {LONG_EXPIRATION_DT})')
        found.append(CertificateValidationTag.LONG_EXPIRATION)
        score = max(score, Common.DBotScore.SUSPICIOUS)

    # ---- wildcard names ----
    names = certificate_context.get('Name')
    if names is None:
        notes.append('Name not specified')
        skipped_checks = True
    elif any(name.startswith('*.') for name in names):
        notes.append(
            f'{CertificateValidationTag.WILDCARD_CERTIFICATE.value} Certificate contains at least one name with wildcard')
        found.append(CertificateValidationTag.WILDCARD_CERTIFICATE)
        score = max(score, Common.DBotScore.SUSPICIOUS)

    # ---- subject and issuer ----
    subject_dn = certificate_context.get('SubjectDN')
    if subject_dn is None:
        notes.append('SubjectDN not specified')
        skipped_checks = True

    issuer_dn = certificate_context.get('IssuerDN')
    if issuer_dn is None:
        notes.append('IssuerDN not specified')
        skipped_checks = True

    # self-issued iff subject_dn == issuer_dn
    same_dn = subject_dn is not None and issuer_dn is not None and subject_dn == issuer_dn
    if same_dn:
        notes.append(f'{CertificateValidationTag.SELF_ISSUED.value} Self-Issued certificate')
        found.append(CertificateValidationTag.SELF_ISSUED)
        score = max(score, Common.DBotScore.SUSPICIOUS)

    # domain control validated:
    # - if there is only a CN element in the subject DN
    # - if there is a OU=Domain Control Validated in the subject DN
    if subject_dn is not None:
        # rewrite the short escapes \, and \+ as \2c and \2b so that escaped
        # separators are not mistaken for real ones when splitting
        escaped_dn = subject_dn.replace("\\,", "\\2c").replace("\\+", "\\2b")
        # split RDNs (separated by ,) and multi-valued RDNs (separated by +)
        rdns = escaped_dn.replace('+', ',').split(',')

        only_cn = len(rdns) == 1 and rdns[0].startswith('CN=')
        if only_cn or any(rdn.strip() == "OU=Domain Control Validated" for rdn in rdns):
            notes.append(f'{CertificateValidationTag.DOMAIN_CONTROL_VALIDATED.value} Certificate is Domain Control Validated')
            found.append(CertificateValidationTag.DOMAIN_CONTROL_VALIDATED)
            score = max(score, Common.DBotScore.SUSPICIOUS)

    # ---- self-signed iff subject key identifier == authority key identifier ----
    extensions = certificate_context.get('Extension')
    if extensions is None:
        notes.append('No Extensions available, some checks could not be performed')
        skipped_checks = True
    else:
        ski = next((ext.get('Value') for ext in extensions
                    if ext.get('OID') == '2.5.29.14'), None)  # disable-secrets-detection
        aki = next((ext.get('Value') for ext in extensions
                    if ext.get('OID') == '2.5.29.35'), None)  # disable-secrets-detection

        ski_digest = None if ski is None else ski.get('Digest')
        aki_key_id = None if aki is None else aki.get('KeyIdentifier')

        if ski_digest is None:
            skipped_checks = True
            notes.append('Valid SubjectKeyIdentifier Extension not available, some checks not performed')

        if aki_key_id is None:
            skipped_checks = True
            notes.append('Valid AuthorityKeyIdentifier Extension not available, some checks not performed')

        # Either both identifiers are present and equal, or the authority key
        # identifier is missing and the DNs are equal.
        identifiers_match = ski_digest is not None and aki_key_id is not None and ski_digest == aki_key_id
        dn_fallback = ski_digest is not None and aki_key_id is None and same_dn
        if identifiers_match or dn_fallback:
            notes.append(f'{CertificateValidationTag.SELF_SIGNED.value} Self-Signed Certificate')
            found.append(CertificateValidationTag.SELF_SIGNED)
            score = max(score, Common.DBotScore.SUSPICIOUS)

    # if self-signed we also check this is self-issued
    if (CertificateValidationTag.SELF_SIGNED in found
            and subject_dn is not None
            and issuer_dn is not None
            and subject_dn != issuer_dn):
        notes.append(f'{CertificateValidationTag.INVALID_DISTINGUISHED_NAMES.value}'
                     ' Self-Signed Certificate with different Issuer DN and Subject DN')
        found.append(CertificateValidationTag.INVALID_DISTINGUISHED_NAMES)
        score = Common.DBotScore.BAD

    if not skipped_checks:
        # no check was skipped, so the certificate can be marked as good
        # unless the current score is already higher (worse) than GOOD
        score = max(score, Common.DBotScore.GOOD)

    dbot_score = {
        'Score': score,
        'Vendor': 'X509Certificate',
        'Type': 'certificate',
        'Indicator': value
    }
    return found, notes, {'DBotScore': dbot_score}
''' COMMAND FUNCTION '''
def certificate_reputation_command(args: dict[str, Any]) -> dict[str, Any]:
    """Compute the reputation of a Certificate indicator.

    Looks up the indicator named by ``input``, extracts its certificate
    context from the ``pem`` custom field, runs the validation checks and
    returns a war-room entry with the score and the notes collected.

    Args:
        args: Script arguments. ``input`` (required) is the indicator value;
            ``update_indicator`` (default ``'true'``) controls whether the
            resulting tags are written back to the indicator.

    Returns:
        An entry dictionary. When no indicator matches, a note entry with
        "*No matching indicators*" and no context.

    Raises:
        ValueError: If ``input`` is missing or the matching indicator has no
            value.
    """
    input_ = args.get('input')
    if input_ is None:
        raise ValueError("input argument is required")

    update_indicator = argToBoolean(args.get('update_indicator', 'true'))

    indicator = get_indicator_from_value(input_)
    if indicator is None:
        return {
            'Type': entryTypes['note'],
            'HumanReadable': '*No matching indicators*',
            'ReadableContentsFormat': formats['markdown']
        }

    comments: list[str] = []

    indicator_value = indicator.get('value')
    if indicator_value is None:
        raise ValueError("Matching indicator has no value (this should not be possible)")

    standard_context: dict[str, Any] = {}
    if (fields := indicator.get('CustomFields')) is not None:
        # A missing, None or empty pem cannot be extracted: report it instead
        # of calling CertificateExtract with nothing to parse.
        if not fields.get('pem'):
            comments.append("*PEM field is empty*")
        elif (certificate_context := certificate_fields_to_context(fields)) is not None:
            standard_context.update(certificate_context)

    certificate_stdcontext = standard_context.get(Common.Certificate.CONTEXT_PATH)
    demisto.debug(f"{certificate_stdcontext!r}")
    if isinstance(certificate_stdcontext, list):
        certificate_stdcontext = certificate_stdcontext[0] if certificate_stdcontext else {}
        demisto.debug(f"{certificate_stdcontext!r}")
    if certificate_stdcontext is None:
        # No certificate data available (no custom fields, no PEM, or the
        # extraction yielded nothing). Run the checks on an empty context so
        # the command still returns a score and the collected notes instead
        # of failing before the entry is built.
        certificate_stdcontext = {}

    tags, check_comments, dbot_score = dbot_context(indicator_value, certificate_stdcontext)
    standard_context.update(dbot_score)

    if update_indicator:
        # we use this because it seems that enrichIndicators is ignoring additional context
        # in the output
        indicator_set_validation_checks(indicator_value, tags)

    score = standard_context['DBotScore']['Score']
    readable_output = f"Score for {indicator_value} is {score}\n"
    readable_output += "## Notes\n"
    readable_output += '\n'.join(comments)
    readable_output += '\n'
    readable_output += '\n'.join(check_comments)

    return {
        'Type': entryTypes['note'],
        'EntryContext': standard_context,
        'Contents': score,
        'ContentsFormat': formats['text'],
        'HumanReadable': readable_output,
        'ReadableContentsFormat': formats['markdown'],
        'IgnoreAutoExtract': True
    }
''' MAIN FUNCTION '''
def main():
    """Run the command with the script arguments and report the outcome.

    Any exception raised while computing or returning the results is reported
    through ``return_error``.
    """
    try:
        entry = certificate_reputation_command(demisto.args())
        return_results(entry)
    except Exception as ex:
        return_error(f'Failed to execute CertificateReputation. Error: {str(ex)}')
''' ENTRY POINT '''
# Run main() only when the module is loaded under one of these launcher names.
if __name__ == '__main__' or __name__ in ('__builtin__', 'builtins'):
    main()