-
Notifications
You must be signed in to change notification settings - Fork 58
/
Copy pathtest_etw.py
373 lines (292 loc) · 12.8 KB
/
test_etw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
########################################################################
# Copyright 2017 FireEye Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
########################################################################
import unittest
import time
import ctypes as ct
import ctypes.wintypes as wt
import subprocess as sp
from etw import ETW, ProviderInfo
from etw.etw import TraceProperties, ProviderParameters, EventConsumer
from etw.GUID import GUID
from etw import evntrace as et
from etw import evntprov as ep
from etw.etw import get_keywords_bitmask
from .helpers import wininet as wi
from etw import common
class TestETW(unittest.TestCase):
    """
    Integration tests for the ETW capture wrapper and its container classes.

    The capture tests start real trace sessions and generate traffic with a WinINet request and ``ping.exe``;
    the events delivered to the callback are collected in ``event_tufo_list`` and inspected afterwards.
    """

    @classmethod
    def setUpClass(cls):
        """
        Set up the state shared by all tests: the list that receives captured events and the parameters of the
        WinINet request used to generate traffic.

        :return: None
        :raises unittest.SkipTest: If ctypes exposes no ``windll`` loader, i.e. we are not running on Windows.
        """
        # ctypes only provides windll on Windows; skip cleanly instead of erroring in every test.
        if not hasattr(ct, 'windll'):
            raise unittest.SkipTest('ETW tests require Windows')

        # Instantiate our list where all of the results will be stored
        cls.event_tufo_list = list()
        # Fields added for context on top of the fields reported by the event itself (see trim_fields).
        cls.context_fields = {'Description', 'Task Name'}
        cls.user_agent = 'TestAgent'
        cls.url = 'www.gmail.com'
        cls.port = 80
        cls.verb = 'GET'
        cls.size = 1337

    def setUp(self):
        """
        Empty the shared event list before every test so that a test can only match events from its own capture.

        :return: None
        """
        # Clear in place: the list lives on the class and is shared by every test instance.
        self.event_tufo_list.clear()

    def _record_event(self, event_tufo):
        """
        Event callback handed to ETW: stores each delivered event for later inspection.

        :param event_tufo: The event passed to the callback by the capture.
        :return: None
        """
        self.event_tufo_list.append(event_tufo)

    @staticmethod
    def _run_ping(duration):
        """
        Launch ping.exe with its output discarded, let it run for a while, then kill and reap it.

        :param duration: Number of seconds to wait before killing the process.
        :return: None
        """
        proc = sp.Popen(['ping.exe'], stdout=sp.DEVNULL, stderr=sp.DEVNULL)
        try:
            time.sleep(duration)
        finally:
            proc.kill()
            # Reap the child so no process handle is left dangling.
            proc.wait()

    @classmethod
    def makeRequest(cls):
        """
        Issue a WININET request based on the class parameters.

        :return: The response body read from the server, decoded as text (undecodable bytes are replaced).
        :raises OSError: Via ctypes.WinError() when opening, connecting or sending fails.
        """
        hInternet = wi.InternetOpenW(
            cls.user_agent,
            wi.INTERNET_OPEN_TYPE_DIRECT, None, None, 0)
        if hInternet is None:
            raise ct.WinError()

        hSession = wi.InternetConnectW(hInternet, cls.url, cls.port, None, None, wi.INTERNET_SERVICE_HTTP, 0, 0)
        if hSession is None:
            raise ct.WinError()

        hRequest = wi.HttpOpenRequestW(hSession, cls.verb, '', None, None, None, 0, 0)
        if hRequest is None:
            raise ct.WinError()

        request_sent = wi.HttpSendRequestW(hRequest, None, 0, None, 0)
        if request_sent == 0:
            raise ct.WinError()

        # Setup the necessary parameters to read the server's response
        buff_size = wt.DWORD(cls.size)
        buf = (ct.c_char * buff_size.value)()
        bytes_read = wt.DWORD(0)
        chunks = []

        # Read until the call fails or reports that no more data is available.
        while True:
            keep_reading = wi.InternetReadFile(hRequest, buf, buff_size, ct.byref(bytes_read))
            if keep_reading != 1 or bytes_read.value == 0:
                break
            # Only the first bytes_read bytes are fresh; the rest of the buffer is left over from earlier reads.
            chunks.append(buf.raw[:bytes_read.value])

        return b''.join(chunks).decode('utf-8', errors='replace')

    def find_event(self, name):
        """
        Retrieves an event from the event_tufo_list with the user's specified name. While the event
        itself is a TuFo, we only return the dictionary portion since the name is only needed during the search.

        :param name: The name of the event we want to find.
        :return: An event matching the name specified or None if no events match.
        """
        for tufo in self.event_tufo_list:
            # .get() so that an event without a 'Task Name' is skipped instead of raising KeyError.
            if tufo[1].get('Task Name') == name:
                return tufo[1]
        return None

    def find_all_events(self, name):
        """
        Retrieves all events matching the user's specified name from the event_tufo list. While the events themselves
        are TuFos, we only return the dictionary portion since the name is only needed during the search.

        :param name: The name of the events we want to find
        :return: A list of all events matching the name. If no events are found, an empty list is returned.
        """
        return [tufo[1] for tufo in self.event_tufo_list if tufo[1].get('Task Name') == name]

    def trim_fields(self, event):
        """
        We add additional fields for contextual information. In order to accurately test that we are parsing
        the correct fields as reported by the event, we need to trim these off.

        :param event: The event dictionary to trim.
        :return: A copy of the event without the contextual fields
        """
        return {key: value for key, value in event.items() if key not in self.context_fields}

    def test_etw_capture(self):
        """
        Tests the etw capture

        :return: None
        """
        # Instantiate an ETW object
        capture = ETW(providers=[ProviderInfo('Microsoft-Windows-WinINet',
                                              GUID("{43D1A55C-76D6-4F7E-995C-64C711E5CAFE}"))],
                      event_callback=self._record_event)
        capture.start()
        try:
            self.makeRequest()
            # Ensure that we have a chance for all the events to come back
            time.sleep(5)
        finally:
            # Always stop the ETW instance so a failure does not leave the session running
            capture.stop()

        event = self.find_event('WININET_READDATA')
        self.assertTrue(event)
        event = self.trim_fields(event)

        # This event should have 3 fields
        self.assertEqual(len(event), 3)

    def test_etw_capture_multi_providers(self):
        """
        Tests the etw capture class using multiple providers

        :return: None
        """
        # Instantiate an ETW object
        providers = [ProviderInfo('Microsoft-Windows-WinINet',
                                  GUID("{43D1A55C-76D6-4F7E-995C-64C711E5CAFE}")),
                     ProviderInfo('Microsoft-Windows-Kernel-Process',
                                  GUID("{22FB2CD6-0E7B-422B-A0C7-2FAD1FD0E716}"))]
        capture = ETW(providers=providers,
                      event_callback=self._record_event)
        capture.start()
        try:
            # start ping to produce a process start event, then issue the WinINet request
            self._run_ping(5)
            self.makeRequest()
        finally:
            # Always stop the ETW instance so a failure does not leave the session running
            capture.stop()

        # check for process start
        event = self.find_event('PROCESSSTART')
        self.assertTrue(event)
        event = self.trim_fields(event)

        # This event should have at least 6 fields
        self.assertGreaterEqual(len(event), 6)

        event = self.find_event('WININET_READDATA')
        self.assertTrue(event)
        event = self.trim_fields(event)

        # This event should have 3 fields
        self.assertEqual(len(event), 3)

    def test_etw_multi_providers_bitmask(self):
        """
        Tests that ProviderInfo translates keyword names into the expected any/all bitmasks for two providers

        :return: None
        """
        # Instantiate an ProviderInfo object
        provider = ProviderInfo('Microsoft-Windows-Kernel-Process',
                                GUID("{22FB2CD6-0E7B-422B-A0C7-2FAD1FD0E716}"),
                                any_keywords=['WINEVENT_KEYWORD_PROCESS'],
                                all_keywords=['WINEVENT_KEYWORD_THREAD'])
        # unittest assertions instead of bare assert, which is stripped when running with -O
        self.assertEqual(provider.any_bitmask, 0x0000000000000010)
        self.assertEqual(provider.all_bitmask, 0x0000000000000020)

        # add provider
        provider = ProviderInfo('Microsoft-Windows-WinINet',
                                GUID("{43D1A55C-76D6-4F7E-995C-64C711E5CAFE}"),
                                any_keywords=['WININET_KEYWORD_HANDLES'],
                                all_keywords=['WININET_KEYWORD_HTTP'])
        self.assertEqual(provider.any_bitmask, 0x0000000000000001)
        self.assertEqual(provider.all_bitmask, 0x0000000000000002)

    def test_etw_get_keywords_bitmask(self):
        """
        Tests to ensure the correct bitmask is found for the provider (Windows Kernel Trace)

        :return: None
        """
        self.assertEqual(
            get_keywords_bitmask(GUID('{9E814AAD-3204-11D2-9A82-006008A86939}'), ['process']),
            0x0000000000000001)

    def test_etw_nt_logger(self):
        """
        Tests to ensure nt kernel logger capture works properly

        :return: None
        """
        capture = ETW(session_name='NT Kernel Logger',
                      providers=[ProviderInfo('Windows Kernel Trace',
                                              GUID("{9E814AAD-3204-11D2-9A82-006008A86939}"),
                                              any_keywords=['process'])],
                      event_callback=self._record_event)
        capture.start()
        try:
            # start ping.exe
            self._run_ping(2)
        finally:
            # Always stop the ETW instance so a failure does not leave the session running
            capture.stop()

        event = self.find_event('PROCESS')
        self.assertTrue(event)
        event = self.trim_fields(event)

        # This event should have at least 10 fields
        self.assertGreaterEqual(len(event), 10)

    def test_etw_eq(self):
        """
        Test container classes comparison

        :return: None
        """
        # ProviderInfo instances compare equal until one of the pointed-to parameter structures changes
        params = et.ENABLE_TRACE_PARAMETERS()
        params.Version = 1
        other_params = et.ENABLE_TRACE_PARAMETERS()
        other_params.Version = 1
        provider = ProviderInfo('Microsoft-Windows-Kernel-Process',
                                GUID("{22FB2CD6-0E7B-422B-A0C7-2FAD1FD0E716}"),
                                any_keywords=['WINEVENT_KEYWORD_PROCESS'],
                                params=ct.pointer(params))
        other_provider = ProviderInfo('Microsoft-Windows-Kernel-Process',
                                      GUID("{22FB2CD6-0E7B-422B-A0C7-2FAD1FD0E716}"),
                                      any_keywords=['WINEVENT_KEYWORD_PROCESS'],
                                      params=ct.pointer(other_params))
        self.assertEqual(provider, other_provider)
        other_params.Version = 2
        self.assertNotEqual(provider, other_provider)

        # ProviderParameters built from the same filters compare equal until one Version is modified
        event_id_list = [54]
        event_filter = ep.EVENT_FILTER_EVENT_ID(common.TRUE, event_id_list).get()
        event_filters = [ep.EVENT_FILTER_DESCRIPTOR(ct.addressof(event_filter.contents),
                                                    ct.sizeof(event_filter.contents) +
                                                    ct.sizeof(wt.USHORT) * len(event_id_list),
                                                    ep.EVENT_FILTER_TYPE_EVENT_ID)]
        properties = ProviderParameters(0, event_filters)
        other_properties = ProviderParameters(0, event_filters)
        self.assertEqual(properties, other_properties)
        other_properties.get().contents.Version = 1
        self.assertNotEqual(properties, other_properties)

        # TraceProperties built from the same arguments compare equal until one BufferSize is modified
        params = TraceProperties(1024, 1024, 0, 10)
        other_params = TraceProperties(1024, 1024, 0, 10)
        self.assertEqual(params, other_params)
        other_params.get().contents.BufferSize = 1025
        self.assertNotEqual(params, other_params)

    def test_callback_flag_good(self):
        """
        Test to check good flag value

        :return: None
        """
        good_flags = (common.RETURN_RAW_DATA_ONLY,
                      common.RETURN_RAW_DATA_ON_ERROR,
                      common.RETURN_ONLY_RAW_DATA_ON_ERROR,
                      common.RETURN_RAW_UNFORMATTED_DATA)
        for flag in good_flags:
            # Construction must succeed (no exception) for every supported flag
            self.assertIsNotNone(EventConsumer('test', None, None, None, flag))

    def test_callback_flag_bad(self):
        """
        Test to check bad flag value

        :return: None
        """
        # An unsupported flag must be rejected by the constructor
        with self.assertRaises(Exception):
            EventConsumer('test', None, None, None, callback_data_flag=1234)

    def test_etw_callback_wait(self):
        """
        Tests the etw capture wait time

        :return: None
        """
        # Instantiate an ETW object
        capture = ETW(providers=[ProviderInfo('Microsoft-Windows-Kernel-Process',
                                              GUID("{22FB2CD6-0E7B-422B-A0C7-2FAD1FD0E716}"))],
                      event_callback=self._record_event,
                      callback_wait_time=0.0025)
        capture.start()
        try:
            # start ping
            self._run_ping(5)
        finally:
            # Always stop the ETW instance so a failure does not leave the session running
            capture.stop()

        # check for process start
        event = self.find_event('PROCESSSTART')
        self.assertTrue(event)
        event = self.trim_fields(event)

        # This event should have at least 6 fields
        self.assertGreaterEqual(len(event), 6)
# Run the test suite when this module is executed directly.
if __name__ == "__main__":
    unittest.main()