Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split DKEvent from other and use CCL_SEGB package #841

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ bencoding
biplist
blackboxprotobuf
bs4
ccl-segb @ git+https://github.com/cclgroupltd/ccl-segb.git@main
mmh3
nska-deserialize>=1.3.1
nska_deserialize
numpy
packaging==20.1
packaging==24.1
pandas
pathlib2==2.3.5
PGPy
Expand Down
81 changes: 81 additions & 0 deletions scripts/artifacts/biomeDKInfocus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import os
import blackboxprotobuf
from scripts.artifact_report import ArtifactHtmlReport
from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly, convert_utc_human_to_timezone, timestampsconv
from ccl_segb import ccl_segb
from ccl_segb.ccl_segb_common import EntryState

def get_biomeDKInfocus(files_found, report_folder, seeker, wrap_text, timezone_offset):

typess = {'1': {'type': 'message', 'message_typedef': {'1': {'type': 'str', 'name': ''}, '2': {'type': 'message', 'message_typedef': {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}}, 'name': ''}}, 'name': ''},
'2': {'type': 'double', 'name': ''}, '3': {'type': 'double', 'name': ''}, '4': {'type': 'message', 'message_typedef': {'1': {'type': 'message', 'message_typedef': {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}}, 'name': ''}, '3': {'type': 'str', 'name': ''}}, 'name': ''}, '5': {'type': 'str', 'name': ''}, '7': {'type': 'message', 'message_typedef': {'1': {'type': 'message', 'message_typedef': {}, 'name': ''}, '2': {'type': 'message', 'message_typedef': {'1': {'type': 'message', 'message_typedef': {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}}, 'name': ''}, '3': {'type': 'str', 'name': ''}}, 'name': ''}, '3': {'type': 'int', 'name': ''}}, 'name': ''}, '8': {'type': 'double', 'name': ''}, '10': {'type': 'int', 'name': ''}}

data_list = []

for file_found in files_found:
file_found = str(file_found)
filename = os.path.basename(file_found)
if filename.startswith('.'):
continue
if os.path.isfile(file_found):
if 'tombstone' in file_found:
continue
else:
report_file = os.path.dirname(file_found)
else:
continue

for record in ccl_segb.read_segb_file(file_found):
if record.state == EntryState.Written:
protostuff, types = blackboxprotobuf.decode_message(record.data, typess)
#print(protostuff)

activity = (protostuff['1']['1'])
timestart = (timestampsconv(protostuff['2']))
timestart = convert_utc_human_to_timezone(timestart, timezone_offset)

timeend = (timestampsconv(protostuff['3']))
timeend = convert_utc_human_to_timezone(timeend, timezone_offset)

timewrite = (timestampsconv(protostuff['8']))
timewrite = convert_utc_human_to_timezone(timewrite, timezone_offset)

actionguid = (protostuff['5'])
bundleid = (protostuff['4']['3'])
if protostuff.get('7', '') != '':
if isinstance(protostuff['7'], list):
transition = (protostuff['7'][0]['2']['3'])
else:
transition = (protostuff['7']['2']['3'])
else:
transition = ''


data_list.append((timestart, timeend, timewrite, activity, bundleid, transition, actionguid, filename))

if len(data_list) > 0:

description = ''
report = ArtifactHtmlReport(f'Biome DKEvent AppInFocus')
report.start_artifact_report(report_folder, f'Biome DKEvent AppInFocus', description)
report.add_script()
data_headers = ('Time Start','Time End','Time Write','Activity','Bundle ID','Transition','Action GUID', 'Filename')
report.write_artifact_data_table(data_headers, data_list, report_file)
report.end_artifact_report()

tsvname = f'Biome DKEvent AppInFocus'
tsv(report_folder, data_headers, data_list, tsvname)

tlactivity = f'Biome DKEvent AppInFocus'
timeline(report_folder, tlactivity, data_list, data_headers)

else:
logfunc(f'No data available for Biome AppInFocus')


__artifacts__ = {
"biomeDKInFocus": (
"Biome in Focus",
('*/biome/streams/restricted/_DKEvent.App.InFocus/local/*'),
get_biomeDKInfocus)
}
196 changes: 43 additions & 153 deletions scripts/artifacts/biomeInfocus.py
Original file line number Diff line number Diff line change
@@ -1,75 +1,16 @@
import os
import struct
import blackboxprotobuf
from datetime import datetime, timezone
from time import mktime
from io import StringIO
from io import BytesIO
from scripts.artifact_report import ArtifactHtmlReport
from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly, convert_utc_human_to_timezone, timestampsconv

def utf8_in_extended_ascii(input_string, *, raise_on_unexpected=False):
"""Returns a tuple of bool (whether mis-encoded utf-8 is present) and str (the converted string)"""
output = [] # individual characters, join at the end
is_in_multibyte = False # True if we're currently inside a utf-8 multibyte character
multibytes_expected = 0
multibyte_buffer = []
mis_encoded_utf8_present = False

def handle_bad_data(index, character):
if not raise_on_unexpected: # not raising, so we dump the buffer into output and append this character
output.extend(multibyte_buffer)
multibyte_buffer.clear()
output.append(character)
nonlocal is_in_multibyte
is_in_multibyte = False
nonlocal multibytes_expected
multibytes_expected = 0
else:
raise ValueError(f"Expected multibyte continuation at index: {index}")

for idx, c in enumerate(input_string):
code_point = ord(c)
if code_point <= 0x7f or code_point > 0xf4: # ASCII Range data or higher than you get for mis-encoded utf-8:
if not is_in_multibyte:
output.append(c) # not in a multibyte, valid ascii-range data, so we append
else:
handle_bad_data(idx, c)
else: # potentially utf-8
if (code_point & 0xc0) == 0x80: # continuation byte
if is_in_multibyte:
multibyte_buffer.append(c)
else:
handle_bad_data(idx, c)
else: # start-byte
if not is_in_multibyte:
assert multibytes_expected == 0
assert len(multibyte_buffer) == 0
while (code_point & 0x80) != 0:
multibytes_expected += 1
code_point <<= 1
multibyte_buffer.append(c)
is_in_multibyte = True
else:
handle_bad_data(idx, c)

if is_in_multibyte and len(multibyte_buffer) == multibytes_expected: # output utf-8 character if complete
utf_8_character = bytes(ord(x) for x in multibyte_buffer).decode("utf-8")
output.append(utf_8_character)
multibyte_buffer.clear()
is_in_multibyte = False
multibytes_expected = 0
mis_encoded_utf8_present = True

if multibyte_buffer: # if we have left-over data
handle_bad_data(len(input_string), "")

return mis_encoded_utf8_present, "".join(output)
from ccl_segb import ccl_segb
from ccl_segb.ccl_segb_common import EntryState

def get_biomeInfocus(files_found, report_folder, seeker, wrap_text, timezone_offset):

typess = {'1': {'type': 'message', 'message_typedef': {'1': {'type': 'str', 'name': ''}, '2': {'type': 'message', 'message_typedef': {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}}, 'name': ''}}, 'name': ''},
'2': {'type': 'double', 'name': ''}, '3': {'type': 'double', 'name': ''}, '4': {'type': 'message', 'message_typedef': {'1': {'type': 'message', 'message_typedef': {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}}, 'name': ''}, '3': {'type': 'str', 'name': ''}}, 'name': ''}, '5': {'type': 'str', 'name': ''}, '7': {'type': 'message', 'message_typedef': {'1': {'type': 'message', 'message_typedef': {}, 'name': ''}, '2': {'type': 'message', 'message_typedef': {'1': {'type': 'message', 'message_typedef': {'1': {'type': 'int', 'name': ''}, '2': {'type': 'int', 'name': ''}}, 'name': ''}, '3': {'type': 'str', 'name': ''}}, 'name': ''}, '3': {'type': 'int', 'name': ''}}, 'name': ''}, '8': {'type': 'double', 'name': ''}, '10': {'type': 'int', 'name': ''}}

typess = {'10': {'name': '', 'type': 'str'}, '2': {'name': '', 'type': 'int'}, '3': {'name': '', 'type': 'int'},
'4': {'name': '', 'type': 'double'}, '6': {'name': '', 'type': 'str'}, '9': {'name': '', 'type': 'str'}}

data_list = []

for file_found in files_found:
file_found = str(file_found)
Expand All @@ -80,99 +21,48 @@ def get_biomeInfocus(files_found, report_folder, seeker, wrap_text, timezone_off
if 'tombstone' in file_found:
continue
else:
pass
report_file = os.path.dirname(file_found)
else:
continue

with open(file_found, 'rb') as file:
data = file.read()

data_list = []
headerloc = data.index(b'SEGB')
#print(headerloc)

b = data
ab = BytesIO(b)
ab.seek(headerloc)
ab.read(4) #Main header
#print('---- Start of Notifications ----')

while True:
#print('----')
sizeofnotificatoninhex = (ab.read(4))
try:
sizeofnotificaton = (struct.unpack_from("<i",sizeofnotificatoninhex)[0])
except:
break
if sizeofnotificaton == 0:
break

ignore1 = ab.read(28)

protostuff = ab.read(sizeofnotificaton)
checkforempty = BytesIO(protostuff)
check = checkforempty.read(1)
if check == b'\x00':
pass
else:
protostuff, types = blackboxprotobuf.decode_message(protostuff,typess)
#print(protostuff)

activity = (protostuff['1']['1'])
timestart = (timestampsconv(protostuff['2']))



for record in ccl_segb.read_segb_file(file_found):
if record.state == EntryState.Written:
protostuff, types = blackboxprotobuf.decode_message(record.data, typess)

bundleid = (protostuff['6'])
timestart = (timestampsconv(protostuff['4']))
timestart = convert_utc_human_to_timezone(timestart, timezone_offset)

timeend = (timestampsconv(protostuff['3']))
timeend = convert_utc_human_to_timezone(timeend, timezone_offset)

timewrite = (timestampsconv(protostuff['8']))
timewrite = convert_utc_human_to_timezone(timewrite, timezone_offset)

actionguid = (protostuff['5'])
bundleid = (protostuff['4']['3'])
if protostuff.get('7', '') != '':
if isinstance(protostuff['7'], list):
transition = (protostuff['7'][0]['2']['3'])
else:
transition = (protostuff['7']['2']['3'])
else:
transition = ''


data_list.append((timestart, timeend, timewrite, activity, bundleid, transition, actionguid))

modresult = (sizeofnotificaton % 8)
resultante = 8 - modresult

if modresult == 0:
pass
else:
ab.read(resultante)
#print("--------")

if len(data_list) > 0:

description = ''
report = ArtifactHtmlReport(f'Biome AppInFocus')
report.start_artifact_report(report_folder, f'Biome AppInFocus - {filename}', description)
report.add_script()
data_headers = ('Time Start','Time End','Time Write','Activity','Bundle ID','Transition','Action GUID')
report.write_artifact_data_table(data_headers, data_list, file_found)
report.end_artifact_report()

tsvname = f'Biome AppInFocus- {filename}'
tsv(report_folder, data_headers, data_list, tsvname) # TODO: _csv.Error: need to escape, but no escapechar set

tlactivity = f'Biome AppInFocus - {filename}'
timeline(report_folder, tlactivity, data_list, data_headers)

else:
logfunc(f'No data available for Biome AppInFocus')


foreground = ('Foreground' if protostuff['3'] == 1 else 'Background')

data_list.append((timestart, bundleid, foreground, filename))


if len(data_list) > 0:

description = ''
report = ArtifactHtmlReport(f'Biome AppInFocus')
report.start_artifact_report(report_folder, f'Biome AppInFocus', description)
report.add_script()
data_headers = ('Time','Bundle ID','Action', 'Filename')
report.write_artifact_data_table(data_headers, data_list, report_file)
report.end_artifact_report()

tsvname = f'Biome AppInFocus'
tsv(report_folder, data_headers, data_list, tsvname)

tlactivity = f'Biome AppInFocus'
timeline(report_folder, tlactivity, data_list, data_headers)

else:
logfunc(f'No data available for Biome AppInFocus')


__artifacts__ = {
"biomeInFocus": (
"Biome in Focus",
('*/biome/streams/restricted/_DKEvent.App.InFocus/local/*','*/biome/streams/restricted/App.InFocus/local/*'),
('*/biome/streams/restricted/App.InFocus/local/*'),
get_biomeInfocus)
}