generated from Ostorlab/template_agent
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
…023-28461 Add Detection for CVE-2023-28461
- Loading branch information
Showing
2 changed files
with
220 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
"""Agent Asteroid implementation for CVE-2023-28461""" | ||
|
||
import datetime | ||
import logging | ||
import re | ||
import ssl | ||
from typing import Any | ||
from urllib import parse as urlparse | ||
|
||
import urllib3 | ||
from packaging import version | ||
|
||
from agent import definitions | ||
from agent import exploits_registry | ||
from agent.exploits import webexploit | ||
|
||
VULNERABILITY_TITLE = ( | ||
"Array Networks Array AG Series and vxAG Remote Code Execution Vulnerability" | ||
) | ||
VULNERABILITY_REFERENCE = "CVE-2023-28461" | ||
VULNERABILITY_DESCRIPTION = """A critical vulnerability in Array Networks Array AG Series and vxAG | ||
SSL VPN gateways allows remote code execution by exploiting an HTTP header with the 'flags' attribute | ||
to browse the filesystem without authentication.""" | ||
RISK_RATING = "CRITICAL" | ||
MAX_VULNERABLE_VERSION = version.parse("9.4.0.481") | ||
|
||
DEFAULT_TIMEOUT = datetime.timedelta(seconds=90) | ||
|
||
|
||
def _create_ssl_context() -> Any: | ||
"""Creates an SSL context that allows insecure connections.""" | ||
ctx = urllib3.util.ssl_.create_urllib3_context() | ||
ctx.load_default_certs() | ||
ctx.options |= 0x4 # ssl.OP_LEGACY_SERVER_CONNECT | ||
ctx.check_hostname = False | ||
ctx.verify_mode = ssl.CERT_NONE | ||
return ctx | ||
|
||
|
||
def _send_ssl_request(url: str) -> Any: | ||
"""Send a GET request using urllib3 with a custom SSL context.""" | ||
ssl_context = _create_ssl_context() | ||
with urllib3.PoolManager(ssl_context=ssl_context) as http: | ||
try: | ||
response = http.request("GET", url, timeout=DEFAULT_TIMEOUT.total_seconds()) | ||
return response.data.decode("utf-8") | ||
except urllib3.exceptions.HTTPError as e: | ||
logging.error("Error during SSL request: %s", e) | ||
return "" | ||
|
||
|
||
@exploits_registry.register | ||
class CVE202328461Exploit(webexploit.WebExploit): | ||
accept_request = definitions.Request( | ||
method="GET", path="/prx/000/http/localhost/login" | ||
) | ||
accept_pattern = [re.compile(r"Rel_AG_(\d+_\d+_\d+_\d+)")] | ||
metadata = definitions.VulnerabilityMetadata( | ||
title=VULNERABILITY_TITLE, | ||
description=VULNERABILITY_DESCRIPTION, | ||
reference=VULNERABILITY_REFERENCE, | ||
risk_rating=RISK_RATING, | ||
) | ||
|
||
def accept(self, target: definitions.Target) -> bool: | ||
"""Checks if the target matches the accept pattern. | ||
Args: | ||
target: Target to verify | ||
Returns: | ||
True if the target appears valid; otherwise False. | ||
""" | ||
target_endpoint = urlparse.urljoin(target.origin, self.accept_request.path) | ||
response_body = _send_ssl_request(target_endpoint) | ||
|
||
for pattern in self.accept_pattern: | ||
if pattern.search(response_body) is not None: | ||
return True | ||
|
||
return False | ||
|
||
def check(self, target: definitions.Target) -> list[definitions.Vulnerability]: | ||
"""Rule to detect specific vulnerability on a specific target. | ||
Args: | ||
target: Target to scan | ||
Returns: | ||
List of identified vulnerabilities. | ||
""" | ||
vulnerabilities: list[definitions.Vulnerability] = [] | ||
target_endpoint = urlparse.urljoin(target.origin, self.accept_request.path) | ||
response_body = _send_ssl_request(target_endpoint) | ||
|
||
for pattern in self.accept_pattern: | ||
match = pattern.search(response_body) | ||
if match is not None: | ||
version_str = match.group(1).replace("_", ".") | ||
target_version = version.parse(version_str) | ||
|
||
if target_version <= MAX_VULNERABLE_VERSION: | ||
vulnerability = self._create_vulnerability(target) | ||
vulnerabilities.append(vulnerability) | ||
break | ||
|
||
return vulnerabilities |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
"""Unit tests for Agent Asteroid: CVE-2023-28461""" | ||
|
||
import requests_mock as req_mock | ||
from agent import definitions | ||
from agent.exploits import cve_2023_28461 | ||
from unittest import mock | ||
|
||
VULNERABLE_RESPONSE = """ | ||
</script> | ||
<script>document.title=_AN_str_title_login;</script> | ||
<title>Login</title> | ||
<link rel="stylesheet" type="text/css" href="/prx/000/http/localh/portal.css?v=Rel_AG_9_4_0_233"> | ||
<meta id="viewport" name="viewport" content="width=device-width,initial-scale=1.0,maximum-scale=1.0,user-scalable=0" /> | ||
<script language="JavaScript" src="/prx/000/http/localh/an_util.js?v=Rel_AG_9_4_0_233" charset="UTF-8"></script> | ||
""" | ||
|
||
SAFE_RESPONSE = """ | ||
</script> | ||
<script>document.title=_AN_str_title_login;</script> | ||
<title>Login</title> | ||
<link rel="stylesheet" type="text/css" href="/prx/000/http/localh/portal.css?v=Rel_AG_9_4_0_482"> | ||
<meta id="viewport" name="viewport" content="width=device-width,initial-scale=1.0,maximum-scale=1.0,user-scalable=0" /> | ||
<script language="JavaScript" src="/prx/000/http/localh/an_util.js?v=Rel_AG_9_4_0_482" charset="UTF-8"></script> | ||
""" | ||
|
||
NO_VERSION_RESPONSE = """ | ||
</script> | ||
<script>document.title=_AN_str_title_login;</script> | ||
<title>Login</title> | ||
<link rel="stylesheet" type="text/css" href="/prx/000/http/localh/portal.css"> | ||
<meta id="viewport" name="viewport" content="width=device-width,initial-scale=1.0,maximum-scale=1.0,user-scalable=0" /> | ||
<script language="JavaScript" src="/prx/000/http/localh/an_util.js" charset="UTF-8"></script> | ||
""" | ||
|
||
|
||
def testCVE202328461_whenVulnerable_reportFinding( | ||
requests_mock: req_mock.mocker.Mocker, | ||
) -> None: | ||
"""CVE-2023-28461 unit test: case when the target is vulnerable.""" | ||
requests_mock.get( | ||
"http://localhost:80/prx/000/http/localhost/login", | ||
text=VULNERABLE_RESPONSE, | ||
status_code=200, | ||
) | ||
|
||
with mock.patch( | ||
"agent.exploits.cve_2023_28461._send_ssl_request", | ||
return_value=VULNERABLE_RESPONSE, | ||
): | ||
exploit_instance = cve_2023_28461.CVE202328461Exploit() | ||
target = definitions.Target("http", "localhost", 80) | ||
|
||
accept = exploit_instance.accept(target) | ||
vulnerabilities = exploit_instance.check(target) | ||
|
||
assert accept is True | ||
assert len(vulnerabilities) > 0 | ||
vulnerability = vulnerabilities[0] | ||
assert ( | ||
vulnerability.entry.title | ||
== "Array Networks Array AG Series and vxAG Remote Code Execution Vulnerability" | ||
) | ||
assert ( | ||
vulnerability.technical_detail | ||
== "http://localhost:80 is vulnerable to CVE-2023-28461, Array Networks Array AG Series and vxAG Remote Code Execution Vulnerability" | ||
) | ||
|
||
|
||
def testCVE202328461_whenSafe_reportNothing( | ||
requests_mock: req_mock.mocker.Mocker, | ||
) -> None: | ||
"""CVE-2023-28461 unit test: case when the target is safe.""" | ||
requests_mock.get( | ||
"http://localhost:80/prx/000/http/localhost/login", | ||
text=SAFE_RESPONSE, | ||
status_code=200, | ||
) | ||
|
||
with mock.patch( | ||
"agent.exploits.cve_2023_28461._send_ssl_request", return_value=SAFE_RESPONSE | ||
): | ||
exploit_instance = cve_2023_28461.CVE202328461Exploit() | ||
target = definitions.Target("http", "localhost", 80) | ||
|
||
accept = exploit_instance.accept(target) | ||
vulnerabilities = exploit_instance.check(target) | ||
|
||
assert accept is True | ||
assert len(vulnerabilities) == 0 | ||
|
||
|
||
def testCVE202328461_whenNoVersionFound_reportNothing( | ||
requests_mock: req_mock.mocker.Mocker, | ||
) -> None: | ||
"""CVE-2023-28461 unit test: case when the server does not include a version.""" | ||
requests_mock.get( | ||
"http://localhost:80/prx/000/http/localhost/login", | ||
text=NO_VERSION_RESPONSE, | ||
status_code=200, | ||
) | ||
|
||
with mock.patch( | ||
"agent.exploits.cve_2023_28461._send_ssl_request", | ||
return_value=NO_VERSION_RESPONSE, | ||
): | ||
exploit_instance = cve_2023_28461.CVE202328461Exploit() | ||
target = definitions.Target("http", "localhost", 80) | ||
|
||
accept = exploit_instance.accept(target) | ||
vulnerabilities = exploit_instance.check(target) | ||
|
||
assert accept is False | ||
assert len(vulnerabilities) == 0 |