Skip to content

Commit

Permalink
Merge pull request #426 from stratosphereips/alya/handle_publishing_d…
Browse files Browse the repository at this point in the history
…uplicate_MACs

handle looking up duplicate MAC addresses
  • Loading branch information
AlyaGomaa authored Dec 6, 2023
2 parents b06a56b + e40b8bc commit c3cd747
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 27 deletions.
8 changes: 2 additions & 6 deletions modules/ip_info/ip_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,8 @@ def handle_new_ip(self, ip):
def main(self):
if msg:= self.get_msg('new_MAC'):
data = json.loads(msg['data'])
mac_addr = data['MAC']
host_name = data.get('host_name', False)
profileid = data['profileid']

if host_name:
self.db.add_host_name_to_profile(host_name, profileid)
mac_addr: str = data['MAC']
profileid: str = data['profileid']

self.get_vendor(mac_addr, profileid)
self.check_if_we_have_pending_mac_queries()
Expand Down
8 changes: 6 additions & 2 deletions slips_files/core/database/database_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,12 @@ def get_organization_of_port(self, *args, **kwargs):
def add_zeek_file(self, *args, **kwargs):
return self.rdb.add_zeek_file(*args, **kwargs)

def get_all_zeek_file(self, *args, **kwargs):
return self.rdb.get_all_zeek_file(*args, **kwargs)
def get_all_zeek_files(
self,
*args,
**kwargs
):
return self.rdb.get_all_zeek_files(*args, **kwargs)

def get_gateway_ip(self, *args, **kwargs):
return self.rdb.get_gateway_ip(*args, **kwargs)
Expand Down
4 changes: 2 additions & 2 deletions slips_files/core/database/redis_db/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ def get_evidence_detection_threshold(self):
return self.r.hget('analysis', 'evidence_detection_threshold')


def get_input_type(self):
def get_input_type(self) -> str:
"""
gets input type from the db
"""
Expand Down Expand Up @@ -1042,7 +1042,7 @@ def add_zeek_file(self, filename):
"""Add an entry to the list of zeek files"""
self.r.sadd('zeekfiles', filename)

def get_all_zeek_file(self):
def get_all_zeek_files(self) -> set:
"""Return all entries from the list of zeek files"""
return self.r.smembers('zeekfiles')

Expand Down
24 changes: 19 additions & 5 deletions slips_files/core/helpers/flow_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def new_dhcp(self,profileid, flow):
self.db.publish('new_dhcp', json.dumps(to_send))


def new_MAC(self, mac: str, ip: str, host_name=False):
def new_MAC(self, mac: str, ip: str):
"""
check if mac and ip aren't multicast or link-local
and publish to new_MAC channel to get more info about the mac
Expand All @@ -54,8 +54,6 @@ def new_MAC(self, mac: str, ip: str, host_name=False):
'MAC': mac,
'profileid': f'profile_{ip}'
}
if host_name:
to_send['host_name'] = host_name
self.db.publish('new_MAC', json.dumps(to_send))


Expand All @@ -81,9 +79,21 @@ def __init__(self, db, symbol_handler, flow):
self.publisher = Publisher(self.db)
self.flow = flow
self.symbol = symbol_handler
self.running_non_stop: bool = self.is_running_non_stop()

def is_supported_flow(self):
def is_running_non_stop(self) -> bool:
"""
Slips runs non-stop in case of an interface or a growing zeek dir,
it only stops on ctrl+c
"""
if (
self.db.get_input_type() == 'interface'
or
self.db.is_growing_zeek_dir()
):
return True

def is_supported_flow(self):
supported_types = (
'ssh',
'ssl',
Expand Down Expand Up @@ -168,6 +178,11 @@ def handle_conn(self):
self.flow.smac
)

if self.running_non_stop:
# to avoid publishing duplicate MACs, when running on
# an interface, we should have an arp.log, so we'll publish
# MACs from there only
return

self.publisher.new_MAC(self.flow.smac, self.flow.saddr)
self.publisher.new_MAC(self.flow.dmac, self.flow.daddr)
Expand Down Expand Up @@ -296,7 +311,6 @@ def handle_dhcp(self):
self.publisher.new_MAC(
self.flow.smac or False,
self.flow.saddr,
host_name=(self.flow.host_name or False)
)

self.db.add_mac_addr_to_profile(
Expand Down
30 changes: 19 additions & 11 deletions slips_files/core/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,16 @@ def init(
self.is_profiler_done_event = is_profiler_done_event

def is_done_processing(self):
"""marks this process as done processing so slips.py would know when to terminate"""
"""
marks this process as done processing so
slips.py would know when to terminate
"""
# signal slips.py that this process is done
# tell profiler that this process is done and no kmore flows are arriving
self.print(f"Telling Profiler to stop because no more input is arriving.", log_to_logfiles_only=True)
# tell profiler that this process is
# done and no more flows are arriving
self.print(f"Telling Profiler to stop because "
f"no more input is arriving.",
log_to_logfiles_only=True)
self.profiler_queue.put('stop')
self.print(f"Waiting for Profiler to stop.", log_to_logfiles_only=True)
self.is_profiler_done_event.wait()
Expand Down Expand Up @@ -325,7 +331,7 @@ def get_earliest_line(self):
# It may happen that we check all the files in the folder,
# and there is still no files for us.
# To cover this case, just refresh the list of files
self.zeek_files = self.db.get_all_zeek_file()
self.zeek_files = self.db.get_all_zeek_files()
# time.sleep(1)
return False, False

Expand All @@ -343,7 +349,7 @@ def get_earliest_line(self):
return earliest_line, file_with_earliest_flow

def read_zeek_files(self) -> int:
self.zeek_files = self.db.get_all_zeek_file()
self.zeek_files = self.db.get_all_zeek_files()
self.open_file_handlers = {}
self.file_time = {}
self.cache_lines = {}
Expand Down Expand Up @@ -380,7 +386,7 @@ def read_zeek_files(self) -> int:
del self.file_time[file_with_earliest_flow]
# Get the new list of files. Since new files may have been created by
# Zeek while we were processing them.
self.zeek_files = self.db.get_all_zeek_file()
self.zeek_files = self.db.get_all_zeek_files()

self.close_all_handles()
return self.lines
Expand Down Expand Up @@ -423,7 +429,8 @@ def read_zeek_folder(self):
# This is the case that a folder full of zeek files is passed with -f
# wait max 10 seconds before stopping slips if no new flows are read
self.bro_timeout = 10
if self.db.is_growing_zeek_dir():
growing_zeek_dir: bool = self.db.is_growing_zeek_dir()
if growing_zeek_dir:
# slips is given a dir that is growing i.e zeek dir running on an interface
# don't stop zeek or slips
self.bro_timeout = float('inf')
Expand All @@ -446,20 +453,21 @@ def read_zeek_folder(self):
if self.is_ignored_file(full_path):
continue

if not self.db.is_growing_zeek_dir():
# get the total number of flows slips is going to read (used later for the progress bar)
if not growing_zeek_dir:
# get the total number of flows slips is going to read
# (used later for the progress bar)
total_flows += self.get_flows_number(full_path)

# Add log file to the database
self.db.add_zeek_file(full_path)


# in testing mode, we only need to read one zeek file to know
# that this function is working correctly
if self.testing:
break

if total_flows == 0:
if total_flows == 0 and not growing_zeek_dir:
# we're given an empty dir/ zeek logfile
self.is_done_processing()
return True

Expand Down
2 changes: 1 addition & 1 deletion tests/test_inputProc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_is_growing_zeek_dir(
zeek_dir: str, is_tabs: bool, mock_rdb
):
input = ModuleFactory().create_inputProcess_obj(zeek_dir, 'zeek_folder', mock_rdb)
mock_rdb.get_all_zeek_file.return_value = [os.path.join(zeek_dir, 'conn.log')]
mock_rdb.get_all_zeek_files.return_value = [os.path.join(zeek_dir, 'conn.log')]

assert input.read_zeek_folder() is True

Expand Down

0 comments on commit c3cd747

Please sign in to comment.