diff --git a/modules/ip_info/ip_info.py b/modules/ip_info/ip_info.py index 541f8005e..ca77a5b8a 100644 --- a/modules/ip_info/ip_info.py +++ b/modules/ip_info/ip_info.py @@ -565,12 +565,8 @@ def handle_new_ip(self, ip): def main(self): if msg:= self.get_msg('new_MAC'): data = json.loads(msg['data']) - mac_addr = data['MAC'] - host_name = data.get('host_name', False) - profileid = data['profileid'] - - if host_name: - self.db.add_host_name_to_profile(host_name, profileid) + mac_addr: str = data['MAC'] + profileid: str = data['profileid'] self.get_vendor(mac_addr, profileid) self.check_if_we_have_pending_mac_queries() diff --git a/slips_files/core/database/database_manager.py b/slips_files/core/database/database_manager.py index 1975b3739..cff6a503a 100644 --- a/slips_files/core/database/database_manager.py +++ b/slips_files/core/database/database_manager.py @@ -236,8 +236,12 @@ def get_organization_of_port(self, *args, **kwargs): def add_zeek_file(self, *args, **kwargs): return self.rdb.add_zeek_file(*args, **kwargs) - def get_all_zeek_file(self, *args, **kwargs): - return self.rdb.get_all_zeek_file(*args, **kwargs) + def get_all_zeek_files( + self, + *args, + **kwargs + ): + return self.rdb.get_all_zeek_files(*args, **kwargs) def get_gateway_ip(self, *args, **kwargs): return self.rdb.get_gateway_ip(*args, **kwargs) diff --git a/slips_files/core/database/redis_db/database.py b/slips_files/core/database/redis_db/database.py index 25ecc9016..85694edfb 100644 --- a/slips_files/core/database/redis_db/database.py +++ b/slips_files/core/database/redis_db/database.py @@ -537,7 +537,7 @@ def get_evidence_detection_threshold(self): return self.r.hget('analysis', 'evidence_detection_threshold') - def get_input_type(self): + def get_input_type(self) -> str: """ gets input type from the db """ @@ -1038,7 +1038,7 @@ def add_zeek_file(self, filename): """Add an entry to the list of zeek files""" self.r.sadd('zeekfiles', filename) - def get_all_zeek_file(self): + def get_all_zeek_files(self) -> set: """Return all entries from the list of zeek files""" return self.r.smembers('zeekfiles') diff --git a/slips_files/core/helpers/flow_handler.py b/slips_files/core/helpers/flow_handler.py index c4042c1ac..ff1307dc4 100644 --- a/slips_files/core/helpers/flow_handler.py +++ b/slips_files/core/helpers/flow_handler.py @@ -31,7 +31,7 @@ def new_dhcp(self,profileid, flow): self.db.publish('new_dhcp', json.dumps(to_send)) - def new_MAC(self, mac: str, ip: str, host_name=False): + def new_MAC(self, mac: str, ip: str): """ check if mac and ip aren't multicast or link-local and publish to new_MAC channel to get more info about the mac @@ -54,8 +54,6 @@ def new_MAC(self, mac: str, ip: str, host_name=False): 'MAC': mac, 'profileid': f'profile_{ip}' } - if host_name: - to_send['host_name'] = host_name self.db.publish('new_MAC', json.dumps(to_send)) @@ -81,9 +79,21 @@ def __init__(self, db, symbol_handler, flow): self.publisher = Publisher(self.db) self.flow = flow self.symbol = symbol_handler + self.running_non_stop: bool = self.is_running_non_stop() - def is_supported_flow(self): + def is_running_non_stop(self) -> bool: + """ + Slips runs non-stop in case of an interface or a growing zeek dir, + it only stops on ctrl+c + """ + if ( + self.db.get_input_type() == 'interface' + or + self.db.is_growing_zeek_dir() + ): + return True + def is_supported_flow(self): supported_types = ( 'ssh', 'ssl', @@ -168,6 +178,11 @@ def handle_conn(self): self.flow.smac ) + if self.running_non_stop: + # to avoid publishing duplicate MACs, when running on + # an interface, we should have an arp.log, so we'll publish + # MACs from there only + return self.publisher.new_MAC(self.flow.smac, self.flow.saddr) self.publisher.new_MAC(self.flow.dmac, self.flow.daddr) @@ -296,7 +311,6 @@ def handle_dhcp(self): self.publisher.new_MAC( self.flow.smac or False, self.flow.saddr, - host_name=(self.flow.host_name or False) ) self.db.add_mac_addr_to_profile( diff --git a/slips_files/core/input.py b/slips_files/core/input.py index 12ca43b7f..a7e4709ae 100644 --- a/slips_files/core/input.py +++ b/slips_files/core/input.py @@ -113,10 +113,16 @@ def init( self.is_profiler_done_event = is_profiler_done_event def is_done_processing(self): - """marks this process as done processing so slips.py would know when to terminate""" + """ + marks this process as done processing so + slips.py would know when to terminate + """ # signal slips.py that this process is done - # tell profiler that this process is done and no kmore flows are arriving - self.print(f"Telling Profiler to stop because no more input is arriving.", log_to_logfiles_only=True) + # tell profiler that this process is + # done and no more flows are arriving + self.print(f"Telling Profiler to stop because " + f"no more input is arriving.", + log_to_logfiles_only=True) self.profiler_queue.put('stop') self.print(f"Waiting for Profiler to stop.", log_to_logfiles_only=True) self.is_profiler_done_event.wait() @@ -325,7 +331,7 @@ def get_earliest_line(self): # It may happen that we check all the files in the folder, # and there is still no files for us. # To cover this case, just refresh the list of files - self.zeek_files = self.db.get_all_zeek_file() + self.zeek_files = self.db.get_all_zeek_files() # time.sleep(1) return False, False @@ -343,7 +349,7 @@ def get_earliest_line(self): return earliest_line, file_with_earliest_flow def read_zeek_files(self) -> int: - self.zeek_files = self.db.get_all_zeek_file() + self.zeek_files = self.db.get_all_zeek_files() self.open_file_handlers = {} self.file_time = {} self.cache_lines = {} @@ -380,7 +386,7 @@ def read_zeek_files(self) -> int: del self.file_time[file_with_earliest_flow] # Get the new list of files. Since new files may have been created by # Zeek while we were processing them. - self.zeek_files = self.db.get_all_zeek_file() + self.zeek_files = self.db.get_all_zeek_files() self.close_all_handles() return self.lines @@ -423,7 +429,8 @@ def read_zeek_folder(self): # This is the case that a folder full of zeek files is passed with -f # wait max 10 seconds before stopping slips if no new flows are read self.bro_timeout = 10 - if self.db.is_growing_zeek_dir(): + growing_zeek_dir: bool = self.db.is_growing_zeek_dir() + if growing_zeek_dir: # slips is given a dir that is growing i.e zeek dir running on an interface # don't stop zeek or slips self.bro_timeout = float('inf') @@ -446,20 +453,21 @@ def read_zeek_folder(self): if self.is_ignored_file(full_path): continue - if not self.db.is_growing_zeek_dir(): - # get the total number of flows slips is going to read (used later for the progress bar) + if not growing_zeek_dir: + # get the total number of flows slips is going to read + # (used later for the progress bar) total_flows += self.get_flows_number(full_path) # Add log file to the database self.db.add_zeek_file(full_path) - # in testing mode, we only need to read one zeek file to know # that this function is working correctly if self.testing: break - if total_flows == 0: + if total_flows == 0 and not growing_zeek_dir: + # we're given an empty dir/ zeek logfile self.is_done_processing() return True diff --git a/tests/test_inputProc.py b/tests/test_inputProc.py index 65384c55e..24dc09c63 100644 --- a/tests/test_inputProc.py +++ b/tests/test_inputProc.py @@ -34,7 +34,7 @@ def test_is_growing_zeek_dir( zeek_dir: str, is_tabs: bool, mock_rdb ): input = ModuleFactory().create_inputProcess_obj(zeek_dir, 'zeek_folder', mock_rdb) - mock_rdb.get_all_zeek_file.return_value = [os.path.join(zeek_dir, 'conn.log')] + mock_rdb.get_all_zeek_files.return_value = [os.path.join(zeek_dir, 'conn.log')] assert input.read_zeek_folder() is True