diff --git a/agent/utils.py b/agent/utils.py new file mode 100644 index 00000000..de78069b --- /dev/null +++ b/agent/utils.py @@ -0,0 +1,87 @@ +"""Utilities for Asteroid agent""" + + +from ostorlab.agent.message import message as m +from urllib import parse as urlparser +import ipaddress +from agent import definitions + +DEFAULT_PORT = 443 +DEFAULT_SCHEME = "https" + +SCHEME_TO_PORT = { + "http": 80, + "https": 443, + "ftp": 21, + "ssh": 22, + "telnet": 23, + "smtp": 25, + "pop3": 110, + "imap": 143, + "irc": 6667, + "mysql": 3306, + "postgres": 5432, + "redis": 6379, + "mongodb": 27017, + "ldap": 389, + "sftp": 22, + "vnc": 5900, + "git": 9418, +} + + +def _get_port(message: m.Message, scheme: str) -> int: + """Returns the port to be used for the target.""" + if message.data.get("port") is None: + return SCHEME_TO_PORT.get(scheme) or DEFAULT_PORT + return int(message.data["port"]) + + +def _get_scheme(message: m.Message) -> str: + """Returns the schema to be used for the target.""" + protocol = message.data.get("protocol") + if protocol is not None: + return str(protocol) + + schema = message.data.get("schema") + if schema is None: + return DEFAULT_SCHEME + if schema in [ + "https?", + "ssl/https-alt?", + "ssl/https-alt", + "https-alt", + "https-alt?", + ]: + return "https" + if schema in ["http?", "http"]: + return "http" + return str(schema) + + +def prepare_targets(message: m.Message) -> list[definitions.Target]: + """Prepare targets based on type, if a domain name is provided, port and protocol are collected + from the config.""" + if (host := message.data.get("host")) is not None: + scheme = _get_scheme(message) + port = _get_port(message, scheme) + mask = message.data.get("mask") + if mask is None: + hosts = ipaddress.ip_network(host) + else: + hosts = ipaddress.ip_network(f"{host}/{mask}", strict=False) + return [ + definitions.Target(host=str(h), port=port, scheme=scheme) for h in hosts + ] + elif (host := message.data.get("name")) is not None: + scheme = _get_scheme(message) + port = _get_port(message, scheme) + return [definitions.Target(host=host, port=port, scheme=scheme)] + elif (url := message.data.get("url")) is not None: + parsed_url = urlparser.urlparse(url) + host = parsed_url.netloc + scheme = parsed_url.scheme + port = _get_port(message, scheme) + return [definitions.Target(host=host, port=port, scheme=scheme)] + else: + raise NotImplementedError diff --git a/tests/conftest.py b/tests/conftest.py index f1919a4a..f8e95728 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,7 +2,7 @@ from typing import Type import pytest - +from ostorlab.agent.message import message from agent import definitions @@ -18,3 +18,39 @@ def check(self, target: definitions.Target) -> list[definitions.Vulnerability]: return [] return TestExploit + + +@pytest.fixture() +def scan_message_domain_name() -> message.Message: + """Creates a message of type v3.asset.domain_name.service to be used by the agent for testing purposes.""" + selector = "v3.asset.domain_name.service" + msg_data = {"schema": "https", "name": "www.google.com", "port": 443} + return message.Message.from_data(selector, data=msg_data) + + +@pytest.fixture() +def scan_message_link() -> message.Message: + """Creates a message of type v3.asset.link to be used by the agent for testing purposes.""" + selector = "v3.asset.link" + msg_data = {"url": "https://www.google.com", "method": "POST"} + return message.Message.from_data(selector, data=msg_data) + + +@pytest.fixture() +def scan_message_ipv6() -> message.Message: + """Creates a message of type v3.asset.ip.v6 to be used by the agent for testing purposes.""" + selector = "v3.asset.ip.v6" + msg_data = { + "host": "2001:db8:3333:4444:5555:6666:7777:8888", + "mask": "128", + "version": 6, + } + return message.Message.from_data(selector, data=msg_data) + + +@pytest.fixture() +def scan_message_ipv4() -> message.Message: + """Creates a message of type v3.asset.ip.v4 to be used by the agent for testing purposes.""" + selector = "v3.asset.ip.v4" + msg_data = {"host": "192.168.1.17", "mask": "32", "version": 4} + return message.Message.from_data(selector, data=msg_data) diff --git a/tests/utils_test.py b/tests/utils_test.py new file mode 100644 index 00000000..2dd2549c --- /dev/null +++ b/tests/utils_test.py @@ -0,0 +1,52 @@ +"""Unit tests for asteroid agent utilities""" +from ostorlab.agent.message import message + +from agent import utils + + +def testPrepareTargets_whenDomainAsset_returnResult( + scan_message_domain_name: message.Message, +) -> None: + targets = utils.prepare_targets(scan_message_domain_name) + + assert len(targets) > 0 + target = targets[0] + assert target.host == "www.google.com" + assert target.scheme == "https" + assert target.port == 443 + + +def testPrepareTargets_whenIPv4Asset_returnResult( + scan_message_ipv4: message.Message, +) -> None: + targets = utils.prepare_targets(scan_message_ipv4) + + assert len(targets) > 0 + target = targets[0] + assert target.host == "192.168.1.17" + assert target.scheme == "https" + assert target.port == 443 + + +def testPrepareTargets_whenIPv6Asset_returnResult( + scan_message_ipv6: message.Message, +) -> None: + targets = utils.prepare_targets(scan_message_ipv6) + + assert len(targets) > 0 + target = targets[0] + assert target.host == "2001:db8:3333:4444:5555:6666:7777:8888" + assert target.scheme == "https" + assert target.port == 443 + + +def testPrepareTargets_whenLinkAsset_returnResult( + scan_message_link: message.Message, +) -> None: + targets = utils.prepare_targets(scan_message_link) + + assert len(targets) > 0 + target = targets[0] + assert target.host == "www.google.com" + assert target.scheme == "https" + assert target.port == 443