Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hanging flowalerts threads by using async functions instead of TimerThread #1012

Merged
merged 45 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
23baea8
flowalerts.conn: use async timer instead of timerthread to avoid hang…
AlyaGomaa Sep 26, 2024
c66ea4c
flowalerts.dns: use async timer instead of timerthread to avoid hangi…
AlyaGomaa Sep 26, 2024
1b37445
Imodule: unify the return type of run()
AlyaGomaa Sep 26, 2024
183815b
flowalerts.ssh: use async timer instead of timerthread
AlyaGomaa Sep 26, 2024
0c5c6b1
update ssh unit tests
AlyaGomaa Sep 26, 2024
e52f0ee
module: add handlers for async main() and shutdown_gracefully() funct…
AlyaGomaa Sep 27, 2024
b755632
zeek.py: fix problem parsing FTP flows
AlyaGomaa Sep 27, 2024
aeea367
http: use the original conn.log flow of teh weird.log in set_evidence…
AlyaGomaa Sep 27, 2024
4e7d46d
module: use the main event loop to run async functions
AlyaGomaa Sep 27, 2024
348a622
module: move asyncmodule class to a separate file
AlyaGomaa Sep 27, 2024
8fce1a1
flowalerts: implement asyncmodule instead of IModule
AlyaGomaa Sep 27, 2024
c3f7b7d
ssl: use async instead of threads to wait for ssl conn.log flows
AlyaGomaa Sep 27, 2024
5dc6897
move get_original_conn_flow() to utils
AlyaGomaa Sep 27, 2024
867fd52
make analyze() asynchronous in conn.py and dns.py
AlyaGomaa Sep 27, 2024
c82f19f
update set_evidence_weird_http_method() unit test
AlyaGomaa Sep 27, 2024
e7c6382
update http unit tests
AlyaGomaa Oct 1, 2024
07dfcd3
update ssl unit tests
AlyaGomaa Oct 1, 2024
d6a38aa
update ssh unit tests
AlyaGomaa Oct 1, 2024
9ffd7f7
update input.py unit tests
AlyaGomaa Oct 1, 2024
8761b44
common_test_utils.py: add a mock to use for async functions
AlyaGomaa Oct 1, 2024
d3881b3
update dns.py unit tests
AlyaGomaa Oct 1, 2024
73cf821
update ssh.py unit tests
AlyaGomaa Oct 1, 2024
c4c62ec
flowalerts.conn: use async timer instead of timerthread to avoid hang…
AlyaGomaa Sep 26, 2024
cb6b122
flowalerts.dns: use async timer instead of timerthread to avoid hangi…
AlyaGomaa Sep 26, 2024
cf46d40
Imodule: unify the return type of run()
AlyaGomaa Sep 26, 2024
4f8a1bf
flowalerts.ssh: use async timer instead of timerthread
AlyaGomaa Sep 26, 2024
e2dc415
update ssh unit tests
AlyaGomaa Sep 26, 2024
94de7f5
module: add handlers for async main() and shutdown_gracefully() funct…
AlyaGomaa Sep 27, 2024
641231c
zeek.py: fix problem parsing FTP flows
AlyaGomaa Sep 27, 2024
235f58c
http: use the original conn.log flow of teh weird.log in set_evidence…
AlyaGomaa Sep 27, 2024
e621eb9
module: use the main event loop to run async functions
AlyaGomaa Sep 27, 2024
f54a0e4
module: move asyncmodule class to a separate file
AlyaGomaa Sep 27, 2024
75c70af
flowalerts: implement asyncmodule instead of IModule
AlyaGomaa Sep 27, 2024
c4dbd11
ssl: use async instead of threads to wait for ssl conn.log flows
AlyaGomaa Sep 27, 2024
70ab33c
move get_original_conn_flow() to utils
AlyaGomaa Sep 27, 2024
5dff960
make analyze() asynchronous in conn.py and dns.py
AlyaGomaa Sep 27, 2024
ae870df
update set_evidence_weird_http_method() unit test
AlyaGomaa Sep 27, 2024
49cfb8b
update http unit tests
AlyaGomaa Oct 1, 2024
21e0741
update ssl unit tests
AlyaGomaa Oct 1, 2024
978aa2e
update ssh unit tests
AlyaGomaa Oct 1, 2024
48284f7
update input.py unit tests
AlyaGomaa Oct 1, 2024
5d49d40
common_test_utils.py: add a mock to use for async functions
AlyaGomaa Oct 1, 2024
b33fb2d
update dns.py unit tests
AlyaGomaa Oct 1, 2024
4eec4b3
update ssh.py unit tests
AlyaGomaa Oct 1, 2024
af6b2fb
fix rebase issues
AlyaGomaa Oct 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion managers/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,9 @@ def should_run_non_stop(self) -> bool:
return True
return False

def shutdown_interactive(self, to_kill_first, to_kill_last):
def shutdown_interactive(
self, to_kill_first, to_kill_last
) -> Tuple[List[Process], List[Process]]:
"""
Shuts down modules in interactive mode only.
it won't work with the daemon's -S because the
Expand Down
81 changes: 38 additions & 43 deletions modules/flowalerts/conn.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import contextlib
import ipaddress
import json
Expand All @@ -6,7 +7,6 @@
import validators

from modules.flowalerts.dns import DNS
from modules.flowalerts.timer_thread import TimerThread
from slips_files.common.abstracts.flowalerts_analyzer import (
IFlowalertsAnalyzer,
)
Expand Down Expand Up @@ -416,16 +416,17 @@ def check_if_resolution_was_made_by_different_version(
pass
return False

def check_connection_without_dns_resolution(self, profileid, twid, flow):
async def check_connection_without_dns_resolution(
self, profileid, twid, flow
) -> bool:
"""
Checks if there's a flow to a dstip that has no cached DNS answer
"""
# The exceptions are:
# 1- Do not check for DNS requests
# 2- Ignore some IPs like private IPs, multicast, and broadcast

if self.should_ignore_conn_without_dns(flow):
return
return False

# Ignore some IP
## - All dhcp servers. Since is ok to connect to
Expand Down Expand Up @@ -458,45 +459,33 @@ def check_connection_without_dns_resolution(self, profileid, twid, flow):
if self.db.is_ip_resolved(flow.daddr, 24):
return False

if flow.uid not in self.connections_checked_in_conn_dns_timer_thread:
# comes here if we haven't started the timer
# thread for this connection before
# mark this connection as checked
self.connections_checked_in_conn_dns_timer_thread.append(flow.uid)
params = [profileid, twid, flow]
# There is no DNS resolution, but it can be that Slips is
# still reading it from the files.
# To give time to Slips to read all the files and get all the flows
# don't alert a Connection Without DNS until 5 seconds has passed
# in real time from the time of this checking.
timer = TimerThread(
15, self.check_connection_without_dns_resolution, params
)
timer.start()
else:
# It means we already checked this conn with the Timer process
# (we waited 15 seconds for the dns to arrive after
# the connection was made)
# but still no dns resolution for it.
# Sometimes the same computer makes requests using
# its ipv4 and ipv6 address, check if this is the case
if self.check_if_resolution_was_made_by_different_version(
profileid, flow.daddr
):
return False
# There is no DNS resolution, but it can be that Slips is
# still reading it from the files.
# To give time to Slips to read all the files and get all the flows
# don't alert a Connection Without DNS until 15 seconds have passed
# in real time from the time of this checking.
await asyncio.sleep(15)
if self.db.is_ip_resolved(flow.daddr, 24):
return False

if self.is_well_known_org(flow.daddr):
# if the SNI or rDNS of the IP matches a
# well-known org, then this is a FP
return False
# Reaching here means we already waited 15 seconds for the dns
# to arrive after the connection was made, but still no dns
# resolution for it.

self.set_evidence.conn_without_dns(twid, flow)
# This UID will never appear again, so we can remove it and
# free some memory
with contextlib.suppress(ValueError):
self.connections_checked_in_conn_dns_timer_thread.remove(
flow.uid
)
# Sometimes the same computer makes requests using
# its ipv4 and ipv6 address, check if this is the case
if self.check_if_resolution_was_made_by_different_version(
profileid, flow.daddr
):
return False

if self.is_well_known_org(flow.daddr):
# if the SNI or rDNS of the IP matches a
# well-known org, then this is a FP
return False

self.set_evidence.conn_without_dns(twid, flow)
return True

def check_conn_to_port_0(self, profileid, twid, flow):
"""
Expand Down Expand Up @@ -732,7 +721,7 @@ def is_dns_conn(flow):

self.set_evidence.conn_to_private_ip(twid, flow)

def analyze(self, msg):
async def analyze(self, msg):
if utils.is_msg_intended_for(msg, "new_flow"):
msg = json.loads(msg["data"])
profileid = msg["profileid"]
Expand All @@ -751,7 +740,13 @@ def analyze(self, msg):
self.check_different_localnet_usage(
twid, flow, what_to_check="srcip"
)
self.check_connection_without_dns_resolution(profileid, twid, flow)
task = asyncio.create_task(
self.check_connection_without_dns_resolution(
profileid, twid, flow
)
)
# to wait for these functions before flowalerts shuts down
self.flowalerts.tasks.append(task)
self.detect_connection_to_multiple_ports(profileid, twid, flow)
self.check_data_upload(profileid, twid, flow)
self.check_non_http_port_80_conns(twid, flow)
Expand Down
108 changes: 50 additions & 58 deletions modules/flowalerts/dns.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import asyncio
import collections
import contextlib
import json
import math
from typing import List
import validators

from modules.flowalerts.timer_thread import TimerThread
from slips_files.common.abstracts.flowalerts_analyzer import (
IFlowalertsAnalyzer,
)
Expand Down Expand Up @@ -157,20 +156,8 @@ def is_connection_made_by_different_version(self, profileid, twid, daddr):
# by this computer but using a different ip version
return True

def check_dns_without_connection(self, profileid, twid, flow):
"""
Makes sure all cached DNS answers are used in contacted_ips
"""
if not self.should_detect_dns_without_conn(flow):
return False

# One DNS query may not be answered exactly by UID,
# but the computer can re-ask the domain,
# and the next DNS resolution can be
# answered. So dont check the UID, check if the domain has an IP

# self.print(f'The DNS query to {domain} had as answers {answers} ')

def get_previous_domain_resolutions(self, query) -> List[str]:
prev_resolutions = []
# It can happen that this domain was already resolved
# previously, but with other IPs
# So we get from the DB all the IPs for this domain
Expand All @@ -180,25 +167,27 @@ def check_dns_without_connection(self, profileid, twid, flow):
# with AAAA, and the computer chooses the A address.
# Therefore, the 2nd DNS resolution
# would be treated as 'without connection', but this is false.
if prev_domain_resolutions := self.db.get_domain_data(flow.query):
prev_domain_resolutions = prev_domain_resolutions.get("IPs", [])
# if there's a domain in the cache
# (prev_domain_resolutions) that is not in the
# current answers given to this function,
# append it to the answers list
flow.answers.extend(
[
ans
for ans in prev_domain_resolutions
if ans not in flow.answers
]
)
if prev_domain_resolutions := self.db.get_domain_data(query):
prev_resolutions = prev_domain_resolutions.get("IPs", [])
return prev_resolutions

def is_any_flow_answer_contacted(self, profileid, twid, flow) -> bool:
"""
checks if any of the answers of the given dns flow were contacted
before
"""
# We do this to answer one question: was the query we asked the DNS
# for resolved before to an IP that is not in the current
# flow.answers, AND was that previously resolved IP contacted?
# If so, we extend flow.answers to include these IPs.
# The goal is to avoid FPs.
flow.answers.extend(self.get_previous_domain_resolutions(flow.query))

if flow.answers == ["-"]:
# If no IPs are in the answer, we can not expect
# the computer to connect to anything
# self.print(f'No ips in the answer, so ignoring')
return False
return True

contacted_ips = self.db.get_all_contacted_ips_in_profileid_twid(
profileid, twid
Expand All @@ -220,38 +209,35 @@ def check_dns_without_connection(self, profileid, twid, flow):
)
):
# this dns resolution has a connection. We can exit
return False
return True

# Check if there was a connection to any of the CNAMEs
if self.is_cname_contacted(flow.answers, contacted_ips):
# this is not a DNS without connection
return True

async def check_dns_without_connection(
self, profileid, twid, flow
) -> bool:
"""
Makes sure all cached DNS answers are there in contacted_ips
"""
if not self.should_detect_dns_without_conn(flow):
return False

# self.print(f'It seems that none of the IPs were contacted')
# Found a DNS query which none of its IPs was contacted
# It can be that Slips is still reading it from the files.
# Lets check back in some time
# Create a timer thread that will wait some seconds for the
# connection to arrive and then check again
if flow.uid not in self.connections_checked_in_dns_conn_timer_thread:
# comes here if we haven't started the timer
# thread for this dns before mark this dns as checked
self.connections_checked_in_dns_conn_timer_thread.append(flow.uid)
params = [profileid, twid, flow]
# self.print(f'Starting the timer to check on {domain}, uid {uid}.
# time {datetime.datetime.now()}')
timer = TimerThread(40, self.check_dns_without_connection, params)
timer.start()
else:
# It means we already checked this dns with the Timer process
# but still no connection for it.
self.set_evidence.dns_without_conn(twid, flow)
# This UID will never appear again, so we can remove it and
# free some memory
with contextlib.suppress(ValueError):
self.connections_checked_in_dns_conn_timer_thread.remove(
flow.uid
)
if self.is_any_flow_answer_contacted(profileid, twid, flow):
return False

# Found a DNS query and none of its answers were contacted
await asyncio.sleep(40)

if self.is_any_flow_answer_contacted(profileid, twid, flow):
return False

# Reaching here means we already waited some time for the connection
# of this dns to arrive but none was found
self.set_evidence.dns_without_conn(twid, flow)
return True

@staticmethod
def estimate_shannon_entropy(string):
Expand Down Expand Up @@ -408,14 +394,20 @@ def check_dns_arpa_scan(self, profileid, twid, flow):
self.dns_arpa_queries.pop(profileid)
return True

def analyze(self, msg):
async def analyze(self, msg):
if not utils.is_msg_intended_for(msg, "new_dns"):
return False
msg = json.loads(msg["data"])
profileid = msg["profileid"]
twid = msg["twid"]
flow = self.classifier.convert_to_flow_obj(msg["flow"])
self.check_dns_without_connection(profileid, twid, flow)
task = asyncio.create_task(
self.check_dns_without_connection(profileid, twid, flow)
)
# Allow the event loop to run the scheduled task
await asyncio.sleep(0)
# to wait for these functions before flowalerts shuts down
self.flowalerts.tasks.append(task)
self.check_high_entropy_dns_answers(twid, flow)
self.check_invalid_dns_answers(twid, flow)
self.detect_dga(profileid, twid, flow)
Expand Down
33 changes: 27 additions & 6 deletions modules/flowalerts/flowalerts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import asyncio
import inspect
from asyncio import Task
from typing import List

from slips_files.common.slips_utils import utils
from slips_files.common.abstracts.module import IModule
from slips_files.common.abstracts.async_module import AsyncModule
from .conn import Conn
from .dns import DNS
from .downloaded_file import DownloadedFile
Expand All @@ -8,12 +13,11 @@
from .software import Software
from .ssh import SSH
from .ssl import SSL
from slips_files.core.helpers.whitelist.whitelist import Whitelist

from .tunnel import Tunnel
from slips_files.core.helpers.whitelist.whitelist import Whitelist


class FlowAlerts(IModule):
class FlowAlerts(AsyncModule):
name = "Flow Alerts"
description = (
"Alerts about flows: long connection, successful ssh, "
Expand All @@ -33,6 +37,8 @@ def init(self):
self.downloaded_file = DownloadedFile(self.db, flowalerts=self)
self.tunnel = Tunnel(self.db, flowalerts=self)
self.conn = Conn(self.db, flowalerts=self)
# list of async functions to await before flowalerts shuts down
self.tasks: List[Task] = []

def subscribe_to_channels(self):
channels = (
Expand All @@ -51,6 +57,9 @@ def subscribe_to_channels(self):
channel_obj = self.db.subscribe(channel)
self.channels.update({channel: channel_obj})

async def shutdown_gracefully(self):
await asyncio.gather(*self.tasks)

def pre_main(self):
utils.drop_root_privs()
self.analyzers_map = {
Expand All @@ -66,11 +75,23 @@ def pre_main(self):
"new_ssl": [self.ssl.analyze],
}

def main(self):
async def main(self):
for channel, analyzers in self.analyzers_map.items():
msg: dict = self.get_msg(channel)
if not msg:
continue

for analyzer in analyzers:
analyzer(msg)
# some analyzers are async functions
if inspect.iscoroutinefunction(analyzer):
# The analyzer runs normally until it finishes.
# Tasks created inside this analyzer run asynchronously
# and finish whenever they finish; we don't wait for them here.
loop = asyncio.get_event_loop()
task = loop.create_task(analyzer(msg))
# to wait for these functions before flowalerts shuts down
self.tasks.append(task)
# Allow the event loop to run the scheduled task
await asyncio.sleep(0)
else:
analyzer(msg)
Loading
Loading