Skip to content

Commit

Permalink
Merge pull request #1022 from stratosphereips/alya/fix-stopping-slips
Browse files Browse the repository at this point in the history
fix race condition preventing the input and profiler semaphores to be acquired
  • Loading branch information
AlyaGomaa authored Oct 7, 2024
2 parents 4de5c5f + b2bd228 commit c4a83e7
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 40 deletions.
65 changes: 37 additions & 28 deletions managers/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,25 +492,21 @@ def stop_slips(self) -> bool:
if self.should_run_non_stop():
return False

if (
self.stop_slips_received()
or self.slips_is_done_receiving_new_flows()
):
return True

return False
return (
self.is_stop_msg_received() or self.is_done_receiving_new_flows()
)

def stop_slips_received(self):
def is_stop_msg_received(self) -> bool:
"""
returns true if the channel received the 'stop_slips' msg
returns true if the control_channel channel received the
'stop_slips' msg
"""
message = self.main.c1.get_message(timeout=0.01)
if (
return (
message
and utils.is_msg_intended_for(message, "control_channel")
and message["data"] == "stop_slips"
):
return True
)

def is_debugger_active(self) -> bool:
"""Returns true if the debugger is currently active"""
Expand All @@ -525,13 +521,11 @@ def should_run_non_stop(self) -> bool:
# these are the cases where slips should be running non-stop
# when slips is reading from a special module other than the input process
# this module should handle the stopping of slips
if (
return (
self.is_debugger_active()
or self.main.input_type in ("stdin", "cyst")
or self.main.is_interface
):
return True
return False
)

def shutdown_interactive(
self, to_kill_first, to_kill_last
Expand Down Expand Up @@ -573,23 +567,38 @@ def shutdown_interactive(
# all of them are killed
return None, None

def slips_is_done_receiving_new_flows(self) -> bool:
def can_acquire_semaphore(self, semaphore) -> bool:
"""
return True if the given semaphore can be aquired
"""
if semaphore.acquire(block=False):
# ok why are we releasing after aquiring?
# because once the module release the semaphore, this process
# needs to be able to acquire it as many times as it wants,
# not just once (which is what happens if we dont release)
semaphore.release()
return True
return False

def is_done_receiving_new_flows(self) -> bool:
"""
Determines if slips is still receiving new flows.
this method will return True when the input and profiler release
the semaphores signaling that they're done
If they're still processing it will return False
If they're still processing (we can't acquire the semaphore),
it will return False
"""
# try to acquire the semaphore without blocking
input_done_processing: bool = self.is_input_done.acquire(block=False)
profiler_done_processing: bool = self.is_profiler_done.acquire(
block=False
# the goal of using can_acquire_semaphore()
# is to avoid the race condition that happens when
# one of the 2 semaphores (input and profiler) is released and
# the other isnt
input_done_processing: bool = self.can_acquire_semaphore(
self.is_input_done
)

if input_done_processing and profiler_done_processing:
return True

# can't acquire the semaphore, processes are still running
return False
profiler_done_processing: bool = self.can_acquire_semaphore(
self.is_profiler_done
)
return input_done_processing and profiler_done_processing

def kill_daemon_children(self):
"""
Expand Down
22 changes: 11 additions & 11 deletions slips_files/core/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ def init(
self.is_profiler_done_event = is_profiler_done_event
self.is_running_non_stop: bool = self.db.is_running_non_stop()

def is_done_processing(self):
def mark_self_as_done_processing(self):
"""
marks this process as done processing so
slips.py would know when to terminate
marks this process as done processing and wait for the profiler to
stop so slips.py would know when to terminate
"""
# signal slips.py that this process is done
# tell profiler that this process is
Expand Down Expand Up @@ -474,14 +474,14 @@ def read_zeek_folder(self):

if total_flows == 0 and not growing_zeek_dir:
# we're given an empty dir/ zeek logfile
self.is_done_processing()
self.mark_self_as_done_processing()
return True

self.total_flows = total_flows
self.db.set_input_metadata({"total_flows": total_flows})
self.lines = self.read_zeek_files()
self.print_lines_read()
self.is_done_processing()
self.mark_self_as_done_processing()
return True

def print_lines_read(self):
Expand Down Expand Up @@ -549,7 +549,7 @@ def handle_binetflow(self):
if self.testing:
break

self.is_done_processing()
self.mark_self_as_done_processing()
return True

def handle_suricata(self):
Expand All @@ -567,7 +567,7 @@ def handle_suricata(self):
self.lines += 1
if self.testing:
break
self.is_done_processing()
self.mark_self_as_done_processing()
return True

def is_zeek_tabs_file(self, filepath: str) -> bool:
Expand Down Expand Up @@ -617,7 +617,7 @@ def handle_zeek_log_file(self):
# as we're running on an interface
self.bro_timeout = 30
self.lines = self.read_zeek_files()
self.is_done_processing()
self.mark_self_as_done_processing()
return True

def handle_nfdump(self):
Expand All @@ -628,7 +628,7 @@ def handle_nfdump(self):
self.nfdump_output = result.stdout.decode("utf-8")
self.lines = self.read_nfdump_output()
self.print_lines_read()
self.is_done_processing()
self.mark_self_as_done_processing()
return True

def start_observer(self):
Expand Down Expand Up @@ -684,7 +684,7 @@ def handle_pcap_and_interface(self) -> int:
self.is_zeek_tabs = False
self.lines = self.read_zeek_files()
self.print_lines_read()
self.is_done_processing()
self.mark_self_as_done_processing()

connlog_path = os.path.join(self.zeek_dir, "conn.log")

Expand Down Expand Up @@ -905,7 +905,7 @@ def handle_cyst(self):
self.lines += 1
self.print("Done reading 1 CYST flow.\n ", 0, 3)

self.is_done_processing()
self.mark_self_as_done_processing()

def give_profiler(self, line):
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/module_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def create_input_obj(
termination_event=Mock(),
)
input.db = mock_db
input.is_done_processing = do_nothing
input.mark_self_as_done_processing = do_nothing
input.bro_timeout = 1
# override the print function to avoid broken pipes
input.print = Mock()
Expand Down

0 comments on commit c4a83e7

Please sign in to comment.