diff --git a/modules/http_analyzer/http_analyzer.py b/modules/http_analyzer/http_analyzer.py index 969cd00e2..543294a7a 100644 --- a/modules/http_analyzer/http_analyzer.py +++ b/modules/http_analyzer/http_analyzer.py @@ -1,14 +1,19 @@ from slips_files.common.abstracts import Module import multiprocessing +from scapy.all import sniff, IP, TCP +from collections import defaultdict from slips_files.core.database.database import __database__ from slips_files.common.config_parser import ConfigParser +from typing import Optional +from user_agents import parse as ua_parse from slips_files.common.slips_utils import utils import sys import traceback import json import urllib import requests +import hashlib class Module(Module, multiprocessing.Process): @@ -53,6 +58,52 @@ def print(self, text, verbose=1, debug=0): def read_configuration(self): conf = ConfigParser() self.pastebin_downloads_threshold = conf.get_pastebin_download_threshold() + + def detect_executable_mime_types(self, resp_mime_types, profileid, twid, uid, timestamp): + if resp_mime_types: + executable_mime_types = [ + 'application/x-msdownload', + 'application/x-ms-dos-executable', + 'application/x-ms-exe', + 'application/x-exe', + 'application/x-winexe', + 'application/x-winhlp', + 'application/x-winhelp', + 'application/octet-stream' + ] + + for mime_type in resp_mime_types: + if mime_type in executable_mime_types: + self.print(f'Detected executable mime type: {mime_type}', 0, 1) + self.report_executable_mime_type( + mime_type, + profileid, + twid, + uid, + timestamp + ) + break + + def detect_quantum_insert_mots(self, packet): + """ + Detect Quantum Insert attacks with More-On-The-Side (MOTS) technique. + """ + if IP in packet and TCP in packet: + src_ip = packet[IP].src + dst_ip = packet[IP].dst + src_port = packet[TCP].sport + dst_port = packet[TCP].dport + seq = packet[TCP].seq + payload = packet[TCP].payload + payload_hash = hashlib.sha256(bytes(payload)).hexdigest() + + connection_key = (src_ip, dst_ip, src_port, dst_port, seq) + if connection_key in self.packet_hashes: + # Check if payloads are different for the same connection_key + if payload_hash != self.packet_hashes[connection_key]: + self.print(f"Potential Quantum Insert MOTS detected: {connection_key}", verbose=1) + else: + self.packet_hashes[connection_key] = payload_hash def check_suspicious_user_agents( self, uid, host, uri, timestamp, user_agent, profileid, twid @@ -453,6 +504,7 @@ def shutdown_gracefully(self): def run(self): utils.drop_root_privs() + self.packet_hashes = defaultdict(str) # Main loop function while True: try: @@ -545,6 +597,16 @@ def run(self): twid, uid ) + + self.detect_executable_mime_types( + resp_mime_types, + profileid, + twid, + uid, + timestamp + ) + + sniff(filter="tcp", prn=self.detect_quantum_insert_mots, count=10000) except KeyboardInterrupt: self.shutdown_gracefully()