diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 5715dc99..687fc9ae 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -36,6 +36,9 @@ jobs: python -m pip install --upgrade pip python -m pip install -r requirement.txt python -m pip install -r tests/test-requirement.txt + - name: Install nmap. + run: | + sudo apt-get install nmap - name: Running tests with pytest. run: | set -o pipefail diff --git a/agent/nmap_agent.py b/agent/nmap_agent.py index 17e65f90..7bc234cf 100644 --- a/agent/nmap_agent.py +++ b/agent/nmap_agent.py @@ -3,13 +3,13 @@ The agent expects messages of type `v3.asset.ip.[v4,v6]`, and emits back messages of type `v3.asset.ip.v[4,6].port.service`, and `v3.report.vulnerability` with a technical report of the scan. """ -import ipaddress import datetime +import ipaddress import logging import re +import subprocess from typing import Dict, Any, Tuple, Optional, List, cast from urllib import parse -import subprocess from ostorlab.agent import agent, definitions as agent_definitions from ostorlab.agent.kb import kb @@ -41,6 +41,10 @@ WIREGUARD_CONFIG_FILE_PATH = "/etc/wireguard/wg0.conf" DNS_RESOLV_CONFIG_PATH = "/etc/resolv.conf" +DEFAULT_MASK_IPV6 = 128 +# scan up to 65536 host +IPV6_CIDR_LIMIT = 112 + class Error(Exception): """Base Custom Error Class.""" @@ -96,8 +100,13 @@ def process(self, message: msg.Message) -> None: else: hosts = [(host, mask)] elif "v6" in message.selector: - mask = int(message.data.get("mask", "64")) - max_mask = int(self.args.get("max_network_mask_ipv6", "64")) + mask = int(message.data.get("mask", DEFAULT_MASK_IPV6)) + if mask < IPV6_CIDR_LIMIT: + raise ValueError( + f"Subnet mask below {IPV6_CIDR_LIMIT} is not supported" + ) + + max_mask = int(self.args.get("max_network_mask_ipv6", "128")) if mask < max_mask: for subnet in ipaddress.ip_network( f"{host}/{mask}", strict=False diff --git a/ostorlab.yaml b/ostorlab.yaml index b3abd63a..f4a39fe9 100755 --- a/ostorlab.yaml +++ b/ostorlab.yaml @@ -1,6 +1,6 @@ kind: Agent name: nmap -version: 0.10.0 +version: 0.10.1 image: images/logo.png description: | This repository is an implementation of [Ostorlab Agent](https://pypi.org/project/ostorlab/) for the [Nmap Scanner](https://github.com/projectdiscovery/nmap) by Project Discovery. @@ -111,7 +111,7 @@ args: - name: "max_network_mask_ipv6" type: "int" description: "When scanning an IP range, maximum network size, if the network is above max, network in divided into subnetworks." - value: 64 + value: 112 - name: "scope_domain_regex" type: "string" description: "Regular expression to define domain scanning scope." diff --git a/tests/conftest.py b/tests/conftest.py index 790c75a8..8a9a0fd8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -246,6 +246,31 @@ def junk_msg() -> message.Message: @pytest.fixture def ipv6_msg() -> message.Message: + """Creates a dummy message of type v3.asset.ip.v6 for testing purposes.""" + return message.Message.from_data( + selector="v3.asset.ip.v6", + data={ + "version": 6, + "host": "2600:3c01:224a:6e00:f03c:91ff:fe18:bb2f", + "mask": "112", + }, + ) + + +@pytest.fixture +def ipv6_msg_without_mask() -> message.Message: + """Creates a dummy message of type v3.asset.ip.v6 for testing purposes.""" + return message.Message.from_data( + selector="v3.asset.ip.v6", + data={ + "version": 6, + "host": "2001:470:1:18:1000::46", + }, + ) + + +@pytest.fixture +def ipv6_msg_above_limit() -> message.Message: """Creates a dummy message of type v3.asset.ip.v6 for testing purposes.""" return message.Message.from_data( selector="v3.asset.ip.v6", diff --git a/tests/nmap_agent_test.py b/tests/nmap_agent_test.py index 7b92a78a..17908482 100644 --- a/tests/nmap_agent_test.py +++ b/tests/nmap_agent_test.py @@ -9,6 +9,7 @@ from agent import nmap_agent from agent import nmap_options +import pytest JSON_OUTPUT = { "nmaprun": { @@ -642,3 +643,29 @@ def testNmapAgentLifecycle_whenIpv6WithHostBits_agentShouldNotCrash( "2600:3c01:224a:6e00:f03c:91ff:fe18:bb2f" in agent_mock[1].data["technical_detail"] ) + + +def testNmapAgent_whenIpv6WithoutMask_agentShouldNotGetStuck( + nmap_test_agent: nmap_agent.NmapAgent, + agent_mock: List[message.Message], + agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], + ipv6_msg_without_mask: message.Message, +) -> None: + """Unit test of nmap agent when ipv6 without mask is provided, the agent should not get stuck.""" + nmap_test_agent.process(ipv6_msg_without_mask) + + assert len(agent_mock) == 0 + + +def testNmapAgent_whenIpv6AboveLimit_agentShouldRaiseError( + nmap_test_agent: nmap_agent.NmapAgent, + agent_mock: List[message.Message], + agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], + ipv6_msg_above_limit: message.Message, +) -> None: + """Unit test of nmap agent when ipv6 above limit is provided, the agent should raise an error.""" + with pytest.raises(ValueError) as error_message: + nmap_test_agent.process(ipv6_msg_above_limit) + + assert len(agent_mock) == 0 + assert error_message.value.args[0] == "Subnet mask below 112 is not supported"