forked from carbonblack/cbfeeds
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvalidate_feed.py
150 lines (119 loc) · 4.73 KB
/
validate_feed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import sys
import json
import optparse
import cbfeeds
def build_cli_parser():
    """
    build the optparse parser that understands this script's command line switches

    switches defined here:
      -f / --feedfile   feed file to validate (stored as 'feed_filename')
      -p / --pedantic   boolean flag, off unless given
      -e / --exclude    filename of the 'exclude' indicator list, None unless given
      -i / --include    filename of the 'include' indicator list, None unless given
    """
    parser = optparse.OptionParser(
        usage="usage: %prog [options]",
        description="Validate a Carbon Black feed",
    )

    # the feed to be validated
    #
    parser.add_option(
        "-f", "--feedfile",
        action="store", type="string", dest="feed_filename",
        help="Feed Filename to validate",
    )

    # optional strictness switch
    #
    parser.add_option(
        "-p", "--pedantic",
        action="store_true", default=False, dest="pedantic",
        help="Validates that no non-standard JSON elements exist",
    )

    # optional indicator lists
    #
    parser.add_option(
        "-e", "--exclude",
        action="store", default=None, dest="exclude",
        help="Filename of 'exclude' list - newline delimited indicators to consider invalid",
    )
    parser.add_option(
        "-i", "--include",
        action="store", default=None, dest="include",
        help="Filename of 'include' list - newline delimited indicators to consider valid",
    )

    return parser
def validate_file(feed_filename):
    """
    validate that the file exists and is readable

    :param feed_filename: path of the feed file to read
    :return: the complete contents of the file, as returned by read()
    :raises IOError/OSError: propagated from open() or read() when the file
        cannot be opened or read; callers treat that as a failed validation
    """
    # the context manager closes the handle on every path, including when
    # read() raises; the file was previously left open
    #
    with open(feed_filename) as f:
        return f.read()
def validate_json(contents):
    """
    decode the feed text, which proves that it is well-formed JSON

    the decoder is run with strict=False; per the json module documentation
    this allows control characters inside string values

    returns the decoded object; a decoding failure propagates to the caller
    """
    decoded = json.loads(contents, strict=False)
    return decoded
def validate_feed(feed, pedantic=False):
    """
    validate that the file is valid as compared to the CB feeds schema

    :param feed: the decoded feed JSON; must be a dict that holds both the
        'feedinfo' and the 'reports' elements
    :param pedantic: passed straight through to CbFeed.validate()
    :return: the cbfeeds.CbFeed object built from the feed
    :raises Exception: when the feed is not a JSON object or a required
        element is missing; anything raised by cbfeeds propagates unchanged
    """
    # a feed has to be a JSON object
    # without this check a top-level string is substring-matched by the 'in'
    # tests below and then fails with an unrelated TypeError, and a top-level
    # number fails the 'in' test itself
    #
    if not isinstance(feed, dict):
        raise Exception("Feed is not a JSON object!")

    # verify that we have both of the required feedinfo and reports elements
    #
    if "feedinfo" not in feed:
        raise Exception("No 'feedinfo' element found!")
    if "reports" not in feed:
        raise Exception("No 'reports' element found!")

    # set up the cbfeed object
    # kept under its own name so the 'feed' argument is not rebound
    #
    cb_feed = cbfeeds.CbFeed(feed["feedinfo"], feed["reports"])

    # validate the feed
    # this validates that all required fields are present, and that
    # all required values are within valid ranges
    #
    cb_feed.validate(pedantic)

    return cb_feed
def validate_against_include_exclude(feed, include, exclude):
    """
    reject the feed when one of its indicators is excluded without also being included

    walks feed.iter_iocs() and raises Exception, carrying the offending ioc
    entry, for the first indicator that is in 'exclude' but not in 'include';
    returns None when no such indicator is found
    """
    for entry in feed.iter_iocs():
        # indicators that are not excluded need no further checking
        #
        if entry["ioc"] not in exclude:
            continue

        # an excluded indicator is still acceptable when it is also included
        #
        if entry["ioc"] in include:
            continue

        raise Exception(entry)
def _read_indicator_set(filename):
    """
    read a flat, newline-delimited indicator file into a set

    each line is stripped of surrounding whitespace before it is added;
    a false filename (None or empty) yields an empty set without touching the disk
    """
    if not filename:
        return set()

    # the context manager closes the file once every line has been consumed
    #
    with open(filename) as f:
        return {line.strip() for line in f}


def gen_include_exclude_sets(include_filename, exclude_filename):
    """
    generate an include and an exclude set of indicators by
    reading indicators from a flat, newline-delimited file

    :param include_filename: path of the 'include' list, or None for no list
    :param exclude_filename: path of the 'exclude' list, or None for no list
    :return: tuple of (include set, exclude set); a missing list gives an empty set
    :raises IOError/OSError: propagated from open() when a named file cannot be read
    """
    # the include list is read first, then the exclude list
    #
    include = _read_indicator_set(include_filename)
    exclude = _read_indicator_set(exclude_filename)

    return include, exclude
if __name__ == "__main__":
    # command line entry point: runs each validation step in order and stops
    # at the first one that fails
    #
    # every failure exits with status 1 so that a calling shell or build job
    # can tell an invalid feed from a valid one; these paths used to exit
    # with status 0
    #
    parser = build_cli_parser()

    # parse_args() reads sys.argv[1:] on its own, which keeps the program
    # name out of the positional arguments
    #
    options, args = parser.parse_args()

    if not options.feed_filename:
        print("-> Must specify a feed filename to validate; use the -f switch or --help for usage")
        sys.exit(1)

    # generate include and exclude (whitelist and blacklist) sets of indicators
    # feed validation will fail if a feed ioc is blacklisted unless it is also whitelisted
    #
    # guarded like every other step so that an unreadable list file is
    # reported as a message rather than as a traceback
    #
    try:
        include, exclude = gen_include_exclude_sets(options.include, options.exclude)
    except Exception as e:
        print("-> Unable to read the include and exclude lists")
        print("-> Details:")
        print()
        print(e)
        sys.exit(1)

    try:
        contents = validate_file(options.feed_filename)
        print("-> Validated that file exists and is readable")
    except Exception as e:
        print("-> Unable to validate that file exists and is readable")
        print("-> Details:")
        print()
        print(e)
        sys.exit(1)

    try:
        feed = validate_json(contents)
        print("-> Validated that feed file is valid JSON")
    except Exception as e:
        print("-> Unable to validate that file is valid JSON")
        print("-> Details:")
        print()
        print(e)
        sys.exit(1)

    try:
        feed = validate_feed(feed, pedantic=options.pedantic)
        print("-> Validated that the feed file includes all necessary CB elements")
        print("-> Validated that all element values are within CB feed requirements")
        if options.pedantic:
            print("-> Validated that the feed includes no non-CB elements")
    except Exception as e:
        print("-> Unable to validate that the file is a valid CB feed")
        print("-> Details:")
        print()
        print(e)
        sys.exit(1)

    # the include/exclude check only runs when at least one list has entries
    #
    if len(exclude) > 0 or len(include) > 0:
        try:
            validate_against_include_exclude(feed, include, exclude)
            print("-> Validated against include and exclude lists")
        except Exception as e:
            print("-> Unable to validate against the include and exclude lists")
            print(e)
            sys.exit(1)