From 6d9bdaa3bf6e8e79fa61de0e44154cefca7db6f4 Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Fri, 11 Oct 2024 16:55:00 +0100 Subject: [PATCH 1/2] Add CVE-2024-23113 --- agent/exploits/cve_2024_23113.py | 263 ++++++++++++++++++++++++++ tests/exploits/cve_2024_23113_test.py | 243 ++++++++++++++++++++++++ 2 files changed, 506 insertions(+) create mode 100644 agent/exploits/cve_2024_23113.py create mode 100644 tests/exploits/cve_2024_23113_test.py diff --git a/agent/exploits/cve_2024_23113.py b/agent/exploits/cve_2024_23113.py new file mode 100644 index 0000000..2d40d8a --- /dev/null +++ b/agent/exploits/cve_2024_23113.py @@ -0,0 +1,263 @@ +"""Agent Asteroid implementation for CVE-2024-23113""" + +import datetime +import logging +import re +import socket + +import requests +from ostorlab.agent.kb import kb +from ostorlab.agent.mixins import agent_report_vulnerability_mixin +from pysnmp import hlapi # type: ignore + +from agent import definitions +from agent import exploits_registry + +VULNERABILITY_TITLE = "Critical Fortinet Format String Vulnerability" +VULNERABILITY_REFERENCE = "CVE-2024-23113" +VULNERABILITY_DESCRIPTION = """A use of externally-controlled format string in Fortinet FortiOS versions 7.4.0 through 7.4.2, 7.2.0 through + 7.2.6, 7.0.0 through 7.0.13, FortiProxy versions 7.4.0 through 7.4.2, 7.2.0 through 7.2.8, 7.0.0 through 7.0.14, + FortiPAM versions 1.2.0, 1.1.0 through 1.1.2, 1.0.0 through 1.0.3, FortiSwitchManager versions 7.2.0 through 7.2.3, + 7.0.0 through 7.0.3 allows attacker to execute unauthorized code or commands via specially crafted packets.""" + +DEFAULT_TIMEOUT = TIMEOUT = datetime.timedelta(seconds=10) + +VULNERABLE_VERSIONS = { + "FortiOS": [ + (7, 0, 0), + (7, 0, 1), + (7, 0, 2), + (7, 0, 3), + (7, 0, 4), + (7, 0, 5), + (7, 0, 6), + (7, 0, 7), + (7, 0, 8), + (7, 0, 9), + (7, 0, 10), + (7, 0, 11), + (7, 0, 12), + (7, 0, 13), + (7, 2, 0), + (7, 2, 1), + (7, 2, 2), + (7, 2, 3), + (7, 2, 4), + (7, 2, 5), + (7, 2, 6), + (7, 4, 0), + (7, 4, 1), + (7, 4, 2), + ], + "FortiProxy": [ + (7, 0, 0), + (7, 0, 1), + (7, 0, 2), + (7, 0, 3), + (7, 0, 4), + (7, 0, 5), + (7, 0, 6), + (7, 0, 7), + (7, 0, 8), + (7, 0, 9), + (7, 0, 10), + (7, 0, 11), + (7, 0, 12), + (7, 0, 13), + (7, 0, 14), + (7, 2, 0), + (7, 2, 1), + (7, 2, 2), + (7, 2, 3), + (7, 2, 4), + (7, 2, 5), + (7, 2, 6), + (7, 2, 7), + (7, 2, 8), + (7, 4, 0), + (7, 4, 1), + (7, 4, 2), + ], + "FortiPAM": [ + (1, 0, 0), + (1, 0, 1), + (1, 0, 2), + (1, 0, 3), + (1, 1, 0), + (1, 1, 1), + (1, 1, 2), + (1, 2, 0), + ], + "FortiSwitchManager": [ + (7, 0, 0), + (7, 0, 1), + (7, 0, 2), + (7, 0, 3), + (7, 2, 0), + (7, 2, 1), + (7, 2, 2), + (7, 2, 3), + ], +} + + +def _get_fortinet_version_snmp(host: str) -> tuple[str, tuple[int, int, int]] | None: + version = None + product = None + iterator = hlapi.getCmd( + hlapi.SnmpEngine(), + hlapi.CommunityData("public", mpModel=1), + hlapi.UdpTransportTarget((host, 161), timeout=DEFAULT_TIMEOUT.seconds), + hlapi.ContextData(), + hlapi.ObjectType(hlapi.ObjectIdentity("1.3.6.1.2.1.47.1.1.1.1.10.1")), + ) + error_indication, error_status, error_index, var_binds = next(iterator) + + if error_indication is not None: + logging.error("SNMP error_indication:: %s", error_indication) + return None + elif error_status is not None: + logging.error("SNMP error_status: %s at %s", error_status, error_index) + return None + else: + for var_bind in var_binds: + description = var_bind[1].prettyPrint() + logging.info("Fortinet description found: %s", description) + match = re.search( + r"(FortiOS|FortiProxy|FortiPAM|FortiSwitchManager).*v(\d+)\.(\d+)\.(\d+)", + description, + ) + if match is not None: + product = match.group(1) + version_parts = list(map(int, match.groups()[1:])) + version = tuple(version_parts + [0] * (3 - len(version_parts))) + logging.info("%s version extracted: %s", product, version) + + if product is not None and isinstance(version, tuple) and len(version) == 3: + return product, version + return None + + +def _get_fortinet_version_http(host: str) -> tuple[str, tuple[int, int, int]] | None: + try: + response = requests.get( + f"https://{host}", verify=False, timeout=DEFAULT_TIMEOUT.seconds + ) + content = response.text + + # FortiSwitchManager detection + match = re.search( + r"FortiSwitchManager (\d+\.\d+\.\d+)", content + ) + if match is not None: + version = tuple(int(v) for v in match.group(1).split(".")) + return "FortiSwitchManager", version # type: ignore + + # FortiPAM detection + match = re.search(r"FortiPAM (\d+\.\d+)", content) + if match is not None: + version_parts = [int(v) for v in match.group(1).split(".")] + # Ensure we always have 3 parts in the version tuple + version = tuple(version_parts + [0] * (3 - len(version_parts))) + return "FortiPAM", version # type: ignore + + except requests.RequestException: + logging.error("Failed to connect to %s via HTTPS", host) + + return None + + +def _get_fortinet_version_tcp(host: str) -> tuple[str, tuple[int, int, int]] | None: + try: + with socket.create_connection( + (host, 53), timeout=DEFAULT_TIMEOUT.seconds + ) as sock: + data = sock.recv(1024).decode("utf-8") + match = re.search(r"(\d+\.\d+\.\d+)-.*-FortiOS", data) + if match: + version = tuple(map(int, match.group(1).split("."))) + return "FortiOS", version # type: ignore + except (socket.error, UnicodeDecodeError): + logging.error(f"Failed to connect to {host} on port 53") + + return None + + +def _get_fortinet_version(host: str) -> tuple[str, tuple[int, int, int]] | None: + # Try HTTP First + result = _get_fortinet_version_http(host) + if result is not None: + return result + + # Try TCP if HTTP Fails + result = _get_fortinet_version_tcp(host) + if result is not None: + return result + + # Try SNMP if HTTP Fails + result = _get_fortinet_version_snmp(host) + if result is not None: + return result + + return None + + +def _create_vulnerability( + target: definitions.Target, product: str, version: tuple[int, int, int] +) -> definitions.Vulnerability: + entry = kb.Entry( + title=VULNERABILITY_TITLE, + risk_rating="HIGH", + short_description=VULNERABILITY_DESCRIPTION, + description=f"{VULNERABILITY_DESCRIPTION} Detected {product} version: {'.'.join(map(str, version))}", + references={ + "fortiguard.com": "https://www.fortiguard.com/psirt/FG-IR-24-029", + "nvd.nist.gov": f"https://nvd.nist.gov/vuln/detail/{VULNERABILITY_REFERENCE}", + }, + recommendation=( + f"- Update {product} to the latest non-vulnerable version. " + "- If immediate updating is not possible, restrict access to trusted IP addresses and monitor for suspicious activities." + ), + security_issue=True, + privacy_issue=False, + has_public_exploit=True, + targeted_by_malware=True, + targeted_by_ransomware=False, + targeted_by_nation_state=True, + ) + technical_detail = ( + f"{product} device at {target.origin} is running a vulnerable version: {'.'.join(map(str, version))}. " + f"This version is susceptible to CVE-2024-23113. Immediate action is required." + ) + vulnerability = definitions.Vulnerability( + entry=entry, + technical_detail=technical_detail, + risk_rating=agent_report_vulnerability_mixin.RiskRating.HIGH, + ) + return vulnerability + + +@exploits_registry.register +class CVE202423113Exploit(definitions.Exploit): + """ + CVE-2024-23113: Fortinet Products Format String Vulnerability + """ + + def accept(self, target: definitions.Target) -> bool: + result = _get_fortinet_version(target.host) + if result is not None: + product, version = result + return version in VULNERABLE_VERSIONS.get(product, []) + return False + + def check(self, target: definitions.Target) -> list[definitions.Vulnerability]: + vulnerabilities: list[definitions.Vulnerability] = [] + result = _get_fortinet_version(target.host) + + if result is not None: + product, version = result + if version in VULNERABLE_VERSIONS.get(product, []): + vulnerability = _create_vulnerability(target, product, version) + vulnerabilities.append(vulnerability) + + return vulnerabilities diff --git a/tests/exploits/cve_2024_23113_test.py b/tests/exploits/cve_2024_23113_test.py new file mode 100644 index 0000000..9d2791a --- /dev/null +++ b/tests/exploits/cve_2024_23113_test.py @@ -0,0 +1,243 @@ +"""Unit tests for Agent Asteroid: CVE-2024-23113""" + +from unittest import mock + +import pytest +from pytest_mock import plugin +from ostorlab.agent.mixins import agent_report_vulnerability_mixin + +from agent import definitions +from agent.exploits import cve_2024_23113 + + +def testCVE202423113_whenVulnerable_reportFinding( + mocker: plugin.MockerFixture, +) -> None: + """CVE-2024-23113 unit test: case when target is vulnerable.""" + + vulnerable_version = "7.0.5" + mock_var_bind = mock.MagicMock() + mock_var_bind.__getitem__.return_value.prettyPrint.return_value = ( + f"FortiOS v{vulnerable_version}" + ) + + mock_iterator = mock.MagicMock() + mock_iterator.__next__.return_value = (None, None, None, [mock_var_bind]) + mocker.patch("pysnmp.hlapi.getCmd", return_value=mock_iterator) + + exploit_instance = cve_2024_23113.CVE202423113Exploit() + + target = definitions.Target("udp", "192.168.1.1", 161) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + assert len(vulnerabilities) == 1 + vulnerability = vulnerabilities[0] + assert vulnerability.entry.title == cve_2024_23113.VULNERABILITY_TITLE + assert vulnerability.entry.risk_rating == "HIGH" + assert vulnerability.risk_rating == agent_report_vulnerability_mixin.RiskRating.HIGH + assert ( + f"FortiOS device at udp://192.168.1.1:161 is running a vulnerable version: {vulnerable_version}" + in vulnerability.technical_detail + ) + + +def testCVE202423113_whenSafe_reportNothing( + mocker: plugin.MockerFixture, +) -> None: + """CVE-2024-23113 unit test: case when target is not vulnerable.""" + + safe_version = "8.0.0" + mock_var_bind = mock.MagicMock() + mock_var_bind.__getitem__.return_value.prettyPrint.return_value = ( + f"FortiOS v{safe_version}" + ) + + mock_iterator = mock.MagicMock() + mock_iterator.__next__.return_value = (None, None, None, [mock_var_bind]) + mocker.patch("pysnmp.hlapi.getCmd", return_value=mock_iterator) + + exploit_instance = cve_2024_23113.CVE202423113Exploit() + + target = definitions.Target("udp", "192.168.1.1", 161) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is False + assert len(vulnerabilities) == 0 + + +def testCVE202423113_whenVersionNotFound_reportNothing( + mocker: plugin.MockerFixture, +) -> None: + """CVE-2024-23113 unit test: case when version cannot be determined.""" + + mock_var_bind = mock.MagicMock() + mock_var_bind.__getitem__.return_value.prettyPrint.return_value = ( + "Unexpected response" + ) + + mock_iterator = mock.MagicMock() + mock_iterator.__next__.return_value = (None, None, None, [mock_var_bind]) + mocker.patch("pysnmp.hlapi.getCmd", return_value=mock_iterator) + + exploit_instance = cve_2024_23113.CVE202423113Exploit() + + target = definitions.Target("udp", "192.168.1.1", 161) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is False + assert len(vulnerabilities) == 0 + + +def testCVE202423113_whenSNMPErrorStatus_handleGracefully( + mocker: plugin.MockerFixture, +) -> None: + """CVE-2024-23113 unit test: case when SNMP returns an error status.""" + + mock_iterator = mock.MagicMock() + mock_iterator.__next__.return_value = (None, "Error", 0, None) + mocker.patch("pysnmp.hlapi.getCmd", return_value=mock_iterator) + mock_logging = mocker.patch("logging.error") + + mocker.patch( + "agent.exploits.cve_2024_23113._get_fortinet_version_http", return_value=None + ) + mocker.patch( + "agent.exploits.cve_2024_23113._get_fortinet_version_tcp", return_value=None + ) + + exploit_instance = cve_2024_23113.CVE202423113Exploit() + + target = definitions.Target("udp", "192.168.1.1", 161) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is False + assert len(vulnerabilities) == 0 + mock_logging.assert_called_with("SNMP error_status: %s at %s", "Error", 0) + + +def testCVE202423113_whenSNMPFails_fallbackToOtherMethods( + mocker: plugin.MockerFixture, +) -> None: + """CVE-2024-23113 unit test: case when SNMP fails and falls back to other methods.""" + + # Mock SNMP to fail + mock_iterator = mock.MagicMock() + mock_iterator.__next__.return_value = ("Error", None, None, None) + mocker.patch("pysnmp.hlapi.getCmd", return_value=mock_iterator) + + # Mock HTTP detection to succeed + mocker.patch( + "agent.exploits.cve_2024_23113._get_fortinet_version_http", + return_value=("FortiOS", (7, 0, 5)), + ) + + exploit_instance = cve_2024_23113.CVE202423113Exploit() + + target = definitions.Target("tcp", "192.168.1.1", 443) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + assert len(vulnerabilities) == 1 + vulnerability = vulnerabilities[0] + assert vulnerability.entry.title == cve_2024_23113.VULNERABILITY_TITLE + assert ( + "FortiOS device at tcp://192.168.1.1:443 is running a vulnerable version: 7.0.5" + in vulnerability.technical_detail + ) + + +@pytest.mark.parametrize( + "content,expected_product,expected_version", + [ + ("FortiSwitchManager 7.2.0", "FortiSwitchManager", (7, 2, 0)), + ("FortiPAM 1.1 Hands-On LAB", "FortiPAM", (1, 1, 0)), + ], +) +def testCVE202423113_withHTTPDetection_returnVulnerabilities( + mocker: plugin.MockerFixture, + content: str, + expected_product: str, + expected_version: tuple[int, int, int], +) -> None: + """Test HTTP-based detection for various Fortinet products.""" + mock_response = mock.MagicMock() + mock_response.text = content + mocker.patch("requests.get", return_value=mock_response) + + exploit_instance = cve_2024_23113.CVE202423113Exploit() + target = definitions.Target("tcp", "192.168.1.1", 443) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + assert len(vulnerabilities) == 1 + vulnerability = vulnerabilities[0] + assert vulnerability.entry.title == cve_2024_23113.VULNERABILITY_TITLE + assert ( + f"{expected_product} device at tcp://192.168.1.1:443 is running a vulnerable version: {'.'.join(map(str, expected_version))}" + in vulnerability.technical_detail + ) + + +def testCVE202423113_withTCPDetection_returnVulnerabilities( + mocker: plugin.MockerFixture, +) -> None: + """Test TCP-based detection for FortiOS.""" + mock_socket = mocker.patch("socket.create_connection") + mock_instance = mock_socket.return_value.__enter__.return_value + + # Simulate receiving Fortinet version string via TCP + mock_instance.recv.return_value = b"7.0.0-build1234-FortiOS" + + # Instantiate the exploit class and define the target + exploit_instance = cve_2024_23113.CVE202423113Exploit() + target = definitions.Target("tcp", "192.168.1.1", 53) + + # Run the accept and check methods + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + assert len(vulnerabilities) == 1 + vulnerability = vulnerabilities[0] + assert vulnerability.entry.title == cve_2024_23113.VULNERABILITY_TITLE + assert ( + "FortiOS device at tcp://192.168.1.1:53 is running a vulnerable version: 7.0.0" + in vulnerability.technical_detail + ) + + +def testCVE202423113_whenAllDetectionMethodsFail_shouldReturnNoVulnerabilities( + mocker: plugin.MockerFixture, +) -> None: + """Test case when all detection methods fail.""" + mocker.patch( + "agent.exploits.cve_2024_23113._get_fortinet_version_snmp", return_value=None + ) + mocker.patch( + "agent.exploits.cve_2024_23113._get_fortinet_version_http", return_value=None + ) + mocker.patch( + "agent.exploits.cve_2024_23113._get_fortinet_version_tcp", return_value=None + ) + + exploit_instance = cve_2024_23113.CVE202423113Exploit() + target = definitions.Target("tcp", "192.168.1.1", 443) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is False + assert len(vulnerabilities) == 0 From 08ed59823a98c54939fcdf96157c1d677b3ec445 Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Fri, 11 Oct 2024 17:39:43 +0100 Subject: [PATCH 2/2] Resolve Comments. --- agent/exploits/cve_2024_23113.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/exploits/cve_2024_23113.py b/agent/exploits/cve_2024_23113.py index 2d40d8a..87f18ad 100644 --- a/agent/exploits/cve_2024_23113.py +++ b/agent/exploits/cve_2024_23113.py @@ -174,7 +174,7 @@ def _get_fortinet_version_tcp(host: str) -> tuple[str, tuple[int, int, int]] | N ) as sock: data = sock.recv(1024).decode("utf-8") match = re.search(r"(\d+\.\d+\.\d+)-.*-FortiOS", data) - if match: + if match is not None: version = tuple(map(int, match.group(1).split("."))) return "FortiOS", version # type: ignore except (socket.error, UnicodeDecodeError):