Skip to content

Commit

Permalink
Merge pull request #1114 from stratosphereips/alya/small_bug_fixes
Browse files Browse the repository at this point in the history
Add multiple reconnection attempts to telnet detection
  • Loading branch information
AlyaGomaa authored Dec 11, 2024
2 parents f02645e + be788bd commit 5bbb0e2
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Add multiple telnet reconnection attempts detection

1.1.4.1 (Dec 3rd, 2024)
- Fix abstract class starting with the rest of the modules.
- Fix the updating of the MAC vendors database used in slips.
Expand Down
69 changes: 63 additions & 6 deletions modules/flowalerts/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
from slips_files.common.flow_classifier import FlowClassifier


NOT_ESTAB = "Not Established"
ESTAB = "Established"


class Conn(IFlowalertsAnalyzer):
def init(self):
# get the default gateway
Expand All @@ -38,6 +42,7 @@ def init(self):
self.classifier = FlowClassifier()
self.our_ips = utils.get_own_ips()
self.input_type: str = self.db.get_input_type()
self.multiple_reconnection_attempts_threshold = 5

def read_configuration(self):
conf = ConfigParser()
Expand Down Expand Up @@ -171,7 +176,7 @@ def check_unknown_port(self, profileid, twid, flow):
"""
if not flow.dport:
return
if flow.interpreted_state != "Established":
if flow.interpreted_state != ESTAB:
# detect unknown ports on established conns only
return False

Expand All @@ -195,6 +200,54 @@ def check_unknown_port(self, profileid, twid, flow):
self.set_evidence.unknown_port(twid, flow)
return True

def is_telnet(self, flow) -> bool:
try:
dport = int(flow.dport)
except ValueError:
# binetflow icmp ports are hex strings
return False

telnet_ports = (23, 2323)
return dport in telnet_ports and flow.proto.lower() == "tcp"

def check_multiple_telnet_reconnection_attempts(
self, profileid, twid, flow
):
if flow.interpreted_state != NOT_ESTAB:
return

if not self.is_telnet(flow):
return

key = f"{flow.saddr}-{flow.daddr}-telnet"
# add this conn to the stored number of reconnections
current_reconnections = self.db.get_reconnections_for_tw(
profileid, twid
)
try:
reconnections, uids = current_reconnections[key]
reconnections += 1
uids.append(flow.uid)
current_reconnections[key] = (reconnections, uids)
except KeyError:
current_reconnections[key] = (1, [flow.uid])
reconnections = 1

if reconnections < 4:
# update the reconnections ctr in the db
self.db.set_reconnections(profileid, twid, current_reconnections)
return

self.set_evidence.multiple_telnet_reconnection_attempts(
twid, flow, reconnections, current_reconnections[key][1]
)

# reset the reconnection attempts of this src->dst since an evidence
# is set
current_reconnections[key] = (0, [])

self.db.set_reconnections(profileid, twid, current_reconnections)

def check_multiple_reconnection_attempts(self, profileid, twid, flow):
"""
Alerts when 5+ reconnection attempts from the same source IP to
Expand All @@ -219,7 +272,8 @@ def check_multiple_reconnection_attempts(self, profileid, twid, flow):
current_reconnections[key] = (1, [flow.uid])
reconnections = 1

if reconnections < 5:
if reconnections < self.multiple_reconnection_attempts_threshold:
self.db.set_reconnections(profileid, twid, current_reconnections)
return

self.set_evidence.multiple_reconnection_attempts(
Expand Down Expand Up @@ -511,7 +565,7 @@ def check_conn_to_port_0(self, profileid, twid, flow):
)

def detect_connection_to_multiple_ports(self, profileid, twid, flow):
if flow.proto != "tcp" or flow.interpreted_state != "Established":
if flow.proto != "tcp" or flow.interpreted_state != ESTAB:
return

dport_name = flow.appproto
Expand All @@ -525,7 +579,7 @@ def detect_connection_to_multiple_ports(self, profileid, twid, flow):
# Connection to multiple ports to the destination IP
if profileid.split("_")[1] == flow.saddr:
direction = "Dst"
state = "Established"
state = ESTAB
protocol = "TCP"
role = "Client"
type_data = "IPs"
Expand Down Expand Up @@ -565,7 +619,7 @@ def detect_connection_to_multiple_ports(self, profileid, twid, flow):
# Happens in the mode 'all'
elif profileid.split("_")[-1] == flow.daddr:
direction = "Src"
state = "Established"
state = ESTAB
protocol = "TCP"
role = "Server"
type_data = "IPs"
Expand Down Expand Up @@ -608,7 +662,7 @@ def check_non_http_port_80_conns(self, twid, flow):
str(flow.dport) == "80"
and flow.proto.lower() == "tcp"
and str(flow.appproto).lower() != "http"
and flow.interpreted_state == "Established"
and flow.interpreted_state == ESTAB
and (flow.sbytes + flow.dbytes) != 0
):
self.set_evidence.non_http_port_80_conn(twid, flow)
Expand Down Expand Up @@ -734,6 +788,9 @@ async def analyze(self, msg):
self.check_long_connection(twid, flow)
self.check_unknown_port(profileid, twid, flow)
self.check_multiple_reconnection_attempts(profileid, twid, flow)
self.check_multiple_telnet_reconnection_attempts(
profileid, twid, flow
)
self.check_conn_to_port_0(profileid, twid, flow)
self.check_different_localnet_usage(
twid, flow, what_to_check="dstip"
Expand Down
14 changes: 7 additions & 7 deletions modules/flowalerts/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,17 +309,17 @@ def check_high_entropy_dns_answers(self, twid, flow):
)

def check_invalid_dns_answers(self, twid, flow):
# this function is used to check for certain IP
# answers to DNS queries being blocked
# (perhaps by ad blockers) and set to the following IP values
# currently hardcoding blocked ips
invalid_answers = {"127.0.0.1", "0.0.0.0"}
"""
this function is used to check for private IPs in the answers of
a dns queries.
probably means the queries is being blocked
(perhaps by ad blockers) and set to a private IP value
"""
if not flow.answers:
return

for answer in flow.answers:
if answer in invalid_answers and flow.query != "localhost":
# blocked answer found
if utils.is_private_ip(answer) and flow.query != "localhost":
self.set_evidence.invalid_dns_answer(twid, flow, answer)
# delete answer from redis cache to prevent
# associating this dns answer with this domain/query and
Expand Down
39 changes: 39 additions & 0 deletions modules/flowalerts/set_evidence.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,45 @@ def self_signed_certificates(self, twid, flow) -> None:
)
self.db.set_evidence(evidence)

def multiple_telnet_reconnection_attempts(
self, twid, flow, reconnections, uids: List[str]
):
"""
Set evidence for 4+ telnet unsuccessful attempts.
"""
confidence: float = 0.5
threat_level: ThreatLevel = ThreatLevel.MEDIUM

twid: int = int(twid.replace("timewindow", ""))

description = (
f"Multiple Telnet reconnection attempts from IP: {flow.saddr} "
f"to Destination IP: {flow.daddr} "
f"reconnections: {reconnections}"
)
evidence: Evidence = Evidence(
evidence_type=EvidenceType.MULTIPLE_RECONNECTION_ATTEMPTS,
attacker=Attacker(
direction=Direction.SRC,
attacker_type=IoCType.IP,
value=flow.saddr,
),
victim=Victim(
direction=Direction.DST,
victim_type=IoCType.IP,
value=flow.daddr,
),
threat_level=threat_level,
confidence=confidence,
description=description,
profile=ProfileID(ip=flow.saddr),
timewindow=TimeWindow(number=twid),
uid=uids,
timestamp=flow.starttime,
)

self.db.set_evidence(evidence)

def multiple_reconnection_attempts(
self, twid, flow, reconnections, uids: List[str]
) -> None:
Expand Down
50 changes: 26 additions & 24 deletions slips_files/common/slips_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
import sys
import ipaddress
import aid_hash
from typing import Any, Optional
from typing import (
Any,
Optional,
Union,
)
from dataclasses import is_dataclass, asdict
from enum import Enum

Expand Down Expand Up @@ -265,7 +269,7 @@ def convert_format(self, ts, required_format: str):

# convert to the req format
if required_format == "iso":
return datetime_obj.astimezone().isoformat()
return datetime_obj.astimezone(tz=self.local_tz).isoformat()
elif required_format == "unixtimestamp":
return datetime_obj.timestamp()
else:
Expand Down Expand Up @@ -390,20 +394,19 @@ def is_port_in_use(self, port: int) -> bool:
sock.close()
return True

def is_private_ip(self, ip_obj: ipaddress) -> bool:
"""
This function replaces the ipaddress library 'is_private'
because it does not work correctly and it does not ignore
the ips 0.0.0.0 or 255.255.255.255
"""
# Is it a well-formed ipv4 or ipv6?
r_value = False
if ip_obj and ip_obj.is_private:
if ip_obj != ipaddress.ip_address(
"0.0.0.0"
) and ip_obj != ipaddress.ip_address("255.255.255.255"):
r_value = True
return r_value
def is_private_ip(
self, ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, str]
) -> bool:
ip_classes = {ipaddress.IPv4Address, ipaddress.IPv6Address}
for class_ in ip_classes:
if isinstance(ip, class_):
return ip and ip.is_private

if self.detect_ioc_type(ip) != "ip":
return False
# convert the given str ip to obj
ip_obj = ipaddress.ip_address(ip)
return ip_obj.is_private

def is_ignored_ip(self, ip: str) -> bool:
"""
Expand All @@ -414,16 +417,15 @@ def is_ignored_ip(self, ip: str) -> bool:
ip_obj = ipaddress.ip_address(ip)
except (ipaddress.AddressValueError, ValueError):
return True

# Is the IP multicast, private? (including localhost)
# The broadcast address 255.255.255.255 is reserved.
return bool(
(
ip_obj.is_multicast
or self.is_private_ip(ip_obj)
or ip_obj.is_link_local
or ip_obj.is_reserved
or ".255" in ip_obj.exploded
)
return (
ip_obj.is_multicast
or self.is_private_ip(ip_obj)
or ip_obj.is_link_local
or ip_obj.is_loopback
or ip_obj.is_reserved
)

def get_sha256_hash(self, filename: str):
Expand Down
72 changes: 72 additions & 0 deletions tests/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,78 @@ def test_check_unknown_port_true_case(mocker):
mock_set_evidence.assert_called_once_with(twid, flow)


@pytest.mark.parametrize(
"origstate, saddr, daddr, dport, uids, interpreted_state, expected_calls",
[
( # Testcase1:5 rejections, evidence should be set
"REJ",
"192.168.1.1",
"192.168.1.2",
23,
[f"uid_{i}" for i in range(4)],
"Not Established",
1,
),
( # Testcase2: Less than 5 rejections, no evidence
"RST",
"192.168.1.1",
"192.168.1.2",
2323,
[f"uid_{i}" for i in range(4)],
"Not Established",
1,
),
( # Testcase3: Non-REJ state, no evidence
"Established",
"192.168.1.1",
"192.168.1.2",
23,
["uid_1"],
"Established",
0,
),
],
)
def test_check_multiple_telnet_reconnection_attempts(
origstate, saddr, daddr, dport, uids, interpreted_state, expected_calls
):
"""
Tests the check_multiple_telnet_reconnection_attempts function
with various scenarios.
"""
conn = ModuleFactory().create_conn_analyzer_obj()
conn.set_evidence.multiple_telnet_reconnection_attempts = Mock()
conn.db.get_reconnections_for_tw.return_value = {}

for uid in uids:
flow = Conn(
starttime="1726249372.312124",
uid=uid,
saddr=saddr,
daddr=daddr,
dur=1,
proto="tcp",
appproto="",
sport="0",
dport=dport,
spkts=0,
dpkts=0,
sbytes=0,
dbytes=0,
smac="",
dmac="",
state=origstate,
history="",
)
flow.interpreted_state = interpreted_state
conn.check_multiple_telnet_reconnection_attempts(profileid, twid, flow)

assert (
conn.set_evidence.multiple_telnet_reconnection_attempts.call_count
== expected_calls
)


@pytest.mark.parametrize(
"origstate, saddr, daddr, dport, uids, expected_calls",
[
Expand Down
4 changes: 2 additions & 2 deletions tests/test_slips_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ def test_assert_microseconds(input_value, expected_output):
# testcase4: Public IPv4 address
(ipaddress.ip_address("8.8.8.8"), False),
# testcase5: Special IP address 0.0.0.0
(ipaddress.ip_address("0.0.0.0"), False),
(ipaddress.ip_address("0.0.0.0"), True),
# testcase6: Broadcast IP address 255.255.255.255
(ipaddress.ip_address("255.255.255.255"), False),
(ipaddress.ip_address("255.255.255.255"), True),
],
)
def test_is_private_ip(ip_address, expected_result):
Expand Down

0 comments on commit 5bbb0e2

Please sign in to comment.