From fdbc0e672e0049c6c128d539eeb20e7ab605c513 Mon Sep 17 00:00:00 2001 From: benyissa Date: Mon, 18 Dec 2023 18:42:32 +0100 Subject: [PATCH] handle cidr limits --- agent/tsunami/factory/preapre_tagets_tools.py | 35 +++++++++------- ostorlab.yaml | 2 +- tests/conftest.py | 40 +++++++++++++++++++ tests/factory/tools_test.py | 26 ++++++++++++ 4 files changed, 88 insertions(+), 15 deletions(-) diff --git a/agent/tsunami/factory/preapre_tagets_tools.py b/agent/tsunami/factory/preapre_tagets_tools.py index a412232..f75d300 100644 --- a/agent/tsunami/factory/preapre_tagets_tools.py +++ b/agent/tsunami/factory/preapre_tagets_tools.py @@ -7,6 +7,9 @@ from ostorlab.agent.message import message as msg +IPV4_CIDR_LIMIT = 16 +IPV6_CIDR_LIMIT = 112 + @dataclasses.dataclass class Target: @@ -53,21 +56,24 @@ def _prepare_url_target(message: msg.Message) -> Target: return Target(domain=str(parse.urlparse(link).netloc), url=link) -def _prepare_ip_targets(message: msg.Message) -> list[Target]: - version = message.data["version"] +def _prepare_ip_targets(message: msg.Message, host: str) -> list[Target]: + mask = message.data.get("mask") + version = message.data.get("version") if version == 6: + if mask is not None and int(mask) < IPV6_CIDR_LIMIT: + raise ValueError(f"Subnet mask below {IPV6_CIDR_LIMIT} is not supported.") version = "v6" - elif message.data["version"] == 4: + elif version == 4: + if mask is not None and int(mask) < IPV4_CIDR_LIMIT: + raise ValueError(f"Subnet mask below {IPV4_CIDR_LIMIT} is not supported.") version = "v4" else: - raise ValueError(f'Incorrect ip version {message.data["version"]}') + raise ValueError(f"Incorrect ip version {version}") try: - if message.data.get("mask") is None: - ip_network = ipaddress.ip_network(message.data["host"]) + if mask is None: + ip_network = ipaddress.ip_network(host) else: - ip_network = ipaddress.ip_network( - f"""{message.data.get('host')}/{message.data.get('mask')}""" - ) + ip_network = ipaddress.ip_network(f"""{host}/{mask}""") return [ Target(version=version, address=str(host), ip_network=ip_network) for host in ip_network.hosts() @@ -75,8 +81,8 @@ def _prepare_ip_targets(message: msg.Message) -> list[Target]: except ValueError: logging.info( "Incorrect %s / %s", - {message.data.get("host")}, - {message.data.get("mask")}, + {host}, + {mask}, ) return [] @@ -87,10 +93,11 @@ def prepare_targets(message: msg.Message, args: dict[str, Any]) -> list[Target]: if message.data.get("name") is not None: return [_prepare_domain_target(message, args)] # link message - elif message.data.get("url") is not None: + if message.data.get("url") is not None: return [_prepare_url_target(message)] # IP message - elif message.data.get("host") is not None: - return _prepare_ip_targets(message) + host = message.data.get("host") + if host is not None: + return _prepare_ip_targets(message, host) else: raise ValueError("Message is invalid.") diff --git a/ostorlab.yaml b/ostorlab.yaml index 0be863d..4c4dd0d 100644 --- a/ostorlab.yaml +++ b/ostorlab.yaml @@ -1,6 +1,6 @@ kind: Agent name: tsunami -version: 0.5.0 +version: 0.5.1 image: images/logo.png description: | This repository is an implementation of [Ostorlab Agent](https://pypi.org/project/ostorlab/) for the [Tsunami Scanner](https://github.com/google/tsunami-security-scanner) by Google. diff --git a/tests/conftest.py b/tests/conftest.py index 63f9deb..b2de6b0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -80,3 +80,43 @@ def ip_small_range_message() -> message.Message: selector = "v3.asset.ip.v4" msg_data = {"host": "42.42.42.42", "mask": "31", "version": 4} return message.Message.from_data(selector, data=msg_data) + + +@pytest.fixture() +def scan_message_ipv4_with_mask8() -> message.Message: + """Creates a message of type v3.asset.ip.v4 to be used by the agent for testing purposes.""" + selector = "v3.asset.ip.v4" + msg_data = {"host": "192.168.1.17", "mask": "8", "version": 4} + return message.Message.from_data(selector, data=msg_data) + + +@pytest.fixture() +def scan_message_ipv4_with_mask16() -> message.Message: + """Creates a message of type v3.asset.ip.v4 to be used by the agent for testing purposes.""" + selector = "v3.asset.ip.v4" + msg_data = {"host": "192.168.1.17", "mask": "16", "version": 4} + return message.Message.from_data(selector, data=msg_data) + + +@pytest.fixture() +def scan_message_ipv6_with_mask64() -> message.Message: + """Creates a message of type v3.asset.ip.v6 to be used by the agent for testing purposes.""" + selector = "v3.asset.ip.v6" + msg_data = { + "host": "2001:db8:3333:4444:5555:6666:7777:8888", + "mask": "64", + "version": 6, + } + return message.Message.from_data(selector, data=msg_data) + + +@pytest.fixture() +def scan_message_ipv6_with_mask112() -> message.Message: + """Creates a message of type v3.asset.ip.v6 to be used by the agent for testing purposes.""" + selector = "v3.asset.ip.v6" + msg_data = { + "host": "2001:db8:3333:4444:5555:6666:7777:8888", + "mask": "112", + "version": 6, + } + return message.Message.from_data(selector, data=msg_data) diff --git a/tests/factory/tools_test.py b/tests/factory/tools_test.py index f63355a..0d17c45 100644 --- a/tests/factory/tools_test.py +++ b/tests/factory/tools_test.py @@ -74,3 +74,29 @@ def testTsunamyFactory_whenPrepareMessages_prepareTarget( input_message: message.Message, expected: list[tools.Target] ) -> None: assert tools.prepare_targets(message=input_message, args={}) == expected + + +def testPrepareTargets_whenIPv4AssetReachCIDRLimit_raiseValueError( + scan_message_ipv4_with_mask8: message.Message, +) -> None: + with pytest.raises(ValueError, match="Subnet mask below 16 is not supported."): + tools.prepare_targets(message=scan_message_ipv4_with_mask8, args={}) + + +def testPrepareTargets_whenIPv4AssetDoesNotReachCIDRLimit_doesNotRaiseValueError( + scan_message_ipv4_with_mask16: message.Message, +) -> None: + tools.prepare_targets(message=scan_message_ipv4_with_mask16, args={}) + + +def testPrepareTargets_whenIPv6AssetReachCIDRLimit_raiseValueError( + scan_message_ipv6_with_mask64: message.Message, +) -> None: + with pytest.raises(ValueError, match="Subnet mask below 112 is not supported."): + tools.prepare_targets(message=scan_message_ipv6_with_mask64, args={}) + + +def testPrepareTargets_whenIPv6AssetDoesNotReachCIDRLimit_doesNotRaiseValueError( + scan_message_ipv6_with_mask112: message.Message, +) -> None: + tools.prepare_targets(message=scan_message_ipv6_with_mask112, args={})