Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a function to detect MOTS #259

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions modules/http_analyzer/http_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@

from slips_files.common.abstracts import Module
import multiprocessing
from scapy.all import sniff, IP, TCP
from collections import defaultdict
from slips_files.core.database.database import __database__
from slips_files.common.config_parser import ConfigParser
from typing import Optional
from user_agents import parse as ua_parse
from slips_files.common.slips_utils import utils
import sys
import traceback
import json
import urllib
import requests
import hashlib


class Module(Module, multiprocessing.Process):
Expand Down Expand Up @@ -53,6 +58,52 @@ def print(self, text, verbose=1, debug=0):
def read_configuration(self):
conf = ConfigParser()
self.pastebin_downloads_threshold = conf.get_pastebin_download_threshold()

def detect_executable_mime_types(self, resp_mime_types, profileid, twid, uid, timestamp):
if resp_mime_types:
executable_mime_types = [
'application/x-msdownload',
'application/x-ms-dos-executable',
'application/x-ms-exe',
'application/x-exe',
'application/x-winexe',
'application/x-winhlp',
'application/x-winhelp',
'application/octet-stream'
]

for mime_type in resp_mime_types:
if mime_type in executable_mime_types:
self.print(f'Detected executable mime type: {mime_type}', 0, 1)
self.report_executable_mime_type(
mime_type,
profileid,
twid,
uid,
timestamp
)
break

def detect_quantum_insert_mots(self, packet):
"""
Detect Quantum Insert attacks with More-On-The-Side (MOTS) technique.
"""
if IP in packet and TCP in packet:
src_ip = packet[IP].src
dst_ip = packet[IP].dst
src_port = packet[TCP].sport
dst_port = packet[TCP].dport
seq = packet[TCP].seq
payload = packet[TCP].payload
payload_hash = hashlib.sha256(bytes(payload)).hexdigest()

connection_key = (src_ip, dst_ip, src_port, dst_port, seq)
if connection_key in self.packet_hashes:
# Check if payloads are different for the same connection_key
if payload_hash != self.packet_hashes[connection_key]:
self.print(f"Potential Quantum Insert MOTS detected: {connection_key}", verbose=1)
else:
self.packet_hashes[connection_key] = payload_hash

def check_suspicious_user_agents(
self, uid, host, uri, timestamp, user_agent, profileid, twid
Expand Down Expand Up @@ -453,6 +504,7 @@ def shutdown_gracefully(self):

def run(self):
utils.drop_root_privs()
self.packet_hashes = defaultdict(str)
# Main loop function
while True:
try:
Expand Down Expand Up @@ -545,6 +597,16 @@ def run(self):
twid,
uid
)

self.detect_executable_mime_types(
resp_mime_types,
profileid,
twid,
uid,
timestamp
)

sniff(filter="tcp", prn=self.detect_quantum_insert_mots, count=10000)

except KeyboardInterrupt:
self.shutdown_gracefully()
Expand Down