generated from Ostorlab/template_agent
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #148 from Ostorlab/feature/cve-2024-11667
Add CVE-2024-11667
- Loading branch information
Showing
2 changed files
with
215 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
"""Agent Asteroid implementation for CVE-2024-11667""" | ||
|
||
import re | ||
import datetime | ||
from urllib import parse as urlparse | ||
|
||
from requests import exceptions as requests_exceptions | ||
|
||
from agent import definitions | ||
from agent import exploits_registry | ||
from agent.exploits import webexploit | ||
|
||
VULNERABILITY_TITLE = ( | ||
"Zyxel ATP/USG FLEX/USG FLEX 50(W)/USG20(W)-VPN UP TO 5.38 URL PATH TRAVERSAL" | ||
) | ||
VULNERABILITY_REFERENCE = "CVE-2024-11667" | ||
VULNERABILITY_DESCRIPTION = ( | ||
"A directory traversal vulnerability in the web management interface of " | ||
"Zyxel ATP series firmware versions V5.00 through V5.38, USG FLEX series " | ||
"firmware versions V5.00 through V5.38, USG FLEX 50(W) series firmware " | ||
"versions V5.10 through V5.38, and USG20(W)-VPN series firmware versions " | ||
"V5.10 through V5.38 could allow an attacker to download or upload files via a crafted URL." | ||
) | ||
RISK_RATING = "CRITICAL" | ||
DEFAULT_TIMEOUT = datetime.timedelta(seconds=90) | ||
|
||
VERSION_THRESHOLDS = { | ||
"USG FLEX ATP": 210507, | ||
"USG 20W": 210928, | ||
"USG FLEX 50": 210928, | ||
"USG FLEX 50W": 210928, | ||
"USG FLEX 100": 210513, | ||
"USG FLEX 200": 210513, | ||
"USG FLEX 500": 210513, | ||
"USG FLEX 700": 210513, | ||
} | ||
|
||
MAX_VULNERABLE_VERSION = 240329 | ||
FAVICON_VERSION_PATTERN = re.compile(r"/favicon.ico\?v=(\d+)") | ||
|
||
|
||
@exploits_registry.register | ||
class CVE202411667Exploit(webexploit.WebExploit): | ||
accept_request = definitions.Request(method="GET", path="/") | ||
check_request = definitions.Request(method="GET", path="/") | ||
# Clear without const | ||
accept_pattern = [ | ||
re.compile(r"USG FLEX (50|100|200|500|700|50W)</title>"), | ||
re.compile(r"USG FLEX ATP (700|100|200)</title>"), | ||
re.compile(r"USG 20W</title>"), | ||
] | ||
|
||
metadata = definitions.VulnerabilityMetadata( | ||
title=VULNERABILITY_TITLE, | ||
description=VULNERABILITY_DESCRIPTION, | ||
reference=VULNERABILITY_REFERENCE, | ||
risk_rating=RISK_RATING, | ||
) | ||
|
||
def check(self, target: definitions.Target) -> list[definitions.Vulnerability]: | ||
"""Check for vulnerability in the target.""" | ||
vulnerabilities: list[definitions.Vulnerability] = [] | ||
target_endpoint = urlparse.urljoin(target.origin, self.check_request.path) | ||
|
||
try: | ||
response = self.session.get( | ||
target_endpoint, timeout=DEFAULT_TIMEOUT.seconds, verify=False | ||
) | ||
response.raise_for_status() | ||
except requests_exceptions.RequestException: | ||
return vulnerabilities | ||
|
||
# Determine the minimum threshold for a detected device type | ||
min_vulnerable_version = None | ||
for device_type, threshold in VERSION_THRESHOLDS.items(): | ||
if device_type in response.text: | ||
min_vulnerable_version = threshold | ||
break | ||
|
||
if min_vulnerable_version is None: | ||
return vulnerabilities | ||
|
||
# Extract and validate the favicon version | ||
favicon_match = FAVICON_VERSION_PATTERN.search(response.text) | ||
if favicon_match is None: | ||
return vulnerabilities | ||
|
||
try: | ||
extracted_version = int( | ||
favicon_match.group(1)[:6] | ||
) # Keep only the first 6 digits | ||
except ValueError: | ||
return vulnerabilities | ||
|
||
# Check if the version is within the vulnerable range | ||
if min_vulnerable_version <= extracted_version <= MAX_VULNERABLE_VERSION: | ||
vulnerability = self._create_vulnerability(target) | ||
vulnerabilities.append(vulnerability) | ||
|
||
return vulnerabilities |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
"""Unit tests for Agent Asteroid: CVE-2024-11667""" | ||
|
||
import requests | ||
import requests_mock as req_mock | ||
|
||
from agent import definitions | ||
from agent.exploits import cve_2024_11667 | ||
|
||
|
||
def create_mock_response(device_type: str, version: str) -> str: | ||
"""Create a mock HTML response with the given device type and version.""" | ||
return f""" | ||
<!DOCTYPE html> | ||
<html> | ||
<head> | ||
<title>{device_type}</title> | ||
<link rel="icon" href="/favicon.ico?v={version}" type="image/x-icon"> | ||
</head> | ||
<body> | ||
<!-- Page content --> | ||
</body> | ||
</html> | ||
""" | ||
|
||
|
||
def testCVE202411667_whenVulnerable_reportFinding( | ||
requests_mock: req_mock.mocker.Mocker, | ||
) -> None: | ||
"""CVE-2024-11667 unit test: case when target is vulnerable.""" | ||
device_type = "USG FLEX 100" | ||
vulnerable_version = "210513" # A version within the vulnerable range | ||
mock_response = create_mock_response(device_type, vulnerable_version) | ||
requests_mock.get( | ||
"http://localhost:80/", | ||
text=mock_response, | ||
status_code=200, | ||
) | ||
|
||
exploit_instance = cve_2024_11667.CVE202411667Exploit() | ||
target = definitions.Target("http", "localhost", 80) | ||
|
||
accept = exploit_instance.accept(target) | ||
vulnerabilities = exploit_instance.check(target) | ||
|
||
assert accept is True | ||
assert len(vulnerabilities) > 0 | ||
vulnerability = vulnerabilities[0] | ||
assert vulnerability.entry.title == ( | ||
"Zyxel ATP/USG FLEX/USG FLEX 50(W)/USG20(W)-VPN UP TO 5.38 URL PATH TRAVERSAL" | ||
) | ||
assert ( | ||
vulnerability.technical_detail | ||
== "http://localhost:80 is vulnerable to CVE-2024-11667, Zyxel ATP/USG FLEX/USG FLEX 50(W)/USG20(W)-VPN UP TO 5.38 URL PATH TRAVERSAL" | ||
) | ||
|
||
|
||
def testCVE202411667_whenSafe_reportNothing( | ||
requests_mock: req_mock.mocker.Mocker, | ||
) -> None: | ||
"""CVE-2024-11667 unit test: case when target is safe.""" | ||
device_type = "USG FLEX 100" | ||
safe_version = "241116" # A version outside the vulnerable range | ||
mock_response = create_mock_response(device_type, safe_version) | ||
requests_mock.get( | ||
"http://localhost:80/", | ||
text=mock_response, | ||
status_code=200, | ||
) | ||
|
||
exploit_instance = cve_2024_11667.CVE202411667Exploit() | ||
target = definitions.Target("http", "localhost", 80) | ||
|
||
accept = exploit_instance.accept(target) | ||
vulnerabilities = exploit_instance.check(target) | ||
|
||
assert accept is True | ||
assert len(vulnerabilities) == 0 | ||
|
||
|
||
def testCVE202411667_whenConnectionError_reportNothing( | ||
requests_mock: req_mock.mocker.Mocker, | ||
) -> None: | ||
"""CVE-2024-11667 unit test: case when a connection error occurs.""" | ||
requests_mock.get( | ||
"http://localhost:80/", | ||
exc=requests.exceptions.ConnectionError, | ||
) | ||
|
||
exploit_instance = cve_2024_11667.CVE202411667Exploit() | ||
target = definitions.Target("http", "localhost", 80) | ||
|
||
accept = exploit_instance.accept(target) | ||
assert accept is False | ||
|
||
vulnerabilities = exploit_instance.check(target) | ||
assert len(vulnerabilities) == 0 | ||
|
||
|
||
def testCVE202411667_whenTimeout_reportNothing( | ||
requests_mock: req_mock.mocker.Mocker, | ||
) -> None: | ||
"""CVE-2024-11667 unit test: case when a timeout occurs.""" | ||
requests_mock.get( | ||
"http://localhost:80/", | ||
exc=requests.exceptions.Timeout, | ||
) | ||
|
||
exploit_instance = cve_2024_11667.CVE202411667Exploit() | ||
target = definitions.Target("http", "localhost", 80) | ||
|
||
accept = exploit_instance.accept(target) | ||
assert accept is False | ||
|
||
vulnerabilities = exploit_instance.check(target) | ||
assert len(vulnerabilities) == 0 |