From c5af79ecb62f6d9a8e29536806714200299643df Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Thu, 28 Nov 2024 18:24:57 +0100 Subject: [PATCH] Add Detection for CVE-2023-28461 --- agent/exploits/cve_2023_28461.py | 107 +++++++++++++++++++++++ tests/exploits/cve_2023_28461_test.py | 120 ++++++++++++++++++++++++++ 2 files changed, 227 insertions(+) create mode 100644 agent/exploits/cve_2023_28461.py create mode 100644 tests/exploits/cve_2023_28461_test.py diff --git a/agent/exploits/cve_2023_28461.py b/agent/exploits/cve_2023_28461.py new file mode 100644 index 0000000..2b3a3c2 --- /dev/null +++ b/agent/exploits/cve_2023_28461.py @@ -0,0 +1,107 @@ +"""Agent Asteroid implementation for CVE-2023-28461""" + +import datetime +import logging +import re +import ssl +from typing import Any +from urllib import parse as urlparse + +import urllib3 +from packaging import version + +from agent import definitions +from agent import exploits_registry +from agent.exploits import webexploit + +VULNERABILITY_TITLE = ( + "Array Networks Array AG Series and vxAG Remote Code Execution Vulnerability" +) +VULNERABILITY_REFERENCE = "CVE-2023-28461" +VULNERABILITY_DESCRIPTION = """A critical vulnerability in Array Networks Array AG Series and vxAG +SSL VPN gateways allows remote code execution by exploiting an HTTP header with the 'flags' attribute +to browse the filesystem without authentication.""" +RISK_RATING = "CRITICAL" +MAX_VULNERABLE_VERSION = version.parse("9.4.0.481") + +DEFAULT_TIMEOUT = datetime.timedelta(seconds=90) + + +@exploits_registry.register +class CVE202328461Exploit(webexploit.WebExploit): + accept_request = definitions.Request( + method="GET", path="/prx/000/http/localhost/login" + ) + accept_pattern = [re.compile(r"Rel_AG_(\d+_\d+_\d+_\d+)")] + metadata = definitions.VulnerabilityMetadata( + title=VULNERABILITY_TITLE, + description=VULNERABILITY_DESCRIPTION, + reference=VULNERABILITY_REFERENCE, + risk_rating=RISK_RATING, + ) + + def _create_ssl_context(self) -> Any: + """Creates an SSL context that allows insecure connections.""" + ctx = urllib3.util.ssl_.create_urllib3_context() # type: ignore + ctx.load_default_certs() + ctx.options |= 0x4 # ssl.OP_LEGACY_SERVER_CONNECT + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + + def _send_ssl_request(self, url: str) -> Any: + """Send a GET request using urllib3 with a custom SSL context.""" + ssl_context = self._create_ssl_context() + with urllib3.PoolManager(ssl_context=ssl_context) as http: + try: + response = http.request( + "GET", url, timeout=DEFAULT_TIMEOUT.total_seconds() + ) + return response.data.decode("utf-8") + except urllib3.exceptions.HTTPError as e: + logging.error("Error during SSL request: %s", e) + return "" + + def accept(self, target: definitions.Target) -> bool: + """Checks if the target matches the accept pattern. + + Args: + target: Target to verify + + Returns: + True if the target appears valid; otherwise False. + """ + target_endpoint = urlparse.urljoin(target.origin, self.accept_request.path) + response_body = self._send_ssl_request(target_endpoint) + + for pattern in self.accept_pattern: + if pattern.search(response_body) is not None: + return True + + return False + + def check(self, target: definitions.Target) -> list[definitions.Vulnerability]: + """Rule to detect specific vulnerability on a specific target. + + Args: + target: Target to scan + + Returns: + List of identified vulnerabilities. + """ + vulnerabilities: list[definitions.Vulnerability] = [] + target_endpoint = urlparse.urljoin(target.origin, self.accept_request.path) + response_body = self._send_ssl_request(target_endpoint) + + for pattern in self.accept_pattern: + match = pattern.search(response_body) + if match is not None: + version_str = match.group(1).replace("_", ".") + target_version = version.parse(version_str) + + if target_version <= MAX_VULNERABLE_VERSION: + vulnerability = self._create_vulnerability(target) + vulnerabilities.append(vulnerability) + break + + return vulnerabilities diff --git a/tests/exploits/cve_2023_28461_test.py b/tests/exploits/cve_2023_28461_test.py new file mode 100644 index 0000000..7f202dc --- /dev/null +++ b/tests/exploits/cve_2023_28461_test.py @@ -0,0 +1,120 @@ +"""Unit tests for Agent Asteroid: CVE-2023-28461""" + +import requests_mock as req_mock +from agent import definitions +from agent.exploits import cve_2023_28461 +from unittest.mock import patch + +VULNERABLE_RESPONSE = """ + + + Login + + + +""" + +SAFE_RESPONSE = """ + + + Login + + + +""" + +NO_VERSION_RESPONSE = """ + + + Login + + + +""" + + +def testCVE202328461_whenVulnerable_reportFinding( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """CVE-2023-28461 unit test: case when the target is vulnerable.""" + requests_mock.get( + "http://localhost:80/prx/000/http/localhost/login", + text=VULNERABLE_RESPONSE, + status_code=200, + ) + + # Mocking the '_send_ssl_request' method to avoid real requests + with patch.object( + cve_2023_28461.CVE202328461Exploit, + "_send_ssl_request", + return_value=VULNERABLE_RESPONSE, + ): + exploit_instance = cve_2023_28461.CVE202328461Exploit() + target = definitions.Target("http", "localhost", 80) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + assert len(vulnerabilities) > 0 + vulnerability = vulnerabilities[0] + assert ( + vulnerability.entry.title + == "Array Networks Array AG Series and vxAG Remote Code Execution Vulnerability" + ) + assert ( + vulnerability.technical_detail + == "http://localhost:80 is vulnerable to CVE-2023-28461, Array Networks Array AG Series and vxAG Remote Code Execution Vulnerability" + ) + + +def testCVE202328461_whenSafe_reportNothing( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """CVE-2023-28461 unit test: case when the target is safe.""" + requests_mock.get( + "http://localhost:80/prx/000/http/localhost/login", + text=SAFE_RESPONSE, + status_code=200, + ) + + # Mocking the '_send_ssl_request' method to avoid real requests + with patch.object( + cve_2023_28461.CVE202328461Exploit, + "_send_ssl_request", + return_value=SAFE_RESPONSE, + ): + exploit_instance = cve_2023_28461.CVE202328461Exploit() + target = definitions.Target("http", "localhost", 80) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + assert len(vulnerabilities) == 0 + + +def testCVE202328461_whenNoVersionFound_reportNothing( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """CVE-2023-28461 unit test: case when the server does not include a version.""" + requests_mock.get( + "http://localhost:80/prx/000/http/localhost/login", + text=NO_VERSION_RESPONSE, + status_code=200, + ) + + # Mocking the '_send_ssl_request' method to avoid real requests + with patch.object( + cve_2023_28461.CVE202328461Exploit, + "_send_ssl_request", + return_value=NO_VERSION_RESPONSE, + ): + exploit_instance = cve_2023_28461.CVE202328461Exploit() + target = definitions.Target("http", "localhost", 80) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is False + assert len(vulnerabilities) == 0