Skip to content

Commit

Permalink
Merge pull request #1012 from stratosphereips/alya/fix-hanging-flowal…
Browse files Browse the repository at this point in the history
…erts-threads

Fix hanging flowalerts threads by using async functions instead of TimerThread
  • Loading branch information
AlyaGomaa authored Oct 1, 2024
2 parents ad592b5 + af6b2fb commit 7350897
Show file tree
Hide file tree
Showing 22 changed files with 510 additions and 547 deletions.
4 changes: 3 additions & 1 deletion managers/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,9 @@ def should_run_non_stop(self) -> bool:
return True
return False

def shutdown_interactive(self, to_kill_first, to_kill_last):
def shutdown_interactive(
self, to_kill_first, to_kill_last
) -> Tuple[List[Process], List[Process]]:
"""
Shuts down modules in interactive mode only.
it won't work with the daemon's -S because the
Expand Down
81 changes: 38 additions & 43 deletions modules/flowalerts/conn.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import contextlib
import ipaddress
import json
Expand All @@ -6,7 +7,6 @@
import validators

from modules.flowalerts.dns import DNS
from modules.flowalerts.timer_thread import TimerThread
from slips_files.common.abstracts.flowalerts_analyzer import (
IFlowalertsAnalyzer,
)
Expand Down Expand Up @@ -416,16 +416,17 @@ def check_if_resolution_was_made_by_different_version(
pass
return False

def check_connection_without_dns_resolution(self, profileid, twid, flow):
async def check_connection_without_dns_resolution(
self, profileid, twid, flow
) -> bool:
"""
Checks if there's a flow to a dstip that has no cached DNS answer
"""
# The exceptions are:
# 1- Do not check for DNS requests
# 2- Ignore some IPs like private IPs, multicast, and broadcast

if self.should_ignore_conn_without_dns(flow):
return
return False

# Ignore some IP
## - All dhcp servers. Since is ok to connect to
Expand Down Expand Up @@ -458,45 +459,33 @@ def check_connection_without_dns_resolution(self, profileid, twid, flow):
if self.db.is_ip_resolved(flow.daddr, 24):
return False

if flow.uid not in self.connections_checked_in_conn_dns_timer_thread:
# comes here if we haven't started the timer
# thread for this connection before
# mark this connection as checked
self.connections_checked_in_conn_dns_timer_thread.append(flow.uid)
params = [profileid, twid, flow]
# There is no DNS resolution, but it can be that Slips is
# still reading it from the files.
# To give time to Slips to read all the files and get all the flows
# don't alert a Connection Without DNS until 5 seconds has passed
# in real time from the time of this checking.
timer = TimerThread(
15, self.check_connection_without_dns_resolution, params
)
timer.start()
else:
# It means we already checked this conn with the Timer process
# (we waited 15 seconds for the dns to arrive after
# the connection was made)
# but still no dns resolution for it.
# Sometimes the same computer makes requests using
# its ipv4 and ipv6 address, check if this is the case
if self.check_if_resolution_was_made_by_different_version(
profileid, flow.daddr
):
return False
# There is no DNS resolution, but it can be that Slips is
# still reading it from the files.
# To give time to Slips to read all the files and get all the flows
# don't alert a Connection Without DNS until 15 seconds has passed
# in real time from the time of this checking.
await asyncio.sleep(15)
if self.db.is_ip_resolved(flow.daddr, 24):
return False

if self.is_well_known_org(flow.daddr):
# if the SNI or rDNS of the IP matches a
# well-known org, then this is a FP
return False
# Reaching here means we already waited 15 seconds for the dns
# to arrive after the connection was made, but still no dns
# resolution for it.

self.set_evidence.conn_without_dns(twid, flow)
# This UID will never appear again, so we can remove it and
# free some memory
with contextlib.suppress(ValueError):
self.connections_checked_in_conn_dns_timer_thread.remove(
flow.uid
)
# Sometimes the same computer makes requests using
# its ipv4 and ipv6 address, check if this is the case
if self.check_if_resolution_was_made_by_different_version(
profileid, flow.daddr
):
return False

if self.is_well_known_org(flow.daddr):
# if the SNI or rDNS of the IP matches a
# well-known org, then this is a FP
return False

self.set_evidence.conn_without_dns(twid, flow)
return True

def check_conn_to_port_0(self, profileid, twid, flow):
"""
Expand Down Expand Up @@ -732,7 +721,7 @@ def is_dns_conn(flow):

self.set_evidence.conn_to_private_ip(twid, flow)

def analyze(self, msg):
async def analyze(self, msg):
if utils.is_msg_intended_for(msg, "new_flow"):
msg = json.loads(msg["data"])
profileid = msg["profileid"]
Expand All @@ -751,7 +740,13 @@ def analyze(self, msg):
self.check_different_localnet_usage(
twid, flow, what_to_check="srcip"
)
self.check_connection_without_dns_resolution(profileid, twid, flow)
task = asyncio.create_task(
self.check_connection_without_dns_resolution(
profileid, twid, flow
)
)
# to wait for these functions before flowalerts shuts down
self.flowalerts.tasks.append(task)
self.detect_connection_to_multiple_ports(profileid, twid, flow)
self.check_data_upload(profileid, twid, flow)
self.check_non_http_port_80_conns(twid, flow)
Expand Down
108 changes: 50 additions & 58 deletions modules/flowalerts/dns.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import asyncio
import collections
import contextlib
import json
import math
from typing import List
import validators

from modules.flowalerts.timer_thread import TimerThread
from slips_files.common.abstracts.flowalerts_analyzer import (
IFlowalertsAnalyzer,
)
Expand Down Expand Up @@ -157,20 +156,8 @@ def is_connection_made_by_different_version(self, profileid, twid, daddr):
# by this computer but using a different ip version
return True

def check_dns_without_connection(self, profileid, twid, flow):
"""
Makes sure all cached DNS answers are used in contacted_ips
"""
if not self.should_detect_dns_without_conn(flow):
return False

# One DNS query may not be answered exactly by UID,
# but the computer can re-ask the domain,
# and the next DNS resolution can be
# answered. So dont check the UID, check if the domain has an IP

# self.print(f'The DNS query to {domain} had as answers {answers} ')

def get_previous_domain_resolutions(self, query) -> List[str]:
prev_resolutions = []
# It can happen that this domain was already resolved
# previously, but with other IPs
# So we get from the DB all the IPs for this domain
Expand All @@ -180,25 +167,27 @@ def check_dns_without_connection(self, profileid, twid, flow):
# with AAAA, and the computer chooses the A address.
# Therefore, the 2nd DNS resolution
# would be treated as 'without connection', but this is false.
if prev_domain_resolutions := self.db.get_domain_data(flow.query):
prev_domain_resolutions = prev_domain_resolutions.get("IPs", [])
# if there's a domain in the cache
# (prev_domain_resolutions) that is not in the
# current answers given to this function,
# append it to the answers list
flow.answers.extend(
[
ans
for ans in prev_domain_resolutions
if ans not in flow.answers
]
)
if prev_domain_resolutions := self.db.get_domain_data(query):
prev_resolutions = prev_domain_resolutions.get("IPs", [])
return prev_resolutions

def is_any_flow_answer_contacted(self, profileid, twid, flow) -> bool:
"""
checks if any of the answers of the given dns flow were contacted
before
"""
# we're doing this to answer this question, was the query we asked
# the dns for, resolved before to an IP that is not in the
# current flow.answer AND that previous resolution IP was contancted?
# if so, we extend the flow.asnwers to include
# these IPs. the goal is to avoid FPs
flow.answers.extend(self.get_previous_domain_resolutions(flow.query))

if flow.answers == ["-"]:
# If no IPs are in the answer, we can not expect
# the computer to connect to anything
# self.print(f'No ips in the answer, so ignoring')
return False
return True

contacted_ips = self.db.get_all_contacted_ips_in_profileid_twid(
profileid, twid
Expand All @@ -220,38 +209,35 @@ def check_dns_without_connection(self, profileid, twid, flow):
)
):
# this dns resolution has a connection. We can exit
return False
return True

# Check if there was a connection to any of the CNAMEs
if self.is_cname_contacted(flow.answers, contacted_ips):
# this is not a DNS without resolution
return True

async def check_dns_without_connection(
self, profileid, twid, flow
) -> bool:
"""
Makes sure all cached DNS answers are there in contacted_ips
"""
if not self.should_detect_dns_without_conn(flow):
return False

# self.print(f'It seems that none of the IPs were contacted')
# Found a DNS query which none of its IPs was contacted
# It can be that Slips is still reading it from the files.
# Lets check back in some time
# Create a timer thread that will wait some seconds for the
# connection to arrive and then check again
if flow.uid not in self.connections_checked_in_dns_conn_timer_thread:
# comes here if we haven't started the timer
# thread for this dns before mark this dns as checked
self.connections_checked_in_dns_conn_timer_thread.append(flow.uid)
params = [profileid, twid, flow]
# self.print(f'Starting the timer to check on {domain}, uid {uid}.
# time {datetime.datetime.now()}')
timer = TimerThread(40, self.check_dns_without_connection, params)
timer.start()
else:
# It means we already checked this dns with the Timer process
# but still no connection for it.
self.set_evidence.dns_without_conn(twid, flow)
# This UID will never appear again, so we can remove it and
# free some memory
with contextlib.suppress(ValueError):
self.connections_checked_in_dns_conn_timer_thread.remove(
flow.uid
)
if self.is_any_flow_answer_contacted(profileid, twid, flow):
return False

# Found a DNS query and none of its answers were contacted
await asyncio.sleep(40)

if self.is_any_flow_answer_contacted(profileid, twid, flow):
return False

# Reaching here means we already waited some time for the connection
# of this dns to arrive but none was found
self.set_evidence.dns_without_conn(twid, flow)
return True

@staticmethod
def estimate_shannon_entropy(string):
Expand Down Expand Up @@ -408,14 +394,20 @@ def check_dns_arpa_scan(self, profileid, twid, flow):
self.dns_arpa_queries.pop(profileid)
return True

def analyze(self, msg):
async def analyze(self, msg):
if not utils.is_msg_intended_for(msg, "new_dns"):
return False
msg = json.loads(msg["data"])
profileid = msg["profileid"]
twid = msg["twid"]
flow = self.classifier.convert_to_flow_obj(msg["flow"])
self.check_dns_without_connection(profileid, twid, flow)
task = asyncio.create_task(
self.check_dns_without_connection(profileid, twid, flow)
)
# Allow the event loop to run the scheduled task
await asyncio.sleep(0)
# to wait for these functions before flowalerts shuts down
self.flowalerts.tasks.append(task)
self.check_high_entropy_dns_answers(twid, flow)
self.check_invalid_dns_answers(twid, flow)
self.detect_dga(profileid, twid, flow)
Expand Down
33 changes: 27 additions & 6 deletions modules/flowalerts/flowalerts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import asyncio
import inspect
from asyncio import Task
from typing import List

from slips_files.common.slips_utils import utils
from slips_files.common.abstracts.module import IModule
from slips_files.common.abstracts.async_module import AsyncModule
from .conn import Conn
from .dns import DNS
from .downloaded_file import DownloadedFile
Expand All @@ -8,12 +13,11 @@
from .software import Software
from .ssh import SSH
from .ssl import SSL
from slips_files.core.helpers.whitelist.whitelist import Whitelist

from .tunnel import Tunnel
from slips_files.core.helpers.whitelist.whitelist import Whitelist


class FlowAlerts(IModule):
class FlowAlerts(AsyncModule):
name = "Flow Alerts"
description = (
"Alerts about flows: long connection, successful ssh, "
Expand All @@ -33,6 +37,8 @@ def init(self):
self.downloaded_file = DownloadedFile(self.db, flowalerts=self)
self.tunnel = Tunnel(self.db, flowalerts=self)
self.conn = Conn(self.db, flowalerts=self)
# list of async functions to await before flowalerts shuts down
self.tasks: List[Task] = []

def subscribe_to_channels(self):
channels = (
Expand All @@ -51,6 +57,9 @@ def subscribe_to_channels(self):
channel_obj = self.db.subscribe(channel)
self.channels.update({channel: channel_obj})

async def shutdown_gracefully(self):
await asyncio.gather(*self.tasks)

def pre_main(self):
utils.drop_root_privs()
self.analyzers_map = {
Expand All @@ -66,11 +75,23 @@ def pre_main(self):
"new_ssl": [self.ssl.analyze],
}

def main(self):
async def main(self):
for channel, analyzers in self.analyzers_map.items():
msg: dict = self.get_msg(channel)
if not msg:
continue

for analyzer in analyzers:
analyzer(msg)
# some analyzers are async functions
if inspect.iscoroutinefunction(analyzer):
# analyzer will run normally, until it finishes.
# tasks inside this analyzer will run asynchrously,
# and finish whenever they finish, we'll not wait for them
loop = asyncio.get_event_loop()
task = loop.create_task(analyzer(msg))
# to wait for these functions before flowalerts shuts down
self.tasks.append(task)
# Allow the event loop to run the scheduled task
await asyncio.sleep(0)
else:
analyzer(msg)
Loading

0 comments on commit 7350897

Please sign in to comment.