Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rely only on explicit status #10

Merged
merged 22 commits into from
Nov 20, 2023
66 changes: 20 additions & 46 deletions agent/metasploit_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@
logger = logging.getLogger(__name__)

MODULE_TIMEOUT = 300
WORKSPACE_ARG = "WORKSPACE => Ostorlab"
MSF_SAFE_INDICATOR = "[-]"
MSF_UNKNOWN_INDICATOR = "Cannot reliably check exploitability"
MSF_DEFAULT_AUXILIARY_MESSAGE = """
[*] Scanned 1 of 1 hosts (100% complete)
[*] Auxiliary module execution completed
"""


class Error(Exception):
Expand Down Expand Up @@ -71,7 +64,6 @@ def process(self, message: m.Message) -> None:

"""
client = utils.connect_msfrpc()
cid = client.consoles.console().cid
for entry in self._config:
module = entry.get("module")
options = entry.get("options") or []
Expand All @@ -92,52 +84,34 @@ def process(self, message: m.Message) -> None:
"Failed to set arguments for %s", selected_module.modulename
)
continue
if module_instance.moduletype == "exploit":
job = module_instance.check_exploit()
elif module_instance.moduletype == "auxiliary":
job = module_instance.execute()
else:
job = module_instance.check_exploit()
if job.get("error") is True:
logger.error(
"%s module type is not implemented", module_instance.moduletype
"Metasploit Error: %s", job.get("error_string", "Unknown Error")
)
continue
job_uuid = job.get("uuid")
if job_uuid is not None:
results = self._get_job_results(client, job_uuid)
else:
results = None

if isinstance(results, dict) and results.get("code") in ["safe", "unknown"]:
job_uuid = job.get("uuid")
if job_uuid is None:
continue

target = (
module_instance.runoptions.get("VHOST")
or module_instance.runoptions.get("RHOSTS")
or module_instance.runoptions.get("DOMAIN")
)
technical_detail = f"Using `{module_instance.moduletype}` module `{module_instance.modulename}`\n"
technical_detail += f"Target: {target}\n"
results = self._get_job_results(client, job_uuid)

if isinstance(results, dict) and results.get("code") == "vulnerable":
technical_detail += f'Message: \n```{results["message"]}```'
else:
console_output = client.consoles.console(cid).run_module_with_output(
module_instance
if isinstance(results, dict) and results.get("code") in [
"vulnerable",
"appears",
]:
target = (
BlueSquare1 marked this conversation as resolved.
Show resolved Hide resolved
module_instance.runoptions.get("VHOST")
or module_instance.runoptions.get("RHOSTS")
or module_instance.runoptions.get("DOMAIN")
)
try:
module_output = console_output.split(WORKSPACE_ARG)[1]
except IndexError:
logger.error("Unexpected console output:\n %s", console_output)
continue
if MSF_SAFE_INDICATOR in module_output:
continue
if MSF_UNKNOWN_INDICATOR in module_output:
continue
if MSF_DEFAULT_AUXILIARY_MESSAGE == module_output:
continue
technical_detail += f"Message: \n```{module_output}```"

self._emit_results(module_instance, technical_detail)
technical_detail = f"Using `{module_instance.moduletype}` module `{module_instance.modulename}`\n"
technical_detail += f"Target: {target}\n"
technical_detail += f'Message: \n```{results["message"]}```'

self._emit_results(module_instance, technical_detail)

client.logout()

def _get_job_results(
Expand Down
175 changes: 52 additions & 123 deletions tests/metasploit_agent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
[["exploit/windows/http/exchange_proxyshell_rce", []]],
indirect=True,
)
def testExploitCheck_whenSafe_returnNone(
def testExploit_whenSafe_returnNone(
agent_instance: msf_agent.MetasploitAgent,
agent_mock: list[message.Message],
scan_message: message.Message,
Expand All @@ -38,47 +38,10 @@ def testAuxiliaryExecute_whenSafe_returnNone(
assert len(agent_mock) == 0


@pytest.mark.parametrize(
"agent_instance", [["auxiliary/scanner/http/joomla_version", []]], indirect=True
)
def testAuxiliaryExecute_whenVulnerable_returnFindings(
agent_instance: msf_agent.MetasploitAgent,
mocker: plugin.MockerFixture,
agent_mock: list[message.Message],
scan_message: message.Message,
) -> None:
"""Unit test for agent metasploit auxiliary execute, case when target is vulnerable"""
with open(
"tests/msf_output/auxiliary.txt", encoding="utf-8"
) as auxiliary_output_file:
mocker.patch(
"pymetasploit3.msfrpc.MsfConsole.run_module_with_output",
return_value=auxiliary_output_file.read(),
)

agent_instance.process(scan_message)

assert len(agent_mock) == 1
vulnerability_finding = agent_mock[0].data
assert vulnerability_finding["title"] == "Joomla Version Scanner"
assert vulnerability_finding["risk_rating"] == "HIGH"
assert vulnerability_finding["technical_detail"] == (
"Using `auxiliary` module `scanner/http/joomla_version`\n"
"Target: www.google.com\n"
"Message: \n"
"```\n"
"[*] Server: Apache\n"
"[+] Joomla version: 1.0\n"
"[*] Scanned 1 of 1 hosts (100% complete)\n"
"[*] Auxiliary module execution completed\n"
"```"
)


@pytest.mark.parametrize(
"agent_instance", [["exploit/unix/misc/distcc_exec", []]], indirect=True
)
def testExploitCheck_whenVulnerable_returnFindings(
def testExploit_whenVulnerable_returnFindings(
agent_instance: msf_agent.MetasploitAgent,
mocker: plugin.MockerFixture,
agent_mock: list[message.Message],
Expand Down Expand Up @@ -117,130 +80,96 @@ def testExploitCheck_whenVulnerable_returnFindings(


@pytest.mark.parametrize(
"agent_instance", [["exploit/unix/misc/distcc_exec", []]], indirect=True
"agent_instance",
[["exploit/windows/http/ws_ftp_rce_cve_2023_40044", []]],
indirect=True,
)
def testExploitCheck_whenVulnerable_returnConsoleOutput(
def testExploit_whenCannotCheck_returnNone(
agent_instance: msf_agent.MetasploitAgent,
mocker: plugin.MockerFixture,
agent_mock: list[message.Message],
metasploitable_scan_message: message.Message,
scan_message: message.Message,
) -> None:
"""Unit test for agent metasploit exploit check, case when target is vulnerable (console output)"""
mocker.patch(
"pymetasploit3.msfrpc.MsfModule.check_exploit",
return_value={"job_id": 10, "uuid": "CzwatViyCW2tJABg0FiYfHeC"},
)
mocker.patch(
"pymetasploit3.msfrpc.JobManager.info_by_uuid",
return_value={"status": "completed", "result": None},
)
with open("tests/msf_output/exploit.txt", encoding="utf-8") as exploit_output_file:
mocker.patch(
"pymetasploit3.msfrpc.MsfConsole.run_module_with_output",
return_value=exploit_output_file.read(),
)

agent_instance.process(metasploitable_scan_message)
"""Unit test for agent metasploit exploit check, case when cannot check"""
agent_instance.process(scan_message)

assert len(agent_mock) == 1
vulnerability_finding = agent_mock[0].data
assert vulnerability_finding["title"] == "DistCC Daemon Command Execution"
assert vulnerability_finding["risk_rating"] == "HIGH"
assert vulnerability_finding["technical_detail"] == (
"Using `exploit` module `unix/misc/distcc_exec`\n"
"Target: 192.168.1.17\n"
"Message: \n"
"```\n"
"[+] 192.168.1.17:3632 - The target is vulnerable.\n"
"```"
)
assert len(agent_mock) == 0


@pytest.mark.parametrize(
"agent_instance",
[["auxiliary/scanner/portscan/tcp", '[{"name": "PORTS", "value": "443,80"}]']],
[["auxiliary/scanner/ike/cisco_ike_benigncertain", []]],
indirect=True,
)
def testAuxiliaryPortScan_whenResultsFound_returnOpenPorts(
def testExploit_whenDefaultAuxiliaryMessage_returnNone(
agent_instance: msf_agent.MetasploitAgent,
agent_mock: list[message.Message],
scan_message: message.Message,
) -> None:
"""Unit test for agent metasploit auxiliary run, case when results are found"""
"""Unit test for agent metasploit exploit check,
case when console returns default auxiliary message"""
agent_instance.process(scan_message)

assert len(agent_mock) == 1
vulnerability_finding = agent_mock[0].data
assert vulnerability_finding["title"] == "TCP Port Scanner"
assert vulnerability_finding["risk_rating"] == "HIGH"
assert "443 - TCP OPEN" in vulnerability_finding["technical_detail"]
assert "80 - TCP OPEN" in vulnerability_finding["technical_detail"]
assert (
"[*] Auxiliary module execution completed"
in vulnerability_finding["technical_detail"]
)


def testAgent_whenMultipleModulesUsed_returnFindings(
agent_multi_instance: msf_agent.MetasploitAgent,
mocker: plugin.MockerFixture,
agent_mock: list[message.Message],
scan_message: message.Message,
) -> None:
"""Unit test for agent metasploit auxiliary run, case when results are found"""
agent_multi_instance.process(scan_message)

assert len(agent_mock) == 2
assert any("TCP Port Scanner" in finding.data["title"] for finding in agent_mock)
assert any(finding.data["risk_rating"] == "HIGH" for finding in agent_mock)
assert any(
"443 - TCP OPEN" in finding.data["technical_detail"] for finding in agent_mock
)
assert any(
"80 - TCP OPEN" in finding.data["technical_detail"] for finding in agent_mock
)
assert any(
"[*] Auxiliary module execution completed" in finding.data["technical_detail"]
for finding in agent_mock
)
assert any(
"Archive.org Stored Domain URLs" in finding.data["title"]
for finding in agent_mock
)
assert (
"http://ostorlab.co/robots.txt" in finding.data["technical_detail"]
for finding in agent_mock
)
assert len(agent_mock) == 0


@pytest.mark.parametrize(
"agent_instance",
[["exploit/windows/http/ws_ftp_rce_cve_2023_40044", []]],
[["auxiliary/scanner/ssl/openssl_heartbleed", []]],
indirect=True,
)
def testExploitCheck_whenCannotCheck_returnNone(
def testAuxiliary_whenSafe_returnNone(
agent_instance: msf_agent.MetasploitAgent,
agent_mock: list[message.Message],
scan_message: message.Message,
) -> None:
"""Unit test for agent metasploit exploit check, case when cannot check"""
"""Unit test for agent metasploit exploit check,
case when console returns default auxiliary message"""
agent_instance.process(scan_message)

assert len(agent_mock) == 0


@pytest.mark.parametrize(
"agent_instance",
[["auxiliary/scanner/ike/cisco_ike_benigncertain", []]],
[["auxiliary/scanner/ssl/openssl_heartbleed", []]],
indirect=True,
)
def testExploitCheck_whenDefaultAuxiliaryMessage_returnNone(
def testAuxiliary_whenAppearsVulnerable_returnFindings(
agent_instance: msf_agent.MetasploitAgent,
agent_mock: list[message.Message],
scan_message: message.Message,
mocker: plugin.MockerFixture,
) -> None:
"""Unit test for agent metasploit exploit check,
"""Unit test for agent metasploit auxiliary check,
case when console returns default auxiliary message"""
mocker.patch(
"pymetasploit3.msfrpc.MsfModule.check_exploit",
return_value={"job_id": 10, "uuid": "CzwatViyCW2tJABg0FiYfHeC"},
)
mocker.patch(
"pymetasploit3.msfrpc.JobManager.info_by_uuid",
return_value={
"status": "completed",
"result": {
"code": "appears",
"message": "The target appears to be vulnerable.",
"reason": None,
"details": {},
},
},
)

agent_instance.process(scan_message)

assert len(agent_mock) == 0
assert len(agent_mock) == 1
vulnerability_finding = agent_mock[0].data
assert (
vulnerability_finding["title"]
== "OpenSSL Heartbeat (Heartbleed) Information Leak"
)
assert vulnerability_finding["risk_rating"] == "HIGH"
assert "scanner/ssl/openssl_heartbleed" in vulnerability_finding["technical_detail"]
assert (
"The target appears to be vulnerable."
in vulnerability_finding["technical_detail"]
)
Loading