Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/log the correct value #14

Merged
merged 5 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions agent/metasploit_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,23 @@ def process(self, message: m.Message) -> None:
logger.info("Selected metasploit module: %s", selected_module.modulename)
targets = utils.prepare_targets(message)
for target in targets:
rhost = target.host
vhost = target.host
BlueSquare1 marked this conversation as resolved.
Show resolved Hide resolved
rport = target.port
is_ssl = target.scheme == "https"
try:
rhost = socket.gethostbyname(vhost)
except socket.gaierror:
logger.warning("The specified target %s is not valid", vhost)
continue
try:
module_instance = self._set_module_args(
selected_module, rhost, rport, is_ssl, options
selected_module, vhost, rhost, rport, is_ssl, options
)
except ValueError:
logger.error(
"Failed to set arguments for %s", selected_module.modulename
except ValueError as e:
logger.warning(
"Failed to set arguments for %s from %s",
selected_module.modulename,
e,
)
continue
job = module_instance.check_exploit()
Expand All @@ -114,7 +121,7 @@ def process(self, message: m.Message) -> None:
and results.get("code") in VULNERABLE_STATUSES
):
technical_detail = f"Using `{module_instance.moduletype}` module `{module_instance.modulename}`\n"
technical_detail += f"Target: {rhost}:{rport}\n"
technical_detail += f"Target: {vhost}:{rport}\n"
technical_detail += f'Message: \n```{results["message"]}```'

self._emit_results(module_instance, technical_detail)
Expand Down Expand Up @@ -176,8 +183,7 @@ def _get_job_results(
results = job_result["result"]
break
if status == "errored":
logger.error("Module error: %s", job_result["error"])
break
logger.warning("Module Error: %s", job_result["error"])
if time.time() - init_timestamp > MODULE_TIMEOUT:
logger.error("Metasploit job %s timed out", job_uuid)
break
Expand Down Expand Up @@ -220,14 +226,11 @@ def _set_module_args(
self,
selected_module: msfrpc.MsfModule,
vhost: str,
rhost: str,
rport: int,
is_ssl: bool,
options: list[dict[str, str]],
) -> msfrpc.MsfModule:
try:
rhost = socket.gethostbyname(vhost)
except socket.gaierror as exc:
raise ValueError("The specified target is not valid") from exc
if "RHOSTS" in selected_module.required:
selected_module["RHOSTS"] = rhost
elif "DOMAIN" in selected_module.required:
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,11 @@ def metasploitable_scan_message() -> message.Message:
selector = "v3.asset.ip.v4"
msg_data = {"host": "192.168.1.17", "mask": "32", "version": 4}
return message.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_host_not_exist() -> message.Message:
"""Creates a message of type v3.asset.domain_name.service to be used by the agent for testing purposes."""
selector = "v3.asset.domain_name.service"
msg_data = {"schema": "https", "name": "sa.com", "port": 443}
return message.Message.from_data(selector, data=msg_data)
19 changes: 19 additions & 0 deletions tests/metasploit_agent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,22 @@ def testMetasploitAgent_whenUnknownTarget_shouldNotBeProcessed(
agent_instance.process(msg)

connect_msfrpc_mock.assert_not_called()


@pytest.mark.parametrize(
"agent_instance",
[["exploit/windows/http/exchange_proxyshell_rce", []]],
indirect=True,
)
def testExploit_whenHostNotExist_returnCorrectMessage(
agent_instance: msf_agent.MetasploitAgent,
agent_mock: list[message.Message],
agent_persist_mock: dict[str | bytes, str | bytes],
scan_message_host_not_exist: message.Message,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Unit test for agent metasploit exploit check, case when target is not exist"""
agent_instance.process(scan_message_host_not_exist)

assert len(agent_mock) == 0
assert "The specified target sa.com is not valid" in caplog.text
Loading