-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
ACTIIndicatorFeed.py
149 lines (123 loc) · 6.89 KB
/
ACTIIndicatorFeed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from typing import Dict, Union
from CommonServerPython import *
from JSONFeedApiModule import * # noqa: E402
def custom_build_iterator(client: Client, feed: Dict, limit, **kwargs) -> List:
"""
Implement the http_request with API that works with pagination and filtering. Uses the integration context to
save last fetch time to each indicator type
Args:
client: Client manage all http requests
feed: dictionary holds all data needed to the specific service (Services- IP, Domain, URL)
limit: maximum number of indicators to fetch
Returns:
list of indicators returned from api. Each indicator is represented in dictionary
"""
fetch_time = demisto.params().get('fetch_time', '14 days')
params: dict = feed.get('filters', {})
current_indicator_type = feed.get('indicator_type', '')
start_date, end_date = parse_date_range(fetch_time, utc=True)
integration_context = get_integration_context()
last_fetch = integration_context.get(f'{current_indicator_type}_fetch_time')
if last_fetch:
start_date = last_fetch # pragma: no cover
page_number = 1
params['end_date'] = end_date
params['start_date'] = start_date
params['page_size'] = 200
if not limit:
limit = 20000 # This limit was added to make sure we do not hit a timeout on the fetch
integration_context[f'{current_indicator_type}_fetch_time'] = str(params['end_date'])
set_integration_context(integration_context)
more_indicators = True
result: list = []
while more_indicators:
params['page'] = page_number
demisto.debug(f"Initiating API call to ACTI with url: {feed.get('url', client.url)} ,with parameters: "
f"{params} and page number: {page_number} ")
try:
r = requests.get(
url=feed.get('url', client.url),
verify=client.verify,
auth=client.auth,
cert=client.cert,
headers=client.headers,
params=params,
**kwargs
)
r.raise_for_status()
data = r.json()
if data.get('total_size'):
result.extend(jmespath.search(expression=feed.get('extractor'), data=data))
more_indicators = data.get('more')
page_number += 1
if len(result) >= limit:
break # pragma: no cover
except ValueError as VE:
raise ValueError(f'Could not parse returned data to Json. \n\nError massage: {VE}') # pragma: no cover
except TypeError as TE:
raise TypeError(f'Error massage: {TE}\n\n Try To check extractor value')
except ConnectionError as exception: # pragma: no cover
# Get originating Exception in Exception chain
error_class = str(exception.__class__) # pragma: no cover
err_type = f"""<{error_class[error_class.find("'") + 1: error_class.rfind("'")]}>""" # pragma: no cover
err_msg = 'Verify that the server URL parameter' \
' is correct and that you have access to the server from your host.' \
'\nError Type: {}\nError Number: [{}]\nMessage: {}\n' \
.format(err_type, exception.errno, exception.strerror)
raise DemistoException(err_msg, exception) # pragma: no cover
demisto.debug(f"Received in total {len(result)} indicators from ACTI Feed")
return result
def create_fetch_configuration(indicators_type: list, filters: dict, params: dict) -> Dict[str, dict]:
mapping_by_indicator_type = { # pragma: no cover
'IP': {
'last_seen_as': 'malwaretypes',
'threat_types': 'primarymotivation',
'malware_family': 'malwarefamily',
'severity': 'sourceoriginalseverity'},
'Domain': {
'last_seen_as': 'malwaretypes',
'threat_types': 'primarymotivation',
'malware_family': 'malwarefamily',
'severity': 'sourceoriginalseverity'},
'URL': {
'last_seen_as': 'malwaretypes',
'threat_types': 'primarymotivation',
'malware_family': 'malwarefamily',
'severity': 'sourceoriginalseverity'}
}
url_by_type = {"IP": 'https://api.intelgraph.idefense.com/rest/threatindicator/v0/ip', # pragma: no cover
"Domain": 'https://api.intelgraph.idefense.com/rest/threatindicator/v0/domain',
"URL": 'https://api.intelgraph.idefense.com/rest/threatindicator/v0/url'}
common_conf = {'extractor': 'results',
'indicator': 'display_text',
'insecure': params.get('insecure', False),
'custom_build_iterator': custom_build_iterator,
'filters': filters}
indicators_configuration = {}
for ind in indicators_type:
indicators_configuration[ind] = dict(common_conf)
indicators_configuration[ind].update({'url': url_by_type[ind]})
indicators_configuration[ind].update({'indicator_type': ind})
indicators_configuration[ind].update({'mapping': mapping_by_indicator_type[ind]})
return indicators_configuration
def build_feed_filters(params: dict) -> Dict[str, Optional[Union[str, list]]]:
filters = {'severity.from': params.get('severity'),
'threat_types.values': params.get('threat_type'),
'confidence.from': params.get('confidence_from'),
'malware_family.values': params.get('malware_family', '').split(',')
if params.get('malware_family') else None}
return {k: v for k, v in filters.items() if v is not None}
def main(): # pragma: no cover
params = demisto.params()
filters: Dict[str, Optional[Union[str, list]]] = build_feed_filters(params)
indicators_type: list = argToList(params.get('indicator_type', []))
params['feed_name_to_config'] = create_fetch_configuration(indicators_type, filters, params)
PACK_VERSION = get_pack_version()
DEMISTO_VERSION = demisto.demistoVersion()
DEMISTO_VERSION = f'{DEMISTO_VERSION["version"]}.{DEMISTO_VERSION["buildNumber"]}'
params['headers'] = {"Content-Type": "application/json",
'auth-token': params.get('api_token').get("password"),
'User-Agent': f'AccentureCTI Pack/{PACK_VERSION} Palo Alto XSOAR/{DEMISTO_VERSION}'}
feed_main(params, 'ACTI Indicator Feed', 'acti')
if __name__ in ('__main__', '__builtin__', 'builtins'):
main()