From 3ac9b8d400a650d33cdf951f22d611ffd05dd982 Mon Sep 17 00:00:00 2001 From: alya Date: Thu, 18 Jan 2024 14:48:15 +0200 Subject: [PATCH 1/6] db: refactor get_timewindow() --- managers/process_manager.py | 6 +- slips_files/core/database/database_manager.py | 4 +- .../core/database/redis_db/profile_handler.py | 164 ++++++++++-------- tests/test_database.py | 6 +- 4 files changed, 102 insertions(+), 78 deletions(-) diff --git a/managers/process_manager.py b/managers/process_manager.py index a0e633e92..65d3f2470 100644 --- a/managers/process_manager.py +++ b/managers/process_manager.py @@ -562,9 +562,11 @@ def shutdown_gracefully(self): else: flows_count: int = self.main.db.get_flows_count() - self.main.print(f"Total flows read (without altflows): {flows_count}", log_to_logfiles_only=True) + self.main.print(f"Total flows read (without altflows): " + f"{flows_count}", log_to_logfiles_only=True) - hitlist: Tuple[List[Process], List[Process]] = self.get_hitlist_in_order() + hitlist: Tuple[List[Process], List[Process]] + hitlist = self.get_hitlist_in_order() to_kill_first: List[Process] = hitlist[0] to_kill_last: List[Process] = hitlist[1] self.termination_event.set() diff --git a/slips_files/core/database/database_manager.py b/slips_files/core/database/database_manager.py index 0eb2133fb..8a2d99b6e 100644 --- a/slips_files/core/database/database_manager.py +++ b/slips_files/core/database/database_manager.py @@ -661,8 +661,8 @@ def getTWofTime(self, *args, **kwargs): def addNewOlderTW(self, *args, **kwargs): return self.rdb.addNewOlderTW(*args, **kwargs) - def addNewTW(self, *args, **kwargs): - return self.rdb.addNewTW(*args, **kwargs) + def add_new_tw(self, *args, **kwargs): + return self.rdb.add_new_tw(*args, **kwargs) def getTimeTW(self, *args, **kwargs): return self.rdb.getTimeTW(*args, **kwargs) diff --git a/slips_files/core/database/redis_db/profile_handler.py b/slips_files/core/database/redis_db/profile_handler.py index 44d239950..3f747bf2a 100644 --- a/slips_files/core/database/redis_db/profile_handler.py +++ b/slips_files/core/database/redis_db/profile_handler.py @@ -2,7 +2,7 @@ import redis import time import json -from typing import Tuple, Union, Dict +from typing import Tuple, Union, Dict, Optional import traceback import ipaddress import sys @@ -78,25 +78,33 @@ def set_dhcp_flow(self, profileid, twid, requested_addr, uid): def get_timewindow(self, flowtime, profileid): """ - This function should get the id of the TW in the database where the flow belong. - If the TW is not there, we create as many tw as necessary in the future or past until we get the correct TW for this flow. - - We use this function to avoid retrieving all the data from the DB for the complete profile. We use a separate table for the TW per profile. + This function returns the TW in the database where the flow belongs. + If the TW is not there, we create as many tw as necessary in the future + or past until we get the correct TW for this flow. + - We use this function to avoid retrieving all the data from the DB + for the complete profile. + We use a separate table for the TW per profile. -- Returns the time window id THIS IS NOT WORKING: - - The empty profiles in the middle are not being created!!! - - The Dtp ips are stored in the first time win + - The empty tws in the middle are not being created!!! + - The Dtp ips are stored in the first tw """ try: - # First check if we are not in the last TW. Since this will be the majority of cases - try: - [(lasttwid, lasttw_start_time)] = self.get_last_twid_of_profile(profileid) + # First check if we are not in the last TW. Since this will be + # the majority of cases + last_twid: Optional[Tuple[str, float]] + last_twid = self.get_last_twid_of_profile(profileid) + if last_twid: + lasttwid: str + lasttw_start_time: float + lasttwid, lasttw_start_time = last_twid lasttw_start_time = float(lasttw_start_time) lasttw_end_time = lasttw_start_time + self.width flowtime = float(flowtime) self.print( - f'The last TW id for profile {profileid} was {lasttwid}. Start:{lasttw_start_time}. End: {lasttw_end_time}', - 3, - 0, + f'The last TW id for profile {profileid} was {lasttwid}. ' + f'Start:{lasttw_start_time}. End: {lasttw_end_time}', + 3, 0, ) # There was a last TW, so check if the current flow belongs here. if ( @@ -104,58 +112,63 @@ def get_timewindow(self, flowtime, profileid): and lasttw_start_time <= flowtime ): self.print( - f'The flow ({flowtime}) is on the last time window ({lasttw_end_time})', - 3, - 0, + f'The flow ({flowtime}) is on the last time window' + f' ({lasttw_end_time})', + 3, 0, ) twid = lasttwid elif lasttw_end_time <= flowtime: # The flow was not in the last TW, its NEWER than it self.print( - f'The flow ({flowtime}) is NOT on the last time window ({lasttw_end_time}). Its newer', - 3, - 0, + f'The flow ({flowtime}) is NOT on the last time ' + f'window ({lasttw_end_time}). Its newer', + 3, 0, ) amount_of_new_tw = int( (flowtime - lasttw_end_time) / self.width ) self.print( - f'We have to create {amount_of_new_tw} empty TWs in the middle.', - 3, - 0, + f'We have to create {amount_of_new_tw}' + f' empty TWs in the middle.', 3, 0, ) temp_end = lasttw_end_time for _ in range(amount_of_new_tw + 1): new_start = temp_end - twid = self.addNewTW(profileid, new_start) - self.print(f'Creating the TW id {twid}. Start: {new_start}.', 3, 0) + twid = self.add_new_tw(profileid, new_start) + self.print(f'Creating the TW id {twid}. ' + f'Start: {new_start}.', 3, 0) temp_end = new_start + self.width else: # The flow was not in the last TW, its OLDER that it self.print( - f'The flow ({flowtime}) is NOT on the last time window ({lasttw_end_time}). Its older', - 3, - 0, + f'The flow ({flowtime}) is NOT on the last time ' + f'window ({lasttw_end_time}). Its older', + 3, 0, ) if data := self.getTWofTime(profileid, flowtime): # We found a TW where this flow belongs to (twid, tw_start_time) = data return twid else: - # There was no TW that included the time of this flow, so create them in the past + # There was no TW that included the time of this + # flow, so create them in the past # How many new TW we need in the past? - # amount_of_new_tw is the total amount of tw we should have under the new situation + # amount_of_new_tw is the total amount of tw we + # should have under the new situation amount_of_new_tw = int( (lasttw_end_time - flowtime) / self.width ) - # amount_of_current_tw is the real amount of tw we have now + # amount_of_current_tw is the real amount of tw we + # have now amount_of_current_tw = ( self.get_number_of_tws_in_profile(profileid) ) - # diff is the new ones we should add in the past. (Yes, we could have computed this differently) + # diff is the new ones we should add in the past. + # (Yes, we could have computed this differently) diff = amount_of_new_tw - amount_of_current_tw - self.print(f'We need to create {diff + 1} TW before the first', 3, 0) + self.print(f'We need to create {diff + 1} ' + f'TW before the first', 3, 0) # Get the first TW [ (firsttwid, firsttw_start_time) @@ -167,15 +180,19 @@ def get_timewindow(self, flowtime, profileid): new_start = temp_start # The method to add an older TW is the same as # to add a new one, just the starttime changes - twid = self.addNewOlderTW( + twid: str = self.addNewOlderTW( profileid, new_start ) - self.print(f'Creating the new older TW id {twid}. Start: {new_start}.', 3, 0) + self.print(f'Creating the new older TW id {twid}.' + f' Start: {new_start}.', 3, 0) temp_start = new_start - self.width - except ValueError: + else: # There is no last tw. So create the first TW - # If the option for only-one-tw was selected, we should create the TW at least 100 years before the flowtime, to cover for - # 'flows in the past'. Which means we should cover for any flow that is coming later with time before the first flow + # If the option for only-one-tw was selected, we should + # create the TW at least 100 years before the flowtime, + # to cover for 'flows in the past'. Which means we should + # cover for any flow that is coming later with time before the + # first flow if self.width == 9999999999: # Seconds in 1 year = 31536000 startoftw = float(flowtime - (31536000 * 100)) @@ -183,11 +200,12 @@ def get_timewindow(self, flowtime, profileid): startoftw = flowtime # Add this TW, of this profile, to the DB - twid = self.addNewTW(profileid, startoftw) - # self.print("First TW ({}) created for profile {}.".format(twid, profileid), 0, 1) + twid: str = self.add_new_tw(profileid, startoftw) + return twid except Exception as e: self.print('Error in get_timewindow().', 0, 1) + self.print(traceback.print_exc(), 0, 1) self.print(e, 0, 1) def add_out_http( @@ -1248,65 +1266,69 @@ def addNewOlderTW(self, profileid, startoftw): Return the id of the timewindow just created """ # Get the first twid and obtain the new tw id - try: - (firstid, firstid_time) = self.getFirstTWforProfile(profileid)[ - 0 - ] - # We have a first id - # Decrement it!! - twid = 'timewindow' + str( - int(firstid.split('timewindow')[1]) - 1 - ) - except IndexError: - # Very weird error, since the first TW MUST exist. What are we doing here? - pass + (firstid, firstid_time) = self.getFirstTWforProfile(profileid)[ + 0 + ] + # We have a first id + # Decrement it!! + twid = 'timewindow' + str( + int(firstid.split('timewindow')[1]) - 1 + ) # Add the new TW to the index of TW - data = {str(twid): float(startoftw)} - self.r.zadd(f'tws{profileid}', data) + timewindows: Dict[str, float] = {twid: float(startoftw)} + self.r.zadd(f'tws{profileid}', timewindows) self.print(f'Created and added to DB the new older ' f'TW with id {twid}. Time: {startoftw} ' ,0,4) - # The creation of a TW now does not imply that it was modified. You need to put data to mark is at modified + # The creation of a TW now does not imply that it was modified. + # You need to put data to mark is at modified return twid except redis.exceptions.ResponseError as e: self.print('error in addNewOlderTW in database.py', 0, 1) self.print(type(e), 0, 1) self.print(e, 0, 1) - def addNewTW(self, profileid, startoftw): + def add_new_tw(self, profileid, startoftw) -> str: try: """ - Creates or adds a new timewindow to the list of tw for the given profile + Creates or adds a new timewindow to the list of tw for the + given profile Add the twid to the ordered set of a given profile - Return the id of the timewindow just created - We should not mark the TW as modified here, since there is still no data on it, and it may remain without data. + Returns the id of the timewindow just created """ # Get the last twid and obtain the new tw id - try: - (lastid, lastid_time) = self.get_last_twid_of_profile(profileid)[0] - # We have a last id - # Increment it - twid = 'timewindow' + str( - int(lastid.split('timewindow')[1]) + 1 - ) - except IndexError: + last_twid: Optional[Tuple[str, float]] + last_twid = self.get_last_twid_of_profile(profileid) + if last_twid: + last_tw: str + last_tw_starttime: float + last_tw, last_tw_starttime = last_twid + last_tw_number = int(last_tw.split("timewindow")[1]) + # Increment the last timewindow + twid = f'timewindow{last_tw_number + 1}' + else: # There is no first TW, create it twid = 'timewindow1' + # Add the new TW to the index of TW - data = {twid: float(startoftw)} - self.r.zadd(f'tws{profileid}', data) - self.print(f'Created and added to DB for profile ' - f'{profileid} on TW with id {twid}. Time: {startoftw} ', 0, 4) + self.r.zadd(f'tws{profileid}', {twid: float(startoftw)}) + self.print(f'Created and added to DB for ' + f'{profileid}: a new tw: {twid}. ' + f' with starttime : {startoftw} ', + 0, 4) - # The creation of a TW now does not imply that it was modified. You need to put data to mark is at modified + # The creation of a TW now does not imply that it was modified. + # You need to put data to mark is at modified. # When a new TW is created for this profile, - # change the threat level of the profile to 0(info) and confidence to 0.05 + # change the threat level of the profile to 0(info) + # and confidence to 0.05 self.update_threat_level(profileid, 'info', 0.5) return twid except redis.exceptions.ResponseError as e: self.print('Error in addNewTW', 0, 1) + self.print(traceback.print_exc(), 0, 1) self.print(e, 0, 1) def getTimeTW(self, profileid, twid): diff --git a/tests/test_database.py b/tests/test_database.py index 908a783c1..2e8fe3b99 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -55,9 +55,9 @@ def test_timewindows(): # add a profile db.addProfile(profileid, '00:00', '1') # add a tw to that profile (first tw) - db.addNewTW(profileid, 0.0) + db.add_new_tw(profileid, 0.0) # add a new tw (last tw) - db.addNewTW(profileid, 5.0) + db.add_new_tw(profileid, 5.0) assert db.getFirstTWforProfile(profileid) == [('timewindow1', 0.0)] assert db.get_last_twid_of_profile(profileid) == [('timewindow2', 5.0)] @@ -71,7 +71,7 @@ def test_add_ips(): # add a profile db.addProfile(profileid, '00:00', '1') # add a tw to that profile - db.addNewTW(profileid, 0.0) + db.add_new_tw(profileid, 0.0) columns = { 'dport': 80, 'sport': 80, From 8fbea449c2e0dc1890bc15cd8113cf301972db9b Mon Sep 17 00:00:00 2001 From: alya Date: Thu, 18 Jan 2024 14:48:30 +0200 Subject: [PATCH 2/6] db: refactor get_last_twid_of_profile() --- .../core/database/redis_db/profile_handler.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/slips_files/core/database/redis_db/profile_handler.py b/slips_files/core/database/redis_db/profile_handler.py index 3f747bf2a..c2c6209f9 100644 --- a/slips_files/core/database/redis_db/profile_handler.py +++ b/slips_files/core/database/redis_db/profile_handler.py @@ -1212,13 +1212,17 @@ def get_profiles_len(self) -> int: profiles_n = self.r.scard('profiles') return 0 if not profiles_n else int(profiles_n) - def get_last_twid_of_profile(self, profileid): - """Return the last TW id and the starttime of the given profile id""" - return ( - self.r.zrange(f'tws{profileid}', -1, -1, withscores=True) - if profileid - else False - ) + def get_last_twid_of_profile(self, profileid: str) -> Tuple[str, float]: + """ + Returns the last TW id and the starttime of the given profile id + """ + if profileid: + res = self.r.zrange( + f'tws{profileid}', -1, -1, withscores=True) + if res: + twid, starttime = res[0] + return twid, starttime + def getFirstTWforProfile(self, profileid): """Return the first TW id and the time for the given profile id""" From 61c1fdfdd3b2ec5689864d9a9b48c304e3c79c3b Mon Sep 17 00:00:00 2001 From: alya Date: Thu, 18 Jan 2024 15:54:14 +0200 Subject: [PATCH 3/6] update get_last_twid_of_profile() unit test --- slips_files/core/database/redis_db/profile_handler.py | 6 ++++-- tests/test_database.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/slips_files/core/database/redis_db/profile_handler.py b/slips_files/core/database/redis_db/profile_handler.py index c2c6209f9..08b829d2a 100644 --- a/slips_files/core/database/redis_db/profile_handler.py +++ b/slips_files/core/database/redis_db/profile_handler.py @@ -253,8 +253,10 @@ def add_out_http( http_flow.pop('flow', None) http_flow['uid'] = flow.uid - # Check if the host domain AND the url is detected by the threat intelligence. - # not all flows have a host value so don't send empty hosts to ti module. + # Check if the host domain AND the url is detected by the threat + # intelligence. + # not all flows have a host value so don't send empty hosts to ti + # module. if len(flow.host) > 2: self.give_threat_intelligence(profileid, twid, diff --git a/tests/test_database.py b/tests/test_database.py index 2e8fe3b99..93f83348f 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -59,7 +59,7 @@ def test_timewindows(): # add a new tw (last tw) db.add_new_tw(profileid, 5.0) assert db.getFirstTWforProfile(profileid) == [('timewindow1', 0.0)] - assert db.get_last_twid_of_profile(profileid) == [('timewindow2', 5.0)] + assert db.get_last_twid_of_profile(profileid) == ('timewindow2', 5.0) def getSlipsInternalTime(): From c5edca9efcc0a345365bee5cdc544d47e8d35ba0 Mon Sep 17 00:00:00 2001 From: alya Date: Mon, 22 Jan 2024 15:46:06 +0200 Subject: [PATCH 4/6] use lowercase in method names in the db --- modules/leak_detector/leak_detector.py | 2 +- modules/p2ptrust/utils/go_director.py | 2 +- slips_files/core/database/database_manager.py | 12 ++++++------ tests/test_database.py | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/modules/leak_detector/leak_detector.py b/modules/leak_detector/leak_detector.py index 53a23b50f..7d970892b 100644 --- a/modules/leak_detector/leak_detector.py +++ b/modules/leak_detector/leak_detector.py @@ -198,7 +198,7 @@ def set_evidence_yara_match(self, info: dict): return # in which tw is this ts? - twid = self.db.getTWofTime(profileid, ts) + twid = self.db.get_tw_of_ts(profileid, ts) # convert ts to a readable format ts = utils.convert_format(ts, utils.alerts_format) if twid: diff --git a/modules/p2ptrust/utils/go_director.py b/modules/p2ptrust/utils/go_director.py index fcab41b67..f34ef57b7 100644 --- a/modules/p2ptrust/utils/go_director.py +++ b/modules/p2ptrust/utils/go_director.py @@ -479,7 +479,7 @@ def set_evidence_p2p_report(self, ip, reporter, score, confidence, timestamp, pr description = f'attacking another peer: {reporter_ip} ({reporter}). threat level: {threat_level} ' \ f'confidence: {confidence} {ip_identification}' # get the tw of this report time - if twid := self.db.getTWofTime(profileid_of_attacker, timestamp): + if twid := self.db.get_tw_of_ts(profileid_of_attacker, timestamp): twid = twid[0] else: # create a new twid for the attacker profile that has the diff --git a/slips_files/core/database/database_manager.py b/slips_files/core/database/database_manager.py index 8a2d99b6e..113105931 100644 --- a/slips_files/core/database/database_manager.py +++ b/slips_files/core/database/database_manager.py @@ -652,14 +652,14 @@ def get_profiles_len(self, *args, **kwargs): def get_last_twid_of_profile(self, *args, **kwargs): return self.rdb.get_last_twid_of_profile(*args, **kwargs) - def getFirstTWforProfile(self, *args, **kwargs): - return self.rdb.getFirstTWforProfile(*args, **kwargs) + def get_first_twid_for_profile(self, *args, **kwargs): + return self.rdb.get_first_twid_for_profile(*args, **kwargs) - def getTWofTime(self, *args, **kwargs): - return self.rdb.getTWofTime(*args, **kwargs) + def get_tw_of_ts(self, *args, **kwargs): + return self.rdb.get_tw_of_ts(*args, **kwargs) - def addNewOlderTW(self, *args, **kwargs): - return self.rdb.addNewOlderTW(*args, **kwargs) + def add_new_older_tw(self, *args, **kwargs): + return self.rdb.add_new_older_tw(*args, **kwargs) def add_new_tw(self, *args, **kwargs): return self.rdb.add_new_tw(*args, **kwargs) diff --git a/tests/test_database.py b/tests/test_database.py index 93f83348f..d78560e2e 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -58,7 +58,7 @@ def test_timewindows(): db.add_new_tw(profileid, 0.0) # add a new tw (last tw) db.add_new_tw(profileid, 5.0) - assert db.getFirstTWforProfile(profileid) == [('timewindow1', 0.0)] + assert db.get_first_twid_for_profile(profileid) == [('timewindow1', 0.0)] assert db.get_last_twid_of_profile(profileid) == ('timewindow2', 5.0) From e3b89b3c33b62afebe426cd34ccb3887eb6a3c20 Mon Sep 17 00:00:00 2001 From: alya Date: Tue, 23 Jan 2024 14:02:30 +0200 Subject: [PATCH 5/6] db: split get_timewindow() into smaller functions --- .../core/database/redis_db/profile_handler.py | 304 ++++++++++-------- 1 file changed, 168 insertions(+), 136 deletions(-) diff --git a/slips_files/core/database/redis_db/profile_handler.py b/slips_files/core/database/redis_db/profile_handler.py index 08b829d2a..11c208e61 100644 --- a/slips_files/core/database/redis_db/profile_handler.py +++ b/slips_files/core/database/redis_db/profile_handler.py @@ -2,9 +2,10 @@ import redis import time import json -from typing import Tuple, Union, Dict, Optional +from typing import Tuple, Union, Dict, Optional, List import traceback import ipaddress +from math import ceil import sys import validators from slips_files.common.abstracts.observer import IObservable @@ -76,6 +77,89 @@ def set_dhcp_flow(self, profileid, twid, requested_addr, uid): self.r.hset('DHCP_flows', f'{profileid}_{twid}', json.dumps(flow)) + def create_new_tws_as_needed(self, + flowtime: float, + lasttw_end_time: float, + profileid: str, + ) -> str: + """ + calculates how many new tws need to be added, adds them and returns + the id of the timewindow the given flowtime belongs to. + """ + # The flow was not in the last TW, its NEWER than it + amount_of_new_tw = int( + (flowtime - lasttw_end_time) / self.width + ) + self.print( + f'We have to create {amount_of_new_tw}' + f' empty TWs in the middle.', 3, 0, + ) + + tw_start_time = lasttw_end_time + for _ in range(amount_of_new_tw + 1): + # new_start = tw_start_time + twid: str = self.add_new_tw(profileid, tw_start_time) + self.print(f'Creating the TW id {twid}. ' + f'Start: {tw_start_time}.', 3, 0) + tw_start_time += self.width + return twid + + def create_old_tws_as_needed(self, + flowtime: float, + last_tw_end_time: float, + profileid: str) -> str: + """ + :param last_tw_end_time: this is the end time of the last + registered timewindow aka oldest seen tw in the given traffic + + """ + # There was no TW that included the time of this + # flow, so create them in the past + + # How many new TW we need in the past? + # amount_of_new_tw is the total amount of tw we + # should have under the new situation + amount_of_new_tw = ceil( + (last_tw_end_time - flowtime) / self.width + ) + # this is the real amount of tw we have now + amount_of_current_tw = ( + self.get_number_of_tws_in_profile(profileid) + ) + # diff is the new ones we should add in the past. + # (Yes, we could have computed this differently) + # this plus one is because we're using the last_tw_end_time to get + # the amount of new tws + # so we're adding the first timewindow with this +1 + diff = amount_of_new_tw - amount_of_current_tw + self.print(f'We need to create {diff} ' + f'TW before the first', 3, 0) + + # Get the first TW (it may be anegative tw like timewindow-4) + first_tw: Optional[Tuple[str, float]] + first_tw = self.get_first_twid_for_profile(profileid) + if first_tw: + first_tw_number = int(first_tw[0].replace("timewindow",'')) + first_tw_start_time: float = first_tw[1] + + # The start of the older TW should be the + # first - the width + tw_start_time = first_tw_start_time - self.width + # the number of the tw to add + new_tw_number = first_tw_number - 1 + for _ in range(diff): + twid: str = self.add_new_older_tw( + profileid, tw_start_time, new_tw_number + ) + self.print(f'Creating the new older TW id {twid}.' + f' Start: {tw_start_time}.', 3, 0) + tw_start_time -= self.width + new_tw_number -= 1 + + return twid + + + def get_timewindow(self, flowtime, profileid): """ This function returns the TW in the database where the flow belongs. @@ -96,112 +180,52 @@ def get_timewindow(self, flowtime, profileid): last_twid = self.get_last_twid_of_profile(profileid) if last_twid: lasttwid: str - lasttw_start_time: float - lasttwid, lasttw_start_time = last_twid - lasttw_start_time = float(lasttw_start_time) - lasttw_end_time = lasttw_start_time + self.width + last_tw_start_time: float + lasttwid, last_tw_start_time = last_twid + last_tw_start_time = float(last_tw_start_time) + last_tw_end_time: float = last_tw_start_time + self.width flowtime = float(flowtime) - self.print( - f'The last TW id for profile {profileid} was {lasttwid}. ' - f'Start:{lasttw_start_time}. End: {lasttw_end_time}', - 3, 0, - ) - # There was a last TW, so check if the current flow belongs here. - if ( - lasttw_end_time > flowtime - and lasttw_start_time <= flowtime - ): - self.print( - f'The flow ({flowtime}) is on the last time window' - f' ({lasttw_end_time})', - 3, 0, - ) - twid = lasttwid - elif lasttw_end_time <= flowtime: - # The flow was not in the last TW, its NEWER than it - self.print( - f'The flow ({flowtime}) is NOT on the last time ' - f'window ({lasttw_end_time}). Its newer', - 3, 0, - ) - amount_of_new_tw = int( - (flowtime - lasttw_end_time) / self.width - ) - self.print( - f'We have to create {amount_of_new_tw}' - f' empty TWs in the middle.', 3, 0, - ) - temp_end = lasttw_end_time - for _ in range(amount_of_new_tw + 1): - new_start = temp_end - twid = self.add_new_tw(profileid, new_start) - self.print(f'Creating the TW id {twid}. ' - f'Start: {new_start}.', 3, 0) - temp_end = new_start + self.width - else: - # The flow was not in the last TW, its OLDER that it - self.print( - f'The flow ({flowtime}) is NOT on the last time ' - f'window ({lasttw_end_time}). Its older', - 3, 0, + # since there was a last TW, so check if the current flow + # belongs here. + if last_tw_end_time > flowtime >= last_tw_start_time: + return lasttwid + + # does it belong to a newer tw? + if last_tw_end_time <= flowtime: + return self.create_new_tws_as_needed( + flowtime, + last_tw_end_time, + profileid ) - if data := self.getTWofTime(profileid, flowtime): - # We found a TW where this flow belongs to - (twid, tw_start_time) = data - return twid - else: - # There was no TW that included the time of this - # flow, so create them in the past - # How many new TW we need in the past? - # amount_of_new_tw is the total amount of tw we - # should have under the new situation - amount_of_new_tw = int( - (lasttw_end_time - flowtime) / self.width - ) - # amount_of_current_tw is the real amount of tw we - # have now - amount_of_current_tw = ( - self.get_number_of_tws_in_profile(profileid) - ) - # diff is the new ones we should add in the past. - # (Yes, we could have computed this differently) - diff = amount_of_new_tw - amount_of_current_tw - self.print(f'We need to create {diff + 1} ' - f'TW before the first', 3, 0) - # Get the first TW - [ - (firsttwid, firsttw_start_time) - ] = self.getFirstTWforProfile(profileid) - firsttw_start_time = float(firsttw_start_time) - # The start of the new older TW should be the first - the width - temp_start = firsttw_start_time - self.width - for _ in range(diff + 1): - new_start = temp_start - # The method to add an older TW is the same as - # to add a new one, just the starttime changes - twid: str = self.addNewOlderTW( - profileid, new_start - ) - self.print(f'Creating the new older TW id {twid}.' - f' Start: {new_start}.', 3, 0) - temp_start = new_start - self.width - else: - # There is no last tw. So create the first TW - # If the option for only-one-tw was selected, we should - # create the TW at least 100 years before the flowtime, - # to cover for 'flows in the past'. Which means we should - # cover for any flow that is coming later with time before the - # first flow - if self.width == 9999999999: - # Seconds in 1 year = 31536000 - startoftw = float(flowtime - (31536000 * 100)) - else: - startoftw = flowtime - # Add this TW, of this profile, to the DB - twid: str = self.add_new_tw(profileid, startoftw) + tw_of_ts: Optional[Tuple[str, float]] + if tw_of_ts := self.get_tw_of_ts(profileid, flowtime): + # We found an existing TW where this flow belongs to + return tw_of_ts[0] + + # The flow was not in the last TW, its OLDER that it + twid: str = self.create_old_tws_as_needed( + flowtime, + last_tw_end_time, + profileid) + return twid + + + # There is no last tw. So create the first TW + # If the option for only-one-tw was selected, we should + # create the TW at least 100 years before the flowtime, + # to cover for 'flows in the past'. Which means we should + # cover for any flow that is coming later with time before the + # first flow + if self.width == 9999999999: + # Seconds in 1 year = 31536000 + startoftw = float(flowtime - (31536000 * 100)) + else: + startoftw = flowtime + # Add this TW, of this profile, to the DB + twid: str = self.add_new_tw(profileid, startoftw) return twid except Exception as e: self.print('Error in get_timewindow().', 0, 1) @@ -1216,25 +1240,33 @@ def get_profiles_len(self) -> int: def get_last_twid_of_profile(self, profileid: str) -> Tuple[str, float]: """ - Returns the last TW id and the starttime of the given profile id + Returns the last TW id (aka tw with the greatest ts seen so far) and + the starttime of the given profile id """ if profileid: - res = self.r.zrange( - f'tws{profileid}', -1, -1, withscores=True) + res = self.r.zrange(f'tws{profileid}', -1, -1, withscores=True) if res: twid, starttime = res[0] return twid, starttime - def getFirstTWforProfile(self, profileid): - """Return the first TW id and the time for the given profile id""" - return ( - self.r.zrange(f'tws{profileid}', 0, 0, withscores=True) - if profileid - else False - ) + def get_first_twid_for_profile(self, profileid: str) -> \ + Optional[Tuple[str, float]]: + """ + Return the first TW id and the time for the given profile id + the returned twid may be a negative tw for example tw-1, depends on + what tw was last registered + """ + if profileid: + res: List[Tuple[str,float]] + res = self.r.zrange(f'tws{profileid}', 0, 0, withscores=True) + if res: + tw: str + starttime_of_tw: float + tw, starttime_of_tw = res[0] + return tw, starttime_of_tw - def getTWofTime(self, profileid, time): + def get_tw_of_ts(self, profileid, time) -> Optional[Tuple[str, float]]: """ Return the TW id and the time for the TW that includes the given time. The score in the DB is the start of the timewindow, so we should search @@ -1265,35 +1297,35 @@ def getTWofTime(self, profileid, time): return data - def addNewOlderTW(self, profileid, startoftw): + def add_new_older_tw(self, + profileid: str, + tw_start_time: float, + tw_number: int + ): + """ + Creates or adds a new timewindow that is OLDER than the + first we have + :param tw_start_time: start time of timewindow to add + :param tw_number: number of timewindow to add + Returns the id of the timewindow just created + """ try: - """ - Creates or adds a new timewindow that is OLDER than the first we have - Return the id of the timewindow just created - """ - # Get the first twid and obtain the new tw id - (firstid, firstid_time) = self.getFirstTWforProfile(profileid)[ - 0 - ] - # We have a first id - # Decrement it!! - twid = 'timewindow' + str( - int(firstid.split('timewindow')[1]) - 1 - ) - # Add the new TW to the index of TW - timewindows: Dict[str, float] = {twid: float(startoftw)} - self.r.zadd(f'tws{profileid}', timewindows) - self.print(f'Created and added to DB the new older ' - f'TW with id {twid}. Time: {startoftw} ' - ,0,4) + twid: str = f'timewindow{tw_number}' + timewindows: Dict[str, float] = {twid: tw_start_time} + self.r.zadd(f'tws{profileid}', timewindows) - # The creation of a TW now does not imply that it was modified. - # You need to put data to mark is at modified - return twid + self.print(f'Created and added to DB the new older ' + f'TW with id {twid}. Time: {tw_start_time} ' + ,0,4) + + # The creation of a TW now does not imply that it was modified. + # You need to put data to mark is at modified + return twid except redis.exceptions.ResponseError as e: self.print('error in addNewOlderTW in database.py', 0, 1) self.print(type(e), 0, 1) self.print(e, 0, 1) + self.print(traceback.print_exc(), 0, 1) def add_new_tw(self, profileid, startoftw) -> str: try: From 5d2391121fdbc1ea8a730e99c8915049e7dcdce8 Mon Sep 17 00:00:00 2001 From: alya Date: Tue, 23 Jan 2024 14:05:41 +0200 Subject: [PATCH 6/6] update unit tests --- tests/test_database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_database.py b/tests/test_database.py index d78560e2e..d904e1b69 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -58,7 +58,7 @@ def test_timewindows(): db.add_new_tw(profileid, 0.0) # add a new tw (last tw) db.add_new_tw(profileid, 5.0) - assert db.get_first_twid_for_profile(profileid) == [('timewindow1', 0.0)] + assert db.get_first_twid_for_profile(profileid) == ('timewindow1', 0.0) assert db.get_last_twid_of_profile(profileid) == ('timewindow2', 5.0)