Skip to content

Commit

Permalink
Merge pull request #132 from Ostorlab/feature/add-detection-for-cve-2…
Browse files Browse the repository at this point in the history
…024-8956

Add Detection for CVE-2024-8956
  • Loading branch information
3asm authored Nov 7, 2024
2 parents c842f73 + 33f56c3 commit d38cc5f
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 0 deletions.
127 changes: 127 additions & 0 deletions agent/exploits/cve_2024_8956.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Agent Asteroid implementation for CVE-2024-8956"""

import datetime
import logging
import re

from ostorlab.agent.kb import kb
from ostorlab.agent.mixins import agent_report_vulnerability_mixin
from packaging import version
from requests import exceptions as requests_exceptions

from agent import definitions
from agent import exploits_registry
from agent.exploits import webexploit

VULNERABILITY_TITLE = "ValueHD PTZ Camera Authentication Bypass"
VULNERABILITY_REFERENCE = "CVE-2024-8956"
VULNERABILITY_DESCRIPTION = (
"ValueHD PTZ cameras contain an authentication bypass "
"vulnerability in the param.cgi endpoint."
)
RISK_RATING = "CRITICAL"
DEFAULT_TIMEOUT = datetime.timedelta(seconds=90)
DEVICE_CONF_PATH = "/cgi-bin/param.cgi?get_device_conf"


@exploits_registry.register
class VHDPTZAuthBypassExploit(webexploit.WebExploit):
"""
CVE-2024-8956: ValueHD PTZ Camera Authentication Bypass
"""

accept_request = definitions.Request(method="GET", path=DEVICE_CONF_PATH)
check_requests = definitions.Request(method="GET", path=DEVICE_CONF_PATH)

accept_pattern = [
re.compile(r'versioninfo="SOC v\d+\.\d+\.\d+\s*[-\\s*ARM]?'),
]

version_pattern = re.compile(r'versioninfo="SOC v(\d+\.\d+\.\d+)')

vuln_ranges = [
definitions.VulnRange(
min=None, # No minimum version
max=version.Version("6.3.39"), # Versions up to 6.3.39 are vulnerable
)
]

metadata = definitions.VulnerabilityMetadata(
title=VULNERABILITY_TITLE,
description=VULNERABILITY_DESCRIPTION,
reference=VULNERABILITY_REFERENCE,
risk_rating=RISK_RATING,
)

def _create_vulnerability(
self, target: definitions.Target, details: str | None = None
) -> definitions.Vulnerability:
entry = kb.Entry(
title=self.metadata.title,
risk_rating=self.metadata.risk_rating,
short_description=self.metadata.description,
description=self.metadata.description,
references={
"nvd.nist.gov": f"https://nvd.nist.gov/vuln/detail/{self.metadata.reference}",
},
recommendation=(
"- Make sure to install the latest security patches from software vendor \n"
"- Update to the latest software version"
),
security_issue=True,
privacy_issue=False,
has_public_exploit=True,
targeted_by_malware=False,
targeted_by_ransomware=False,
targeted_by_nation_state=False,
)

technical_detail = f"{target.origin} is vulnerable to {self.metadata.reference}, {self.metadata.title}"
if details is not None:
technical_detail += f"\n- {details}"

vulnerability = definitions.Vulnerability(
entry=entry,
technical_detail=technical_detail,
risk_rating=agent_report_vulnerability_mixin.RiskRating[
self.metadata.risk_rating.upper()
],
)
return vulnerability

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
"""Rule to detect authentication bypass vulnerability on a target."""
vulnerabilities: list[definitions.Vulnerability] = []

try:
resp = self.session.get(
f"{target.origin}{self.check_requests.path}",
timeout=DEFAULT_TIMEOUT.seconds,
)
if (
self._is_vulnerable_version(resp.text) is True
or resp.status_code == 200
):
vulnerabilities.append(
self._create_vulnerability(
target,
details="param.cgi?get_device_conf is accessible and exposes device configurations",
)
)
except requests_exceptions.RequestException as e:
logging.error("Authentication bypass detection failed: %s", e)

return vulnerabilities

def _is_vulnerable_version(self, response_text: str) -> bool:
"""Check if the target is running a vulnerable version."""
if (matched := self.version_pattern.findall(response_text)) is not None:
for extracted_version in matched:
for r in self.vuln_ranges:
if (
r.min is None or r.min <= version.Version(extracted_version)
) and (
r.max is None or r.max >= version.Version(extracted_version)
):
return True
return False
90 changes: 90 additions & 0 deletions tests/exploits/cve_2024_8956_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Unit tests for VHD PTZ Camera Authentication Bypass: CVE-2024-8956"""

import requests_mock as req_mock
from requests import exceptions as requests_exceptions

from agent import definitions
from agent.exploits import cve_2024_8956


def testVHDPTZAuthBypass_whenVulnerable_reportFinding(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when target is vulnerable (firmware 6.3.32)."""
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi?get_device_conf",
text='devname="ptzoptics" devtype="VX630A" versioninfo="SOC v6.3.32 - ARM 6.3.51THI" serial_num="r1j04260027" device_model="F53.HI"',
status_code=200,
)

exploit_instance = cve_2024_8956.VHDPTZAuthBypassExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)
vulnerability = vulnerabilities[0]

assert len(vulnerabilities) == 1

assert vulnerability.entry.title == "ValueHD PTZ Camera Authentication Bypass"
assert vulnerability.entry.references == {
"nvd.nist.gov": "https://nvd.nist.gov/vuln/detail/CVE-2024-8956"
}


def testVHDPTZAuthBypass_whenGetSystemConfIsAccessible_reportFinding(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when target is safe (firmware 6.3.40) but Get System Conf Is Accessible."""
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi?get_device_conf",
text='devname="ptzoptics" devtype="VX630A" versioninfo="SOC v6.3.40 - ARM 6.3.51THI" serial_num="r1j04260027" device_model="F53.HI"',
status_code=200,
)

exploit_instance = cve_2024_8956.VHDPTZAuthBypassExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)

assert len(vulnerabilities) == 1


def testVHDPTZAuthBypass_whenAuthRequired_reportNothing(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: when authentication is required (not vulnerable)."""
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi",
status_code=401,
)
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi?get_system_conf",
status_code=401,
)
exploit_instance = cve_2024_8956.VHDPTZAuthBypassExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)

assert len(vulnerabilities) == 0


def testVHDPTZAuthBypass_requestException_handlingErrorLogged(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""Test case: handle RequestException in auth bypass detection."""
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi",
exc=requests_exceptions.RequestException("Simulated connection error"),
)
requests_mock.get(
"http://localhost:80/cgi-bin/param.cgi?get_system_conf",
exc=requests_exceptions.RequestException("Simulated connection error"),
)

exploit_instance = cve_2024_8956.VHDPTZAuthBypassExploit()
target = definitions.Target("http", "localhost", 80)

vulnerabilities = exploit_instance.check(target)

assert len(vulnerabilities) == 0

0 comments on commit d38cc5f

Please sign in to comment.