Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle looking up duplicate MAC addresses #426

Merged
merged 4 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions modules/ip_info/ip_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,8 @@ def handle_new_ip(self, ip):
def main(self):
if msg:= self.get_msg('new_MAC'):
data = json.loads(msg['data'])
mac_addr = data['MAC']
host_name = data.get('host_name', False)
profileid = data['profileid']

if host_name:
self.db.add_host_name_to_profile(host_name, profileid)
mac_addr: str = data['MAC']
profileid: str = data['profileid']

self.get_vendor(mac_addr, profileid)
self.check_if_we_have_pending_mac_queries()
Expand Down
8 changes: 6 additions & 2 deletions slips_files/core/database/database_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,12 @@ def get_organization_of_port(self, *args, **kwargs):
# Thin delegator: forwards every positional and keyword argument unchanged
# to self.rdb.add_zeek_file and returns whatever that call returns.
def add_zeek_file(self, *args, **kwargs):
return self.rdb.add_zeek_file(*args, **kwargs)

def get_all_zeek_file(self, *args, **kwargs):
return self.rdb.get_all_zeek_file(*args, **kwargs)
# Thin delegator: forwards every positional and keyword argument unchanged
# to self.rdb.get_all_zeek_files and returns whatever that call returns.
def get_all_zeek_files(
self,
*args,
**kwargs
):
return self.rdb.get_all_zeek_files(*args, **kwargs)

# Thin delegator: forwards every positional and keyword argument unchanged
# to self.rdb.get_gateway_ip and returns whatever that call returns.
def get_gateway_ip(self, *args, **kwargs):
return self.rdb.get_gateway_ip(*args, **kwargs)
Expand Down
4 changes: 2 additions & 2 deletions slips_files/core/database/redis_db/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ def get_evidence_detection_threshold(self):
return self.r.hget('analysis', 'evidence_detection_threshold')


def get_input_type(self):
def get_input_type(self) -> str:
"""
gets input type from the db
"""
Expand Down Expand Up @@ -1038,7 +1038,7 @@ def add_zeek_file(self, filename):
"""Add an entry to the list of zeek files"""
self.r.sadd('zeekfiles', filename)

def get_all_zeek_file(self):
# Reads the 'zeekfiles' key with SMEMBERS and returns the result as-is.
# NOTE(review): SMEMBERS implies 'zeekfiles' is a Redis set rather than a
# list, so entries are presumably unordered and de-duplicated -- confirm that
# callers do not rely on ordering.
def get_all_zeek_files(self) -> set:
"""Return all entries from the list of zeek files"""
return self.r.smembers('zeekfiles')

Expand Down
24 changes: 19 additions & 5 deletions slips_files/core/helpers/flow_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def new_dhcp(self,profileid, flow):
self.db.publish('new_dhcp', json.dumps(to_send))


def new_MAC(self, mac: str, ip: str, host_name=False):
def new_MAC(self, mac: str, ip: str):
"""
check if mac and ip aren't multicast or link-local
and publish to new_MAC channel to get more info about the mac
Expand All @@ -54,8 +54,6 @@ def new_MAC(self, mac: str, ip: str, host_name=False):
'MAC': mac,
'profileid': f'profile_{ip}'
}
if host_name:
to_send['host_name'] = host_name
self.db.publish('new_MAC', json.dumps(to_send))


Expand All @@ -81,9 +79,21 @@ def __init__(self, db, symbol_handler, flow):
self.publisher = Publisher(self.db)
self.flow = flow
self.symbol = symbol_handler
self.running_non_stop: bool = self.is_running_non_stop()

def is_supported_flow(self):
def is_running_non_stop(self) -> bool:
    """
    Tell whether Slips is expected to keep running until interrupted.

    Slips runs non-stop in case of an interface or a growing zeek dir,
    it only stops on ctrl+c.

    :return: True when the input type stored in the db is 'interface'
        or the db reports a growing zeek dir; False otherwise.
    """
    # Wrap in bool() so the result is always True/False, as the
    # annotation promises: without an explicit False path the method
    # would yield None when neither condition holds, and a non-bool
    # truthy value from is_growing_zeek_dir() would leak through.
    return bool(
        self.db.get_input_type() == 'interface'
        or self.db.is_growing_zeek_dir()
    )

def is_supported_flow(self):
supported_types = (
'ssh',
'ssl',
Expand Down Expand Up @@ -168,6 +178,11 @@ def handle_conn(self):
self.flow.smac
)

if self.running_non_stop:
# to avoid publishing duplicate MACs, when running on
# an interface, we should have an arp.log, so we'll publish
# MACs from there only
return

self.publisher.new_MAC(self.flow.smac, self.flow.saddr)
self.publisher.new_MAC(self.flow.dmac, self.flow.daddr)
Expand Down Expand Up @@ -296,7 +311,6 @@ def handle_dhcp(self):
self.publisher.new_MAC(
self.flow.smac or False,
self.flow.saddr,
host_name=(self.flow.host_name or False)
)

self.db.add_mac_addr_to_profile(
Expand Down
30 changes: 19 additions & 11 deletions slips_files/core/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,16 @@ def init(
self.is_profiler_done_event = is_profiler_done_event

def is_done_processing(self):
"""marks this process as done processing so slips.py would know when to terminate"""
"""
marks this process as done processing so
slips.py would know when to terminate
"""
# signal slips.py that this process is done
# tell profiler that this process is done and no kmore flows are arriving
self.print(f"Telling Profiler to stop because no more input is arriving.", log_to_logfiles_only=True)
# tell profiler that this process is
# done and no more flows are arriving
self.print(f"Telling Profiler to stop because "
f"no more input is arriving.",
log_to_logfiles_only=True)
self.profiler_queue.put('stop')
self.print(f"Waiting for Profiler to stop.", log_to_logfiles_only=True)
self.is_profiler_done_event.wait()
Expand Down Expand Up @@ -325,7 +331,7 @@ def get_earliest_line(self):
# It may happen that we check all the files in the folder,
# and there is still no files for us.
# To cover this case, just refresh the list of files
self.zeek_files = self.db.get_all_zeek_file()
self.zeek_files = self.db.get_all_zeek_files()
# time.sleep(1)
return False, False

Expand All @@ -343,7 +349,7 @@ def get_earliest_line(self):
return earliest_line, file_with_earliest_flow

def read_zeek_files(self) -> int:
self.zeek_files = self.db.get_all_zeek_file()
self.zeek_files = self.db.get_all_zeek_files()
self.open_file_handlers = {}
self.file_time = {}
self.cache_lines = {}
Expand Down Expand Up @@ -380,7 +386,7 @@ def read_zeek_files(self) -> int:
del self.file_time[file_with_earliest_flow]
# Get the new list of files. Since new files may have been created by
# Zeek while we were processing them.
self.zeek_files = self.db.get_all_zeek_file()
self.zeek_files = self.db.get_all_zeek_files()

self.close_all_handles()
return self.lines
Expand Down Expand Up @@ -423,7 +429,8 @@ def read_zeek_folder(self):
# This is the case that a folder full of zeek files is passed with -f
# wait max 10 seconds before stopping slips if no new flows are read
self.bro_timeout = 10
if self.db.is_growing_zeek_dir():
growing_zeek_dir: bool = self.db.is_growing_zeek_dir()
if growing_zeek_dir:
# slips is given a dir that is growing i.e zeek dir running on an interface
# don't stop zeek or slips
self.bro_timeout = float('inf')
Expand All @@ -446,20 +453,21 @@ def read_zeek_folder(self):
if self.is_ignored_file(full_path):
continue

if not self.db.is_growing_zeek_dir():
# get the total number of flows slips is going to read (used later for the progress bar)
if not growing_zeek_dir:
# get the total number of flows slips is going to read
# (used later for the progress bar)
total_flows += self.get_flows_number(full_path)

# Add log file to the database
self.db.add_zeek_file(full_path)


# in testing mode, we only need to read one zeek file to know
# that this function is working correctly
if self.testing:
break

if total_flows == 0:
if total_flows == 0 and not growing_zeek_dir:
# we're given an empty dir/ zeek logfile
self.is_done_processing()
return True

Expand Down
2 changes: 1 addition & 1 deletion tests/test_inputProc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_is_growing_zeek_dir(
zeek_dir: str, is_tabs: bool, mock_rdb
):
input = ModuleFactory().create_inputProcess_obj(zeek_dir, 'zeek_folder', mock_rdb)
mock_rdb.get_all_zeek_file.return_value = [os.path.join(zeek_dir, 'conn.log')]
mock_rdb.get_all_zeek_files.return_value = [os.path.join(zeek_dir, 'conn.log')]

assert input.read_zeek_folder() is True

Expand Down
Loading