Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add detection for CVE-2024-50379 #166

Merged
merged 2 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions agent/exploits/cve_2024_50379.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Agent Asteroid implementation for CVE-2024-50379"""

import re
import datetime
import requests

from agent import definitions
from agent import exploits_registry
from agent.exploits import webexploit
from packaging import version

DEFAULT_TIMEOUT = datetime.timedelta(seconds=90)

VULNERABILITY_TITLE = "Apache Tomcat Race Condition Remote Code Execution"
VULNERABILITY_REFERENCE = "CVE-2024-50379"
VULNERABILITY_DESCRIPTION = (
"A race condition vulnerability in Apache Tomcat allows remote code execution through concurrent PUT requests "
"with non-standard file extensions and subsequent GET requests. The vulnerability affects versions "
"9.0.0.M1 through 9.0.98, 10.1.0-M1 through 10.1.34, and 11.0.0-M1 through 11.0.2."
)
RISK_RATING = "CRITICAL"

VERSION_PATTERN = re.compile(r"Apache Tomcat/(\d+\.\d+\.\d+)")

# Version ranges for affected versions
VULNERABLE_RANGES = [
(version.parse("9.0.0"), version.parse("9.0.98")),
(version.parse("10.1.0"), version.parse("10.1.34")),
(version.parse("11.0.0"), version.parse("11.0.2")),
]


def _is_version_vulnerable(version_str: str) -> bool:
"""
Check if the detected version falls within any of the vulnerable ranges.

Args:
version_str: The version string to check

Returns:
bool: True if the version is vulnerable, False otherwise
"""
try:
detected_version = version.parse(version_str)
for min_ver, max_ver in VULNERABLE_RANGES:
if min_ver <= detected_version <= max_ver:
return True
return False
except version.InvalidVersion:
return False

Check warning on line 50 in agent/exploits/cve_2024_50379.py

View check run for this annotation

Codecov / codecov/patch

agent/exploits/cve_2024_50379.py#L49-L50

Added lines #L49 - L50 were not covered by tests


@exploits_registry.register
class CVE202450379Exploit(webexploit.WebExploit):
accept_request = definitions.Request(method="GET", path="/")
accept_pattern = [re.compile(r"Apache Tomcat")]

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
vulnerabilities: list[definitions.Vulnerability] = []

try:
root_response = requests.get(
target.origin, timeout=DEFAULT_TIMEOUT.seconds, verify=False
)
if not (200 <= root_response.status_code < 600 and root_response.text):
return vulnerabilities

# Extract version information
version_match = VERSION_PATTERN.search(root_response.text)
if version_match is None:
amine3 marked this conversation as resolved.
Show resolved Hide resolved
return vulnerabilities

tomcat_version = version_match.group(1)
if _is_version_vulnerable(tomcat_version) is False:
amine3 marked this conversation as resolved.
Show resolved Hide resolved
return vulnerabilities

# Test for the PUT capability that enables the race condition
test_response = requests.put(
f"{target.origin}/test.Jsp",
data="<!-- test -->",
amine3 marked this conversation as resolved.
Show resolved Hide resolved
timeout=DEFAULT_TIMEOUT.seconds,
verify=False,
)

if (
amine3 marked this conversation as resolved.
Show resolved Hide resolved
test_response.status_code in (201, 204)
or _is_version_vulnerable(tomcat_version) is True
):
vulnerabilities = [self._create_vulnerability(target)]

except requests.RequestException:
return vulnerabilities

return vulnerabilities
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved

metadata = definitions.VulnerabilityMetadata(
title=VULNERABILITY_TITLE,
description=VULNERABILITY_DESCRIPTION,
reference=VULNERABILITY_REFERENCE,
risk_rating=RISK_RATING,
)
146 changes: 146 additions & 0 deletions tests/exploits/cve_2024_50379_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""Unit tests for Agent Asteroid: CVE-2024-50379"""

import requests
import requests_mock as req_mock
from agent import definitions
from agent.exploits import cve_2024_50379


def testCVE202450379_whenVulnerable_reportFinding(
requests_mock: req_mock.Mocker,
) -> None:
"""CVE-2024-50379 unit test: case when target is vulnerable."""
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved
exploit_instance = cve_2024_50379.CVE202450379Exploit()
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved

# Mock the initial Tomcat version check
requests_mock.get(
"http://localhost:80/",
text="<h3>Apache Tomcat/9.0.63</h3>",
status_code=200,
)

# Mock the PUT request test
requests_mock.put(
"http://localhost:80/test.Jsp",
status_code=201,
)

target = definitions.Target("http", "localhost", 80)
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved
assert len(vulnerabilities) > 0
vulnerability = vulnerabilities[0]
assert (
vulnerability.entry.title
== "Apache Tomcat Race Condition Remote Code Execution"
)
assert (
vulnerability.technical_detail
== "http://localhost:80 is vulnerable to CVE-2024-50379, Apache Tomcat Race Condition Remote Code Execution"
)


def testCVE202450379_whenSafeVersion_reportNothing(
requests_mock: req_mock.Mocker,
) -> None:
"""CVE-2024-50379 unit test: case when target has safe version."""
exploit_instance = cve_2024_50379.CVE202450379Exploit()

requests_mock.get(
"http://localhost:80/",
text="<h3>Apache Tomcat/9.0.99</h3>",
status_code=200,
)

target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
assert len(vulnerabilities) == 0


def testCVE202450379_whenNoTomcat_reportNothing(
requests_mock: req_mock.Mocker,
) -> None:
"""CVE-2024-50379 unit test: case when target is not Tomcat."""
requests_mock.get(
"http://localhost:80/",
text="<html><body>Not a Tomcat server</body></html>",
status_code=200,
)

exploit_instance = cve_2024_50379.CVE202450379Exploit()
target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)

assert accept is False


def testCVE202450379_whenPutNotAllowed_reportNothing(
requests_mock: req_mock.Mocker,
) -> None:
"""CVE-2024-50379 unit test: case when PUT requests are not allowed."""
exploit_instance = cve_2024_50379.CVE202450379Exploit()

requests_mock.get(
"http://localhost:80/",
status_code=200,
)

requests_mock.put(
"http://localhost:80/test.Jsp",
status_code=403,
)

target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)

assert len(vulnerabilities) == 0


def testCVE202450379_whenConnectionError_reportNothing(
requests_mock: req_mock.Mocker,
) -> None:
"""CVE-2024-50379 unit test: case when connection fails."""
exploit_instance = cve_2024_50379.CVE202450379Exploit()

requests_mock.get(
"http://localhost:80/",
exc=requests.exceptions.ConnectionError,
)

target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is False
assert len(vulnerabilities) == 0


def testCVE202450379_whenInvalidVersion_reportNothing(
requests_mock: req_mock.Mocker,
) -> None:
"""CVE-2024-50379 unit test: case when version string is invalid."""
exploit_instance = cve_2024_50379.CVE202450379Exploit()

requests_mock.get(
"http://localhost:80/",
text="<h3>Apache Tomcat/Invalid.Version</h3>",
status_code=200,
)

target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
assert len(vulnerabilities) == 0
Loading