generated from Ostorlab/template_agent
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
176 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
"""Utilities for Asteroid agent""" | ||
|
||
|
||
from ostorlab.agent.message import message as m | ||
from urllib import parse as urlparser | ||
import ipaddress | ||
from agent import definitions | ||
|
||
DEFAULT_PORT = 443 | ||
DEFAULT_SCHEME = "https" | ||
|
||
SCHEME_TO_PORT = { | ||
"http": 80, | ||
"https": 443, | ||
"ftp": 21, | ||
"ssh": 22, | ||
"telnet": 23, | ||
"smtp": 25, | ||
"pop3": 110, | ||
"imap": 143, | ||
"irc": 6667, | ||
"mysql": 3306, | ||
"postgres": 5432, | ||
"redis": 6379, | ||
"mongodb": 27017, | ||
"ldap": 389, | ||
"sftp": 22, | ||
"vnc": 5900, | ||
"git": 9418, | ||
} | ||
|
||
|
||
def _get_port(message: m.Message, scheme: str) -> int: | ||
"""Returns the port to be used for the target.""" | ||
if message.data.get("port") is None: | ||
return SCHEME_TO_PORT.get(scheme) or DEFAULT_PORT | ||
return int(message.data["port"]) | ||
|
||
|
||
def _get_scheme(message: m.Message) -> str: | ||
"""Returns the schema to be used for the target.""" | ||
protocol = message.data.get("protocol") | ||
if protocol is not None: | ||
return str(protocol) | ||
|
||
schema = message.data.get("schema") | ||
if schema is None: | ||
return DEFAULT_SCHEME | ||
if schema in [ | ||
"https?", | ||
"ssl/https-alt?", | ||
"ssl/https-alt", | ||
"https-alt", | ||
"https-alt?", | ||
]: | ||
return "https" | ||
if schema in ["http?", "http"]: | ||
return "http" | ||
return str(schema) | ||
|
||
|
||
def prepare_targets(message: m.Message) -> list[definitions.Target]: | ||
"""Prepare targets based on type, if a domain name is provided, port and protocol are collected | ||
from the config.""" | ||
if (host := message.data.get("host")) is not None: | ||
scheme = _get_scheme(message) | ||
port = _get_port(message, scheme) | ||
mask = message.data.get("mask") | ||
if mask is None: | ||
hosts = ipaddress.ip_network(host) | ||
else: | ||
hosts = ipaddress.ip_network(f"{host}/{mask}", strict=False) | ||
return [ | ||
definitions.Target(host=str(h), port=port, scheme=scheme) for h in hosts | ||
] | ||
elif (host := message.data.get("name")) is not None: | ||
scheme = _get_scheme(message) | ||
port = _get_port(message, scheme) | ||
return [definitions.Target(host=host, port=port, scheme=scheme)] | ||
elif (url := message.data.get("url")) is not None: | ||
parsed_url = urlparser.urlparse(url) | ||
host = parsed_url.netloc | ||
scheme = parsed_url.scheme | ||
port = _get_port(message, scheme) | ||
return [definitions.Target(host=host, port=port, scheme=scheme)] | ||
else: | ||
raise NotImplementedError |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
"""Unit tests for asteroid agent utilities""" | ||
from ostorlab.agent.message import message | ||
|
||
from agent import utils | ||
|
||
|
||
def testPrepareTargets_whenDomainAsset_returnResult( | ||
scan_message_domain_name: message.Message, | ||
) -> None: | ||
targets = utils.prepare_targets(scan_message_domain_name) | ||
|
||
assert len(targets) > 0 | ||
target = targets[0] | ||
assert target.host == "www.google.com" | ||
assert target.scheme == "https" | ||
assert target.port == 443 | ||
|
||
|
||
def testPrepareTargets_whenIPv4Asset_returnResult( | ||
scan_message_ipv4: message.Message, | ||
) -> None: | ||
targets = utils.prepare_targets(scan_message_ipv4) | ||
|
||
assert len(targets) > 0 | ||
target = targets[0] | ||
assert target.host == "192.168.1.17" | ||
assert target.scheme == "https" | ||
assert target.port == 443 | ||
|
||
|
||
def testPrepareTargets_whenIPv6Asset_returnResult( | ||
scan_message_ipv6: message.Message, | ||
) -> None: | ||
targets = utils.prepare_targets(scan_message_ipv6) | ||
|
||
assert len(targets) > 0 | ||
target = targets[0] | ||
assert target.host == "2001:db8:3333:4444:5555:6666:7777:8888" | ||
assert target.scheme == "https" | ||
assert target.port == 443 | ||
|
||
|
||
def testPrepareTargets_whenLinkAsset_returnResult( | ||
scan_message_link: message.Message, | ||
) -> None: | ||
targets = utils.prepare_targets(scan_message_link) | ||
|
||
assert len(targets) > 0 | ||
target = targets[0] | ||
assert target.host == "www.google.com" | ||
assert target.scheme == "https" | ||
assert target.port == 443 |