Skip to content

Commit

Permalink
Merge pull request #8 from Ostorlab/feat_prepare_targets
Browse files Browse the repository at this point in the history
Feat / Prepare targets
  • Loading branch information
3asm authored Nov 17, 2023
2 parents a9de23a + d01d29c commit b7ca2e9
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 1 deletion.
100 changes: 100 additions & 0 deletions agent/targets_preparer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Target Preparer Module for Asteroid Agent"""
from typing import Generator

from ostorlab.agent.message import message as m
from urllib import parse as urlparser
import ipaddress
from agent import definitions

MIN_MASK_IPV4 = 24
MIN_MASK_IPV6 = 120
DEFAULT_PORT = 443
DEFAULT_SCHEME = "https"
SCHEME_TO_PORT = {
"http": 80,
"https": 443,
"ftp": 21,
"ssh": 22,
"telnet": 23,
"smtp": 25,
"pop3": 110,
"imap": 143,
"irc": 6667,
"mysql": 3306,
"postgres": 5432,
"redis": 6379,
"mongodb": 27017,
"ldap": 389,
"sftp": 22,
"vnc": 5900,
"git": 9418,
}


def _get_port(message: m.Message, scheme: str) -> int:
"""Returns the port to be used for the target."""
if message.data.get("port") is None:
return SCHEME_TO_PORT.get(scheme) or DEFAULT_PORT
return int(message.data["port"])


def _get_scheme(message: m.Message) -> str:
"""Returns the schema to be used for the target."""
protocol = message.data.get("protocol")
if protocol is not None:
return str(protocol)

schema = message.data.get("schema")
if schema is None:
return DEFAULT_SCHEME
if schema in [
"https",
"https?",
"ssl/https-alt?",
"ssl/https-alt",
"https-alt",
"https-alt?",
]:
return "https"
if schema in ["http?", "http"]:
return "http"
return str(schema)


def prepare_targets(message: m.Message) -> Generator[definitions.Target, None, None]:
"""Prepare targets based on type. If a domain name is provided, port and protocol are collected from the config.
Args:
message (m.Message): The input message containing information about the target.
Yields:
Target: A target containing host, port, and scheme information.
"""
if (host := message.data.get("host")) is not None:
scheme = _get_scheme(message)
port = _get_port(message, scheme)
mask = message.data.get("mask")
if mask is None:
hosts = ipaddress.ip_network(host)
else:
version = message.data.get("version")
if version == 4 and int(mask) < MIN_MASK_IPV4:
raise ValueError(f"Subnet mask below {MIN_MASK_IPV4} is not supported.")
if version == 6 and int(mask) < MIN_MASK_IPV6:
raise ValueError(f"Subnet mask below {MIN_MASK_IPV6} is not supported")
hosts = ipaddress.ip_network(f"{host}/{mask}", strict=False)
yield from (
definitions.Target(host=str(h), port=port, scheme=scheme) for h in hosts
)
elif (host := message.data.get("name")) is not None:
scheme = _get_scheme(message)
port = _get_port(message, scheme)
yield definitions.Target(host=host, port=port, scheme=scheme)
elif (url := message.data.get("url")) is not None:
parsed_url = urlparser.urlparse(url)
host = parsed_url.netloc
scheme = parsed_url.scheme
port = _get_port(message, scheme)
yield definitions.Target(host=host, port=port, scheme=scheme)
else:
raise NotImplementedError
38 changes: 37 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Type

import pytest

from ostorlab.agent.message import message
from agent import definitions


Expand All @@ -18,3 +18,39 @@ def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
return []

return TestExploit


@pytest.fixture()
def scan_message_domain_name() -> message.Message:
"""Creates a message of type v3.asset.domain_name.service to be used by the agent for testing purposes."""
selector = "v3.asset.domain_name.service"
msg_data = {"schema": "https", "name": "www.google.com", "port": 443}
return message.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_link() -> message.Message:
"""Creates a message of type v3.asset.link to be used by the agent for testing purposes."""
selector = "v3.asset.link"
msg_data = {"url": "https://www.google.com", "method": "POST"}
return message.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv6() -> message.Message:
"""Creates a message of type v3.asset.ip.v6 to be used by the agent for testing purposes."""
selector = "v3.asset.ip.v6"
msg_data = {
"host": "2001:db8:3333:4444:5555:6666:7777:8888",
"mask": "128",
"version": 6,
}
return message.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv4() -> message.Message:
"""Creates a message of type v3.asset.ip.v4 to be used by the agent for testing purposes."""
selector = "v3.asset.ip.v4"
msg_data = {"host": "192.168.1.17", "mask": "32", "version": 4}
return message.Message.from_data(selector, data=msg_data)
52 changes: 52 additions & 0 deletions tests/targets_preparer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Unit tests for target preparer"""
from ostorlab.agent.message import message

from agent import targets_preparer


def testPrepareTargets_whenDomainAsset_returnResult(
scan_message_domain_name: message.Message,
) -> None:
targets = targets_preparer.prepare_targets(scan_message_domain_name)

assert any(targets)
for target in targets:
assert target.host == "192.168.1.17"
assert target.scheme == "https"
assert target.port == 443


def testPrepareTargets_whenIPv4Asset_returnResult(
scan_message_ipv4: message.Message,
) -> None:
targets = targets_preparer.prepare_targets(scan_message_ipv4)

assert any(targets)
for target in targets:
assert target.host == "192.168.1.17"
assert target.scheme == "https"
assert target.port == 443


def testPrepareTargets_whenIPv6Asset_returnResult(
scan_message_ipv6: message.Message,
) -> None:
targets = targets_preparer.prepare_targets(scan_message_ipv6)

assert any(targets)
for target in targets:
assert target.host == "2001:db8:3333:4444:5555:6666:7777:8888"
assert target.scheme == "https"
assert target.port == 443


def testPrepareTargets_whenLinkAsset_returnResult(
scan_message_link: message.Message,
) -> None:
targets = targets_preparer.prepare_targets(scan_message_link)

assert any(targets)
for target in targets:
assert target.host == "www.google.com"
assert target.scheme == "https"
assert target.port == 443

0 comments on commit b7ca2e9

Please sign in to comment.