generated from Ostorlab/template_agent
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #122 from Ostorlab/feature/cve_2024_23113
Add CVE-2024-23113
- Loading branch information
Showing
2 changed files
with
506 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,263 @@ | ||
"""Agent Asteroid implementation for CVE-2024-23113""" | ||
|
||
import datetime | ||
import logging | ||
import re | ||
import socket | ||
|
||
import requests | ||
from ostorlab.agent.kb import kb | ||
from ostorlab.agent.mixins import agent_report_vulnerability_mixin | ||
from pysnmp import hlapi # type: ignore | ||
|
||
from agent import definitions | ||
from agent import exploits_registry | ||
|
||
VULNERABILITY_TITLE = "Critical Fortinet Format String Vulnerability" | ||
VULNERABILITY_REFERENCE = "CVE-2024-23113" | ||
VULNERABILITY_DESCRIPTION = """A use of externally-controlled format string in Fortinet FortiOS versions 7.4.0 through 7.4.2, 7.2.0 through | ||
7.2.6, 7.0.0 through 7.0.13, FortiProxy versions 7.4.0 through 7.4.2, 7.2.0 through 7.2.8, 7.0.0 through 7.0.14, | ||
FortiPAM versions 1.2.0, 1.1.0 through 1.1.2, 1.0.0 through 1.0.3, FortiSwitchManager versions 7.2.0 through 7.2.3, | ||
7.0.0 through 7.0.3 allows attacker to execute unauthorized code or commands via specially crafted packets.""" | ||
|
||
DEFAULT_TIMEOUT = TIMEOUT = datetime.timedelta(seconds=10) | ||
|
||
VULNERABLE_VERSIONS = { | ||
"FortiOS": [ | ||
(7, 0, 0), | ||
(7, 0, 1), | ||
(7, 0, 2), | ||
(7, 0, 3), | ||
(7, 0, 4), | ||
(7, 0, 5), | ||
(7, 0, 6), | ||
(7, 0, 7), | ||
(7, 0, 8), | ||
(7, 0, 9), | ||
(7, 0, 10), | ||
(7, 0, 11), | ||
(7, 0, 12), | ||
(7, 0, 13), | ||
(7, 2, 0), | ||
(7, 2, 1), | ||
(7, 2, 2), | ||
(7, 2, 3), | ||
(7, 2, 4), | ||
(7, 2, 5), | ||
(7, 2, 6), | ||
(7, 4, 0), | ||
(7, 4, 1), | ||
(7, 4, 2), | ||
], | ||
"FortiProxy": [ | ||
(7, 0, 0), | ||
(7, 0, 1), | ||
(7, 0, 2), | ||
(7, 0, 3), | ||
(7, 0, 4), | ||
(7, 0, 5), | ||
(7, 0, 6), | ||
(7, 0, 7), | ||
(7, 0, 8), | ||
(7, 0, 9), | ||
(7, 0, 10), | ||
(7, 0, 11), | ||
(7, 0, 12), | ||
(7, 0, 13), | ||
(7, 0, 14), | ||
(7, 2, 0), | ||
(7, 2, 1), | ||
(7, 2, 2), | ||
(7, 2, 3), | ||
(7, 2, 4), | ||
(7, 2, 5), | ||
(7, 2, 6), | ||
(7, 2, 7), | ||
(7, 2, 8), | ||
(7, 4, 0), | ||
(7, 4, 1), | ||
(7, 4, 2), | ||
], | ||
"FortiPAM": [ | ||
(1, 0, 0), | ||
(1, 0, 1), | ||
(1, 0, 2), | ||
(1, 0, 3), | ||
(1, 1, 0), | ||
(1, 1, 1), | ||
(1, 1, 2), | ||
(1, 2, 0), | ||
], | ||
"FortiSwitchManager": [ | ||
(7, 0, 0), | ||
(7, 0, 1), | ||
(7, 0, 2), | ||
(7, 0, 3), | ||
(7, 2, 0), | ||
(7, 2, 1), | ||
(7, 2, 2), | ||
(7, 2, 3), | ||
], | ||
} | ||
|
||
|
||
def _get_fortinet_version_snmp(host: str) -> tuple[str, tuple[int, int, int]] | None: | ||
version = None | ||
product = None | ||
iterator = hlapi.getCmd( | ||
hlapi.SnmpEngine(), | ||
hlapi.CommunityData("public", mpModel=1), | ||
hlapi.UdpTransportTarget((host, 161), timeout=DEFAULT_TIMEOUT.seconds), | ||
hlapi.ContextData(), | ||
hlapi.ObjectType(hlapi.ObjectIdentity("1.3.6.1.2.1.47.1.1.1.1.10.1")), | ||
) | ||
error_indication, error_status, error_index, var_binds = next(iterator) | ||
|
||
if error_indication is not None: | ||
logging.error("SNMP error_indication:: %s", error_indication) | ||
return None | ||
elif error_status is not None: | ||
logging.error("SNMP error_status: %s at %s", error_status, error_index) | ||
return None | ||
else: | ||
for var_bind in var_binds: | ||
description = var_bind[1].prettyPrint() | ||
logging.info("Fortinet description found: %s", description) | ||
match = re.search( | ||
r"(FortiOS|FortiProxy|FortiPAM|FortiSwitchManager).*v(\d+)\.(\d+)\.(\d+)", | ||
description, | ||
) | ||
if match is not None: | ||
product = match.group(1) | ||
version_parts = list(map(int, match.groups()[1:])) | ||
version = tuple(version_parts + [0] * (3 - len(version_parts))) | ||
logging.info("%s version extracted: %s", product, version) | ||
|
||
if product is not None and isinstance(version, tuple) and len(version) == 3: | ||
return product, version | ||
return None | ||
|
||
|
||
def _get_fortinet_version_http(host: str) -> tuple[str, tuple[int, int, int]] | None: | ||
try: | ||
response = requests.get( | ||
f"https://{host}", verify=False, timeout=DEFAULT_TIMEOUT.seconds | ||
) | ||
content = response.text | ||
|
||
# FortiSwitchManager detection | ||
match = re.search( | ||
r"<strong>FortiSwitchManager (\d+\.\d+\.\d+)</strong>", content | ||
) | ||
if match is not None: | ||
version = tuple(int(v) for v in match.group(1).split(".")) | ||
return "FortiSwitchManager", version # type: ignore | ||
|
||
# FortiPAM detection | ||
match = re.search(r"<strong>FortiPAM (\d+\.\d+)", content) | ||
if match is not None: | ||
version_parts = [int(v) for v in match.group(1).split(".")] | ||
# Ensure we always have 3 parts in the version tuple | ||
version = tuple(version_parts + [0] * (3 - len(version_parts))) | ||
return "FortiPAM", version # type: ignore | ||
|
||
except requests.RequestException: | ||
logging.error("Failed to connect to %s via HTTPS", host) | ||
|
||
return None | ||
|
||
|
||
def _get_fortinet_version_tcp(host: str) -> tuple[str, tuple[int, int, int]] | None: | ||
try: | ||
with socket.create_connection( | ||
(host, 53), timeout=DEFAULT_TIMEOUT.seconds | ||
) as sock: | ||
data = sock.recv(1024).decode("utf-8") | ||
match = re.search(r"(\d+\.\d+\.\d+)-.*-FortiOS", data) | ||
if match is not None: | ||
version = tuple(map(int, match.group(1).split("."))) | ||
return "FortiOS", version # type: ignore | ||
except (socket.error, UnicodeDecodeError): | ||
logging.error(f"Failed to connect to {host} on port 53") | ||
|
||
return None | ||
|
||
|
||
def _get_fortinet_version(host: str) -> tuple[str, tuple[int, int, int]] | None: | ||
# Try HTTP First | ||
result = _get_fortinet_version_http(host) | ||
if result is not None: | ||
return result | ||
|
||
# Try TCP if HTTP Fails | ||
result = _get_fortinet_version_tcp(host) | ||
if result is not None: | ||
return result | ||
|
||
# Try SNMP if HTTP Fails | ||
result = _get_fortinet_version_snmp(host) | ||
if result is not None: | ||
return result | ||
|
||
return None | ||
|
||
|
||
def _create_vulnerability( | ||
target: definitions.Target, product: str, version: tuple[int, int, int] | ||
) -> definitions.Vulnerability: | ||
entry = kb.Entry( | ||
title=VULNERABILITY_TITLE, | ||
risk_rating="HIGH", | ||
short_description=VULNERABILITY_DESCRIPTION, | ||
description=f"{VULNERABILITY_DESCRIPTION} Detected {product} version: {'.'.join(map(str, version))}", | ||
references={ | ||
"fortiguard.com": "https://www.fortiguard.com/psirt/FG-IR-24-029", | ||
"nvd.nist.gov": f"https://nvd.nist.gov/vuln/detail/{VULNERABILITY_REFERENCE}", | ||
}, | ||
recommendation=( | ||
f"- Update {product} to the latest non-vulnerable version. " | ||
"- If immediate updating is not possible, restrict access to trusted IP addresses and monitor for suspicious activities." | ||
), | ||
security_issue=True, | ||
privacy_issue=False, | ||
has_public_exploit=True, | ||
targeted_by_malware=True, | ||
targeted_by_ransomware=False, | ||
targeted_by_nation_state=True, | ||
) | ||
technical_detail = ( | ||
f"{product} device at {target.origin} is running a vulnerable version: {'.'.join(map(str, version))}. " | ||
f"This version is susceptible to CVE-2024-23113. Immediate action is required." | ||
) | ||
vulnerability = definitions.Vulnerability( | ||
entry=entry, | ||
technical_detail=technical_detail, | ||
risk_rating=agent_report_vulnerability_mixin.RiskRating.HIGH, | ||
) | ||
return vulnerability | ||
|
||
|
||
@exploits_registry.register | ||
class CVE202423113Exploit(definitions.Exploit): | ||
""" | ||
CVE-2024-23113: Fortinet Products Format String Vulnerability | ||
""" | ||
|
||
def accept(self, target: definitions.Target) -> bool: | ||
result = _get_fortinet_version(target.host) | ||
if result is not None: | ||
product, version = result | ||
return version in VULNERABLE_VERSIONS.get(product, []) | ||
return False | ||
|
||
def check(self, target: definitions.Target) -> list[definitions.Vulnerability]: | ||
vulnerabilities: list[definitions.Vulnerability] = [] | ||
result = _get_fortinet_version(target.host) | ||
|
||
if result is not None: | ||
product, version = result | ||
if version in VULNERABLE_VERSIONS.get(product, []): | ||
vulnerability = _create_vulnerability(target, product, version) | ||
vulnerabilities.append(vulnerability) | ||
|
||
return vulnerabilities |
Oops, something went wrong.