forked from redcanaryco/surveyor
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcommon.py
224 lines (184 loc) · 7.73 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import logging
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Tuple, Optional, Any
from help import log_echo
class AuthenticationError(Exception):
pass
@dataclass(eq=True, frozen=True)
class Tag:
tag: str
data: Optional[str] = None
@dataclass(eq=True, frozen=True)
class Result:
hostname: str
username: str
path: str
command_line: str
other_data: Optional[Tuple] = None # use tuples as they are immutable
class Product(ABC):
"""
Base class for surveyor product implementations.
Subclasses must implement all abstract methods and invoke this class's constructor.
"""
product: Optional[str] = None # a string describing the product (e.g. cbr/cbth/defender/s1)
profile: str # the profile is used to authenticate to the target platform
_results: dict[Tag, list[Result]]
log: logging.Logger
_tqdm_echo: bool = False
def __init__(self, product, profile, tqdm_echo: bool = False, **kwargs):
self.profile = profile
self.product = product
self._tqdm_echo = tqdm_echo
self.log = logging.getLogger(f'surveyor.{self.product}')
if not self.profile:
self.profile = 'default'
self._results = dict()
self.log.debug(f'Authenticating to {self.product}')
self._authenticate()
self.log.debug(f'Authenticated')
@abstractmethod
def _authenticate(self) -> None:
"""
Authenticate to the target product API.
"""
raise NotImplementedError()
# noinspection PyMethodMayBeStatic
def base_query(self) -> dict:
"""
Get base query parameters for the product.
"""
return dict()
@abstractmethod
def build_query(self, filters: dict) -> Any:
"""
Build a base query for the product.
"""
raise NotImplementedError()
@abstractmethod
def process_search(self, tag: Tag, base_query: dict, query: str) -> None:
"""
Perform a process search.
"""
raise NotImplementedError()
@abstractmethod
def nested_process_search(self, tag: Tag, criteria: dict, base_query: dict) -> None:
"""
Performed a nested process search.
"""
raise NotImplementedError()
def has_results(self) -> bool:
"""
Test whether product has any search results.
"""
return len(self._results) > 0
def clear_results(self) -> None:
"""
Clear all stored results.
"""
self._results.clear()
def get_results(self, final_call: bool = True) -> dict[Tag, list[Result]]:
"""
Get results from all process_search and nested_process_search calls.
:param final_call: Indicates whether this is the final time get_results will be called for this
set of process searches.
:returns: A dictionary whose keys represent the tags used to identify searches. The dictionary values
are lists containing the search results as tuples with members: hostname, username, path, command_line.
"""
return self._results
# noinspection PyMethodMayBeStatic
def get_other_row_headers(self) -> list[str]:
"""
Retrieve any additional headers this product includes in results.
"""
return list()
def _add_results(self, results: list[Result], tag: Optional[Tag] = None):
"""
Add results to the result store.
"""
if not tag:
tag = Tag('_default')
if tag not in self._results:
self._results[tag] = list()
self._results[tag].extend(results)
def _echo(self, message: str, level: int = logging.DEBUG):
"""
Write a message to STDOUT and the debug log stream.
"""
log_echo(message, self.log, level, use_tqdm=self._tqdm_echo)
def sigma_translation(product: str, sigma_rules: list, pq: bool = False) -> dict:
"""
Translates a list of sigma rules into the target product language
Parameters
----------
product : str
Name of target product
sigma_rules : list
List of files containing sigma rules or YML-formatted strings
Does not support a mixed list of files and strings
pq : bool
Only used for SentinelOne translations (default is False)
If true, translates into PowerQuery syntax
Otherwise, uses DeepVisibility
"""
supports_json_ouput = True
try:
from sigma.collection import SigmaCollection # type: ignore
from sigma.plugins import SigmaPluginDirectory # type: ignore
plugins = SigmaPluginDirectory.default_plugin_directory()
except Exception as e:
raise e
if product in ('cbr','cbc'):
plugins.get_plugin_by_id('carbonblack').install()
from sigma.backends.carbonblack import CarbonBlackBackend # type: ignore
if product == 'cbr':
from sigma.pipelines.carbonblack import CarbonBlackResponse_pipeline as cb_pipeline # type: ignore
else:
from sigma.pipelines.carbonblack import CarbonBlack_pipeline as cb_pipeline # type: ignore
backend = CarbonBlackBackend(cb_pipeline())
elif product == 's1':
if pq:
plugins.get_plugin_by_id('sentinelone-pq').install()
from sigma.backends.sentinelone_pq import SentinelOnePQBackend # type: ignore
backend = SentinelOnePQBackend()
else:
plugins.get_plugin_by_id('sentinelone').install()
from sigma.backends.sentinelone import SentinelOneBackend # type: ignore
backend = SentinelOneBackend()
elif product == 'dfe':
supports_json_ouput = False
plugins.get_plugin_by_id('microsoft365defender').install()
from sigma.backends.kusto import KustoBackend # type: ignore
from sigma.pipelines.microsoft365defender import microsoft_365_defender_pipeline # type: ignore
backend = KustoBackend(microsoft_365_defender_pipeline())
elif product == 'cortex':
plugins.get_plugin_by_id('cortexxdr').install()
from sigma.backends.cortexxdr import CortexXDRBackend # type: ignore
backend = CortexXDRBackend()
are_files = [os.path.isfile(i) for i in sigma_rules]
if all(are_files): # if all items in the list are files
if product == "dfe":
rule_collection = [SigmaCollection.load_ruleset([i]) for i in sigma_rules] # load each file as a separate rule collection for DFE
else:
rule_collection = SigmaCollection.load_ruleset(sigma_rules) # type: ignore
elif not any(are_files): # if none of the items in the list are files, assume YML formatted strings
if product == 'dfe':
rule_collection = [SigmaCollection.from_yaml(i) for i in sigma_rules] # load each YML string as a separate rule collection for DFE
else:
rule_collection = SigmaCollection.merge([SigmaCollection.from_yaml(i) for i in sigma_rules]) # type: ignore
else:
logging.error("There appears to be a mix of files and YML strings. Cannot process a mixed list of values. Aborting.")
return {'queries': []}
if supports_json_ouput:
return backend.convert(rule_collection, "json") # type: ignore
else:
results: dict = {"queries":[]}
for r in rule_collection:
results['queries'].append({
'query': backend.convert_rule(r)[0] if product != 'dfe' else backend.convert(r)[0], # type: ignore
'id': r.id if product != 'dfe' else r.rules[0].id, # type: ignore
'title': r.title if product != 'dfe' else r.rules[0].title, # type: ignore
'description': r.description if product != 'dfe' else r.rules[0].description # type: ignore
})
return results