Alya add alerts table to sqlite db with details of slips alerts and timewindows #392

Merged 5 commits on Aug 30, 2023
15 changes: 13 additions & 2 deletions slips_files/core/database/database_manager.py
@@ -1,6 +1,6 @@
from slips_files.core.database.redis_db.database import RedisDB
from slips_files.core.database.sqlite_db.database import SQLiteDB

from slips_files.common.config_parser import ConfigParser

class DBManager:
"""
@@ -37,11 +37,16 @@ def __new__(
# we just want to connect to redis to get the PIDs
cls.sqlite = None
if start_sqlite:
cls.sqlite = SQLiteDB(output_dir)
cls.sqlite = SQLiteDB(output_dir, output_queue)

cls.rdb = RedisDB(redis_port, output_queue, **kwargs)
return cls._instances[redis_port]

@classmethod
def read_configuration(cls):
conf = ConfigParser()
cls.width = conf.get_tw_width_as_float()

def get_sqlite_db_path(self) -> str:
return self.sqlite.get_db_path()

@@ -857,6 +862,12 @@ def get_commit(self, *args, **kwargs):
def get_branch(self, *args, **kwargs):
return self.rdb.get_branch(*args, **kwargs)

def add_alert(self, alert: dict):
twid_starttime: float = self.rdb.getTimeTW(alert['profileid'], alert['twid'])
twid_endtime: float = twid_starttime + RedisDB.width
alert.update({'tw_start': twid_starttime, 'tw_end': twid_endtime})
return self.sqlite.add_alert(alert)
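As a usage note, here is a minimal sketch of what this method receives; the key names follow the diff of evidenceProcess.py further down, while the concrete values and the db handle are illustrative only:

# Illustrative only: the evidence process builds a dict like this and hands it to DBManager.add_alert()
alert = {
    'alert_ID': 'profile_10.0.2.15_timewindow1_42',   # hypothetical ID, formatted as profileid_twid_evidenceID
    'profileid': 'profile_10.0.2.15',
    'twid': 'timewindow1',
    'time_detected': '1693400000.0',                   # unix timestamp, as produced by utils.convert_format
    'label': 'malicious',
}
# add_alert() looks up the timewindow start in redis and derives its end from the configured
# width, e.g. tw_start = 1693400000.0 with width = 3600.0 gives tw_end = 1693403600.0,
# then stores the row through SQLiteDB.add_alert().
db.add_alert(alert)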

def close(self, *args, **kwargs):
self.rdb.r.close()
self.rdb.rcache.close()
57 changes: 49 additions & 8 deletions slips_files/core/database/sqlite_db/database.py
@@ -8,13 +8,16 @@

class SQLiteDB():
"""Stores all the flows slips reads and handles labeling them"""
name = "SQLiteDB"
_obj = None
# used to lock each call to commit()
cursor_lock = Lock()
trial = 0

def __new__(cls, output_dir):
def __new__(cls, output_dir, output_queue):
# To treat the db as a singleton
if cls._obj is None or not isinstance(cls._obj, cls):
cls.output_queue = output_queue
cls._obj = super(SQLiteDB, cls).__new__(SQLiteDB)
cls._flows_db = os.path.join(output_dir, 'flows.sqlite')
cls._init_db()
@@ -29,7 +32,8 @@ def init_tables(cls):
"""creates the tables we're gonna use"""
table_schema = {
'flows': "uid TEXT PRIMARY KEY, flow TEXT, label TEXT, profileid TEXT, twid TEXT, aid TEXT",
'altflows': "uid TEXT PRIMARY KEY, flow TEXT, label TEXT, profileid TEXT, twid TEXT, flow_type TEXT"
'altflows': "uid TEXT PRIMARY KEY, flow TEXT, label TEXT, profileid TEXT, twid TEXT, flow_type TEXT",
'alerts': 'alert_id TEXT PRIMARY KEY, alert_time TEXT, ip_alerted TEXT, timewindow TEXT, tw_start TEXT, tw_end TEXT, label TEXT'
}
for table_name, schema in table_schema.items():
cls.create_table(table_name, schema)
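For reference, assuming create_table() expands each schema string into a standard CREATE TABLE IF NOT EXISTS statement (its query construction is not shown in this hunk), the new alerts table would be created roughly like this; the database path is hypothetical:

import sqlite3

# Sketch of the statement init_tables() would end up issuing for the new table.
conn = sqlite3.connect('flows.sqlite')
conn.execute(
    'CREATE TABLE IF NOT EXISTS alerts ('
    'alert_id TEXT PRIMARY KEY, alert_time TEXT, ip_alerted TEXT, '
    'timewindow TEXT, tw_start TEXT, tw_end TEXT, label TEXT)'
)
conn.commit()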
@@ -47,6 +51,27 @@ def create_table(cls, table_name, schema):
cls.cursor.execute(query)
cls.conn.commit()

def print(self, text, verbose=1, debug=0):
"""
Print text through the output queue of slips.
Slips then decides how, when and where to print it, taking all processes into account.
:param verbose:
0 - don't print
1 - basic operation/proof of work
2 - log I/O operations and filenames
3 - log database/profile/timewindow changes
:param debug:
0 - don't print
1 - print exceptions
2 - unsupported and unhandled types (cases that may cause errors)
3 - red warnings that need examination - developer warnings
:param text: text to print. Can include format like 'Test {}'.format('here')
"""
levels = f'{verbose}{debug}'
try:
self.output_queue.put(f'{levels}|{self.name}|{text}')
except AttributeError:
pass
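As a quick illustration of the level encoding above, a call with verbose=2 and debug=0 queues a message prefixed with '20'; the message text here is illustrative only:

# Inside any SQLiteDB method:
self.print('storing a new alert in the alerts table', verbose=2, debug=0)
# puts '20|SQLiteDB|storing a new alert in the alerts table' on the output queue,
# and the output process of slips later decides whether and where to print it.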

def get_db_path(self) -> str:
"""
@@ -246,6 +271,24 @@ def add_altflow(
parameters,
)

def add_alert(self, alert: dict):
"""
adds an alert to the alerts table
the alert param should contain: alert_ID, profileid, twid, tw_start, tw_end, label, time_detected
"""
# 'alerts': 'alert_id TEXT PRIMARY KEY, alert_time TEXT, ip_alerted TEXT, timewindow TEXT, tw_start TEXT, tw_end TEXT, label TEXT'
self.execute(
'INSERT OR REPLACE INTO alerts (alert_id, ip_alerted, timewindow, tw_start, tw_end, label, alert_time) '
'VALUES (?, ?, ?, ?, ?, ?, ?);',
(alert['alert_ID'],
alert['profileid'].split()[-1],
alert['twid'],
alert['tw_start'],
alert['tw_end'],
alert['label'],
alert['time_detected'])
)
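To show what the table ends up holding, here is a hedged sketch of reading the stored alerts back with plain sqlite3; the column order matches the INSERT above, and the database path (output directory) is hypothetical:

import sqlite3

conn = sqlite3.connect('output/flows.sqlite')   # hypothetical output directory
query = ('SELECT alert_id, ip_alerted, timewindow, tw_start, tw_end, label, alert_time '
         'FROM alerts')
for alert_id, ip_alerted, timewindow, tw_start, tw_end, label, alert_time in conn.execute(query):
    print(f'{alert_time}: {ip_alerted} alerted in {timewindow} '
          f'({tw_start} - {tw_end}) label={label}')
conn.close()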



def insert(self, table_name, values):
@@ -314,7 +357,6 @@ def execute(self, query, params=None):
since sqlite is terrible with multi-process applications
this should be used instead of all calls to commit() and execute()
"""

try:
self.cursor_lock.acquire(True)
#start a transaction
@@ -324,31 +366,30 @@
self.cursor.execute(query)
else:
self.cursor.execute(query, params)

self.conn.commit()

self.cursor_lock.release()
# counter for the number of times we tried executing a tx and failed
self.trial = 0

except sqlite3.Error as e:
self.cursor_lock.release()
if self.trial >= 2:
# tried 2 times to exec a query and it's still failing
self.trial = 0
# discard query
self.conn.rollback()
print(f"Error executing query: {query} - {e}. Query discarded")
self.print(f"Error executing query: {query} - {e}. Query discarded", 0, 1)

elif "database is locked" in str(e):
# keep track of failed trials
self.trial += 1
self.cursor_lock.release()

# Retry after a short delay
sleep(0.1)
self.execute(query, params=params)
else:
self.conn.rollback()
# An error occurred during execution
self.conn.rollback()
# print(f"Re-trying to execute query ({query}). reason: {e}")
# keep track of failed trials
self.trial += 1
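In practice this wrapper replaces direct cursor.execute()/conn.commit() pairs so every write goes through the lock and the 'database is locked' retry; a minimal caller sketch with hypothetical values, using the flows schema defined above:

# Instead of calling self.cursor.execute(...) and self.conn.commit() directly:
self.execute(
    'INSERT OR REPLACE INTO flows (uid, flow, label, profileid, twid, aid) '
    'VALUES (?, ?, ?, ?, ?, ?);',
    ('uid1', '{}', 'benign', 'profile_10.0.2.15', 'timewindow1', 'aid1'),
)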
43 changes: 29 additions & 14 deletions slips_files/core/evidenceProcess.py
@@ -536,6 +536,33 @@ def label_flows_causing_alert(self):
uids: list = self.db.get_flows_causing_evidence(evidence_id)
self.db.set_flow_label(uids, 'malicious')

def handle_new_alert(self, alert_ID: str, tw_evidence: dict):
"""
saves alert details in the db and informs exporting modules about it
"""
profile, srcip, twid, _ = alert_ID.split('_')
profileid = f'{profile}_{srcip}'
self.db.set_evidence_causing_alert(
profileid,
twid,
alert_ID,
self.IDs_causing_an_alert
)
alert_details = {
'alert_ID': alert_ID,
'profileid': profileid,
'twid': twid,
}
self.db.publish('new_alert', json.dumps(alert_details))
#store the alerts in the alerts table
alert_details.update(
{'time_detected': utils.convert_format(datetime.now(), 'unixtimestamp'),
'label': 'malicious'})
self.db.add_alert(alert_details)
self.label_flows_causing_alert()
self.send_to_exporting_module(tw_evidence)
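Since handle_new_alert() reconstructs the profile from the alert ID, here is an illustrative example of the expected format, built in main() below as f'{profileid}_{twid}_{ID}' and assuming the evidence ID itself contains no underscores:

# Hypothetical alert ID and how handle_new_alert() splits it:
alert_ID = 'profile_10.0.2.15_timewindow3_42'
profile, srcip, twid, _ = alert_ID.split('_')   # ['profile', '10.0.2.15', 'timewindow3', '42']
profileid = f'{profile}_{srcip}'                # 'profile_10.0.2.15'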


def main(self):
while not self.should_stop():
if msg := self.get_msg('evidence_added'):
@@ -646,20 +673,8 @@ def main(self):
# store the alert in our database
# the alert ID is profileid_twid + the ID of the last evidence causing this alert
alert_ID = f'{profileid}_{twid}_{ID}'
self.db.set_evidence_causing_alert(
profileid,
twid,
alert_ID,
self.IDs_causing_an_alert
)
to_send = {
'alert_ID': alert_ID,
'profileid': profileid,
'twid': twid,
}
self.db.publish('new_alert', json.dumps(to_send))
self.label_flows_causing_alert()
self.send_to_exporting_module(tw_evidence)

self.handle_new_alert(alert_ID, tw_evidence)

# print the alert
alert_to_print = (