From 55d42570b900d4c9b48aa2bfe6e6e66810219381 Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Mon, 2 Dec 2024 13:59:27 +0100 Subject: [PATCH 1/7] Improved IPv6 Handling for Nmap Agent --- agent/nmap_agent.py | 84 ++++++++++++++-- agent/nmap_options.py | 22 +++++ tests/conftest.py | 32 +++++- tests/nmap_agent_test.py | 204 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 324 insertions(+), 18 deletions(-) diff --git a/agent/nmap_agent.py b/agent/nmap_agent.py index bec88b04..1b8697d7 100644 --- a/agent/nmap_agent.py +++ b/agent/nmap_agent.py @@ -45,6 +45,10 @@ DEFAULT_MASK_IPV6 = 128 # scan up to 65536 host IPV6_CIDR_LIMIT = 112 +# More IPv6-specific constants +IPV6_MIN_PREFIX = 8 # Minimum safe prefix length for IPv6 +IPV6_DEFAULT_PING_TIMEOUT = 1000 # ms +IPV6_MAX_BATCH_SIZE = 100 # Maximum number of IPv6 addresses to scan in one batch BLACKLISTED_SERVICES = ["tcpwrapped"] @@ -74,6 +78,33 @@ def __init__( self._scope_domain_regex: Optional[str] = self.args.get("scope_domain_regex") self._vpn_config: Optional[str] = self.args.get("vpn_config") self._dns_config: Optional[str] = self.args.get("dns_config") + self._validate_ipv6_settings() + + def _validate_ipv6_settings(self) -> None: + """Validate IPv6-specific settings.""" + max_mask = int(self.args.get("max_network_mask_ipv6", "128")) + if max_mask < IPV6_MIN_PREFIX or max_mask > 128: + raise ValueError(f"IPv6 prefix must be between {IPV6_MIN_PREFIX} and 128") + + def _normalize_ipv6_address(self, address: str) -> str: + """Normalize IPv6 address format.""" + try: + return str(ipaddress.IPv6Address(address).compressed) + except ipaddress.AddressValueError as e: + logger.error("Invalid IPv6 address: %s", e) + raise + + def _get_ipv6_scan_options(self) -> dict[str, Any]: + """Get IPv6-specific scan options.""" + return { + "ping_timeout": self.args.get( + "ipv6_ping_timeout", IPV6_DEFAULT_PING_TIMEOUT + ), + "min_rate": self.args.get("ipv6_min_rate", 100), + "max_rate": self.args.get("ipv6_max_rate", 1000), + "max_retries": self.args.get("ipv6_max_retries", 2), + "fragment_mtu": self.args.get("ipv6_fragment_mtu", 1280), + } def start(self) -> None: if self._vpn_config is not None and self._dns_config is not None: @@ -102,20 +133,37 @@ def process(self, message: msg.Message) -> None: else: hosts = [(host, mask)] elif "v6" in message.selector: + # Handle IPv6 address normalization + try: + normalized_host = self._normalize_ipv6_address(host) + except ipaddress.AddressValueError: + logger.error("Invalid IPv6 address: %s", host) + return + mask = int(message.data.get("mask", DEFAULT_MASK_IPV6)) if mask < IPV6_CIDR_LIMIT: - raise ValueError( - f"Subnet mask below {IPV6_CIDR_LIMIT} is not supported" + logger.error( + "IPv6 subnet mask below %s is not supported", IPV6_CIDR_LIMIT ) + return + # Validate the mask max_mask = int(self.args.get("max_network_mask_ipv6", "128")) if mask < max_mask: - for subnet in ipaddress.ip_network( - f"{host}/{mask}", strict=False - ).subnets(new_prefix=max_mask): - hosts.append((str(subnet.network_address), max_mask)) + try: + # Only process valid subnets + for subnet in ipaddress.ip_network( + f"{normalized_host}/{mask}", strict=False + ).subnets(new_prefix=max_mask): + if isinstance(subnet, ipaddress.IPv6Network): + hosts.append((str(subnet.network_address), max_mask)) + else: + logger.error("Unexpected subnet type for IPv6: %s", subnet) + except ValueError as e: + logger.error("Failed to process IPv6 network: %s", e) + return else: - hosts = [(host, mask)] + hosts = [(normalized_host, mask)] domain_name = self._prepare_domain_name( message.data.get("name"), message.data.get("url") @@ -163,6 +211,8 @@ def process(self, message: msg.Message) -> None: logger.error("Neither host or domain are set.") def _scan_host(self, host: str, mask: int) -> Tuple[Dict[str, Any], str]: + is_ipv6 = ":" in host # Simple check for IPv6 address + options = nmap_options.NmapOptions( dns_resolution=False, ports=self.args.get("ports"), @@ -175,11 +225,25 @@ def _scan_host(self, host: str, mask: int) -> Tuple[Dict[str, Any], str]: script_default=self.args.get("script_default", False), version_detection=self.args.get("version_info", False), ) - client = nmap_wrapper.NmapWrapper(options) + if is_ipv6 is True: + # Add IPv6-specific options + ipv6_options = self._get_ipv6_scan_options() + options.ping_timeout = ipv6_options["ping_timeout"] + options.min_rate = ipv6_options["min_rate"] + options.max_rate = ipv6_options["max_rate"] + options.max_retries = ipv6_options["max_retries"] + options.fragment_mtu = ipv6_options["fragment_mtu"] + + client = nmap_wrapper.NmapWrapper(options) logger.info("scanning target %s/%s with options %s", host, mask, options) - scan_results, normal_results = client.scan_hosts(hosts=host, mask=mask) - return scan_results, normal_results + + try: + scan_results, normal_results = client.scan_hosts(hosts=host, mask=mask) + return scan_results, normal_results + except subprocess.CalledProcessError as e: + logger.error("Nmap scan failed for IPv6 host %s: %s", host, e) + raise def _scan_domain(self, domain_name: str) -> Tuple[Dict[str, Any], str]: options = nmap_options.NmapOptions( diff --git a/agent/nmap_options.py b/agent/nmap_options.py index 3a4c31b0..7d82c2a9 100644 --- a/agent/nmap_options.py +++ b/agent/nmap_options.py @@ -63,6 +63,12 @@ class NmapOptions: no_ping: bool = True privileged: Optional[bool] = None + ping_timeout: int | None = None # Example: Timeout for pings (in milliseconds) + min_rate: int | None = None # Minimum number of packets per second + max_rate: int | None = None # Maximum number of packets per second + max_retries: int | None = None # Maximum number of retries for probes + fragment_mtu: int | None = None # MTU for fragmented packets + def _set_os_detection_option(self) -> List[str]: """Appends the os detection option to the list of nmap options.""" command_options = [] @@ -151,6 +157,21 @@ def _run_scripts_command(self, scripts: List[str]) -> List[str]: return command + def _set_additional_options(self) -> List[str]: + """Appends additional options for fine-tuning Nmap behavior.""" + command_options = [] + if self.ping_timeout is not None: + command_options.append(f"--host-timeout {self.ping_timeout}ms") + if self.min_rate is not None: + command_options.append(f"--min-rate {self.min_rate}") + if self.max_rate is not None: + command_options.append(f"--max-rate {self.max_rate}") + if self.max_retries is not None: + command_options.append(f"--max-retries {self.max_retries}") + if self.fragment_mtu is not None: + command_options.append(f"--mtu {self.fragment_mtu}") + return command_options + @property def command_options(self) -> List[str]: """Computes the list of nmap options.""" @@ -165,4 +186,5 @@ def command_options(self) -> List[str]: command_options.extend(self._set_privileged()) command_options.extend(self._set_scripts()) command_options.extend(self._set_script_default()) + command_options.extend(self._set_additional_options()) return command_options diff --git a/tests/conftest.py b/tests/conftest.py index 8ac5f9db..c6d6b415 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -290,15 +290,41 @@ def ipv6_msg_without_mask() -> message.Message: ) +@pytest.fixture +def invalid_ipv6_msg() -> message.Message: + """Creates an invalid IPv6 message for testing error handling.""" + return message.Message.from_data( + selector="v3.asset.ip.v6", + data={ + "version": 6, + "host": "invalid_ipv6", + "mask": "112", + }, + ) + + +@pytest.fixture +def large_subnet_ipv6_msg() -> message.Message: + """Creates a message with a large IPv6 subnet.""" + return message.Message.from_data( + selector="v3.asset.ip.v6", + data={ + "version": 6, + "host": "2600:3c01:224a:6e00::", + "mask": "112", + }, + ) + + @pytest.fixture def ipv6_msg_above_limit() -> message.Message: - """Creates a dummy message of type v3.asset.ip.v6 for testing purposes.""" + """Creates a message with an IPv6 subnet above the allowed limit (below mask 112).""" return message.Message.from_data( selector="v3.asset.ip.v6", data={ "version": 6, - "host": "2600:3c01:224a:6e00:f03c:91ff:fe18:bb2f", - "mask": "64", + "host": "2600:3c01:224a:6e00::", + "mask": "96", # Below IPV6_CIDR_LIMIT of 112 }, ) diff --git a/tests/nmap_agent_test.py b/tests/nmap_agent_test.py index 53a8468f..01f8b90e 100644 --- a/tests/nmap_agent_test.py +++ b/tests/nmap_agent_test.py @@ -1,6 +1,8 @@ """Unittests for Nmap agent.""" +import ipaddress import json +import logging from typing import List, Dict, Union import subprocess @@ -212,6 +214,41 @@ } } +LARGE_IPV6_SUBNET_JSON_OUTPUT = { + "nmaprun": { + "host": [ + { + "address": { + "@addr": "2600:3c01:224a:6e00:f03c:91ff:fe18:bb2f", + "@addrtype": "ipv6", + }, + "ports": { + "port": { + "@portid": "22", + "@protocol": "tcp", + "state": {"@state": "open"}, + "service": {"@name": "ssh"}, + } + }, + }, + { + "address": { + "@addr": "2600:3c01:224a:6e00:f03c:91ff:fe18:bb30", + "@addrtype": "ipv6", + }, + "ports": { + "port": { + "@portid": "80", + "@protocol": "tcp", + "state": {"@state": "open"}, + "service": {"@name": "http"}, + } + }, + }, + ] + } +} + IPV6_HUMAN_OUTPUT = """ # Nmap 7.92 scan initiated Mon Mar 28 15:05:11 2022 as: nmap -sV --script=banner -n "-p 0-65535" -T5 -oX /tmp/xmloutput -oN /tmp/normal 2600:3c01:224a:6e00:f03c:91ff:fe18:bb2f │ │ Warning: 2600:3c01:224a:6e00:f03c:91ff:fe18:bb2f giving up on port because retransmission cap hit (2). │ │ @@ -225,6 +262,155 @@ """ +def testNmapAgentLifecycle_whenIpv6WithHostBits_shouldReport( + nmap_test_agent: nmap_agent.NmapAgent, + agent_mock: List[message.Message], + agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], + ipv6_msg: message.Message, + mocker: plugin.MockerFixture, +) -> None: + """Unit test of nmap agent when ipv6 with host bits is provided.""" + mocker.patch( + "agent.nmap_wrapper.NmapWrapper.scan_hosts", + return_value=(IPV6_JSON_OUTPUT, IPV6_HUMAN_OUTPUT), + ) + + nmap_test_agent.process(ipv6_msg) + + assert len(agent_mock) == 2 + assert agent_mock[0].selector == "v3.asset.ip.v6.port.service" + assert agent_mock[1].selector == "v3.report.vulnerability" + assert agent_mock[1].data["risk_rating"] == "INFO" + assert agent_mock[1].data["title"] == "Network Port Scan" + assert ( + "2600:3c01:224a:6e00:f03c:91ff:fe18:bb2f" + in agent_mock[1].data["technical_detail"] + ) + + +def testNmapAgent_whenInvalidIpv6_shouldReportRaiseAnError( + nmap_test_agent: nmap_agent.NmapAgent, + agent_mock: List[message.Message], + mocker: plugin.MockerFixture, +) -> None: + """Test handling of invalid IPv6 addresses.""" + msg = message.Message.from_data( + selector="v3.asset.ip.v6", + data={ + "version": 6, + "host": "invalid_ipv6", + "mask": "112", + }, + ) + + # Mock _normalize_ipv6_address to raise the expected error + mocker.patch.object( + nmap_test_agent, + "_normalize_ipv6_address", + side_effect=ipaddress.AddressValueError("Invalid IPv6 address"), + ) + + nmap_test_agent.process(msg) # Should handle the error gracefully + assert len(agent_mock) == 0 # No messages should be emitted for invalid address + + +def testNmapAgent_whenIpv6SubnetBelowLimit_shouldReportNothing( + nmap_test_agent: nmap_agent.NmapAgent, + agent_mock: List[message.Message], + mocker: plugin.MockerFixture, +) -> None: + """Test handling of IPv6 subnets below the allowed limit.""" + msg = message.Message.from_data( + selector="v3.asset.ip.v6", + data={ + "version": 6, + "host": "2600:3c01:224a:6e00::", + "mask": "96", # Below IPV6_CIDR_LIMIT + }, + ) + + # Mock _normalize_ipv6_address to return valid address + mocker.patch.object( + nmap_test_agent, "_normalize_ipv6_address", return_value="2600:3c01:224a:6e00::" + ) + + nmap_test_agent.process(msg) + assert len(agent_mock) == 0 # No messages should be emitted for subnet below limit + + +def testNmapAgent_whenLargeIpv6Subnet_shouldPerformBatchProcessing( + nmap_test_agent: nmap_agent.NmapAgent, + agent_mock: List[message.Message], + agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], + large_subnet_ipv6_msg: message.Message, + mocker: plugin.MockerFixture, +) -> None: + """Test batch processing of large IPv6 subnets.""" + scan_mock = mocker.patch( + "agent.nmap_wrapper.NmapWrapper.scan_hosts", + return_value=(LARGE_IPV6_SUBNET_JSON_OUTPUT, IPV6_HUMAN_OUTPUT), + ) + + nmap_test_agent.process(large_subnet_ipv6_msg) + + assert scan_mock.call_count > 0 + assert ( + len([m for m in agent_mock if m.selector == "v3.asset.ip.v6.port.service"]) >= 2 + ) + + +def testNmapAgent_whenIpv6SpecificOptions_shouldBeSetProperly( + nmap_test_agent: nmap_agent.NmapAgent, + agent_mock: List[message.Message], + mocker: plugin.MockerFixture, +) -> None: + """Test IPv6-specific scan options are properly set.""" + msg = message.Message.from_data( + selector="v3.asset.ip.v6", + data={ + "version": 6, + "host": "2600:3c01:224a:6e00::", + "mask": "128", + }, + ) + + mock_wrapper = mocker.patch("agent.nmap_wrapper.NmapWrapper") + mock_wrapper.return_value.scan_hosts.return_value = ( + IPV6_JSON_OUTPUT, + IPV6_HUMAN_OUTPUT, + ) + + mocker.patch.object( + nmap_test_agent, "_normalize_ipv6_address", return_value="2600:3c01:224a:6e00::" + ) + + mocker.patch.object(nmap_test_agent, "add_ip_network", return_value=True) + + nmap_test_agent.process(msg) + + mock_wrapper.assert_called_once() + options = mock_wrapper.call_args[0][0] + assert hasattr(options, "ping_timeout") + assert hasattr(options, "fragment_mtu") + assert options.fragment_mtu == 1280 # Default IPv6 MTU + + +def testNmapAgent_whenIpv6_addressNormalizationShouldBeDone( + nmap_test_agent: nmap_agent.NmapAgent, + mocker: plugin.MockerFixture, +) -> None: + """Test IPv6 address normalization.""" + test_cases = [ + ("2001:db8:0:0:0:0:0:1", "2001:db8::1"), # Compressed format + ("2001:0db8:0000:0000:0000:0000:0000:0001", "2001:db8::1"), # Full format + ("2001:DB8::1", "2001:db8::1"), # Mixed case + ] + + for input_addr, expected_addr in test_cases: + normalized = nmap_test_agent._normalize_ipv6_address(input_addr) + assert normalized == expected_addr + + def testAgentLifecycle_whenScanRunsWithoutErrors_emitsBackMessagesAndVulnerability( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], @@ -825,18 +1011,26 @@ def testNmapAgent_whenIpv6WithoutMask_agentShouldNotGetStuck( assert len(agent_mock) == 0 -def testNmapAgent_whenIpv6AboveLimit_agentShouldRaiseError( +def testNmapAgent_whenIpv6AboveLimit_agentShouldLogError( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv6_msg_above_limit: message.Message, + mocker: plugin.MockerFixture, + caplog: pytest.LogCaptureFixture, ) -> None: - """Unit test of nmap agent when ipv6 above limit is provided, the agent should raise an error.""" - with pytest.raises(ValueError) as error_message: + """Unit test for IPv6 subnet mask validation.""" + mocker.patch.object( + nmap_test_agent, "_normalize_ipv6_address", return_value="2600:3c01:224a:6e00::" + ) + + with caplog.at_level(logging.ERROR): nmap_test_agent.process(ipv6_msg_above_limit) - assert len(agent_mock) == 0 - assert error_message.value.args[0] == "Subnet mask below 112 is not supported" + assert ( + f"IPv6 subnet mask below {nmap_agent.IPV6_CIDR_LIMIT} is not supported" + in caplog.text + ) def testAgentNmap_whenInvalidDomainName_doesNotCrash( From c8e6e9d5afe8a9f9f9175df599a3d11c5423b55b Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Mon, 2 Dec 2024 15:04:47 +0100 Subject: [PATCH 2/7] Improved IPv6 Handling for Nmap Agent --- agent/nmap_agent.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/agent/nmap_agent.py b/agent/nmap_agent.py index 1b8697d7..2483ef45 100644 --- a/agent/nmap_agent.py +++ b/agent/nmap_agent.py @@ -150,18 +150,10 @@ def process(self, message: msg.Message) -> None: # Validate the mask max_mask = int(self.args.get("max_network_mask_ipv6", "128")) if mask < max_mask: - try: - # Only process valid subnets - for subnet in ipaddress.ip_network( - f"{normalized_host}/{mask}", strict=False - ).subnets(new_prefix=max_mask): - if isinstance(subnet, ipaddress.IPv6Network): - hosts.append((str(subnet.network_address), max_mask)) - else: - logger.error("Unexpected subnet type for IPv6: %s", subnet) - except ValueError as e: - logger.error("Failed to process IPv6 network: %s", e) - return + for subnet in ipaddress.ip_network( + f"{normalized_host}/{mask}", strict=False + ).subnets(new_prefix=max_mask): + hosts.append((str(subnet.network_address), max_mask)) else: hosts = [(normalized_host, mask)] From f1dcc09b63bcb32eaa3229b169e641fc1f811f7d Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Mon, 2 Dec 2024 15:18:01 +0100 Subject: [PATCH 3/7] Improved IPv6 Handling for Nmap Agent --- agent/nmap_agent.py | 6 +----- tests/nmap_agent_test.py | 27 +++++++-------------------- 2 files changed, 8 insertions(+), 25 deletions(-) diff --git a/agent/nmap_agent.py b/agent/nmap_agent.py index 2483ef45..3d25f488 100644 --- a/agent/nmap_agent.py +++ b/agent/nmap_agent.py @@ -134,11 +134,7 @@ def process(self, message: msg.Message) -> None: hosts = [(host, mask)] elif "v6" in message.selector: # Handle IPv6 address normalization - try: - normalized_host = self._normalize_ipv6_address(host) - except ipaddress.AddressValueError: - logger.error("Invalid IPv6 address: %s", host) - return + normalized_host = self._normalize_ipv6_address(host) mask = int(message.data.get("mask", DEFAULT_MASK_IPV6)) if mask < IPV6_CIDR_LIMIT: diff --git a/tests/nmap_agent_test.py b/tests/nmap_agent_test.py index 01f8b90e..c6306aac 100644 --- a/tests/nmap_agent_test.py +++ b/tests/nmap_agent_test.py @@ -288,30 +288,17 @@ def testNmapAgentLifecycle_whenIpv6WithHostBits_shouldReport( ) -def testNmapAgent_whenInvalidIpv6_shouldReportRaiseAnError( +def testNmapAgent_whenInvalidIpv6_shouldRaiseAnError( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - mocker: plugin.MockerFixture, + invalid_ipv6_msg: message.Message, ) -> None: - """Test handling of invalid IPv6 addresses.""" - msg = message.Message.from_data( - selector="v3.asset.ip.v6", - data={ - "version": 6, - "host": "invalid_ipv6", - "mask": "112", - }, - ) + """Test that invalid IPv6 raises an AddressValueError.""" + with pytest.raises(ipaddress.AddressValueError, match="At least 3 parts expected"): + nmap_test_agent.process(invalid_ipv6_msg) - # Mock _normalize_ipv6_address to raise the expected error - mocker.patch.object( - nmap_test_agent, - "_normalize_ipv6_address", - side_effect=ipaddress.AddressValueError("Invalid IPv6 address"), - ) - - nmap_test_agent.process(msg) # Should handle the error gracefully - assert len(agent_mock) == 0 # No messages should be emitted for invalid address + # Ensure no messages were emitted + assert len(agent_mock) == 0 def testNmapAgent_whenIpv6SubnetBelowLimit_shouldReportNothing( From a76273f941018ad09c01d7859c36556f933628fc Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Tue, 3 Dec 2024 14:01:53 +0100 Subject: [PATCH 4/7] Resolved Comments. --- agent/nmap_agent.py | 16 +++++----------- tests/nmap_agent_test.py | 2 +- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/agent/nmap_agent.py b/agent/nmap_agent.py index 3d25f488..38be0d08 100644 --- a/agent/nmap_agent.py +++ b/agent/nmap_agent.py @@ -44,11 +44,11 @@ DEFAULT_MASK_IPV6 = 128 # scan up to 65536 host -IPV6_CIDR_LIMIT = 112 +CIDR_LIMIT = 112 # More IPv6-specific constants IPV6_MIN_PREFIX = 8 # Minimum safe prefix length for IPv6 -IPV6_DEFAULT_PING_TIMEOUT = 1000 # ms -IPV6_MAX_BATCH_SIZE = 100 # Maximum number of IPv6 addresses to scan in one batch +# IPV6_DEFAULT_PING_TIMEOUT = 1000 # ms +MAX_BATCH_SIZE = 100 # Maximum number of IPv6 addresses to scan in one batch BLACKLISTED_SERVICES = ["tcpwrapped"] @@ -97,9 +97,6 @@ def _normalize_ipv6_address(self, address: str) -> str: def _get_ipv6_scan_options(self) -> dict[str, Any]: """Get IPv6-specific scan options.""" return { - "ping_timeout": self.args.get( - "ipv6_ping_timeout", IPV6_DEFAULT_PING_TIMEOUT - ), "min_rate": self.args.get("ipv6_min_rate", 100), "max_rate": self.args.get("ipv6_max_rate", 1000), "max_retries": self.args.get("ipv6_max_retries", 2), @@ -137,10 +134,8 @@ def process(self, message: msg.Message) -> None: normalized_host = self._normalize_ipv6_address(host) mask = int(message.data.get("mask", DEFAULT_MASK_IPV6)) - if mask < IPV6_CIDR_LIMIT: - logger.error( - "IPv6 subnet mask below %s is not supported", IPV6_CIDR_LIMIT - ) + if mask < CIDR_LIMIT: + logger.error("IPv6 subnet mask below %s is not supported", CIDR_LIMIT) return # Validate the mask @@ -217,7 +212,6 @@ def _scan_host(self, host: str, mask: int) -> Tuple[Dict[str, Any], str]: if is_ipv6 is True: # Add IPv6-specific options ipv6_options = self._get_ipv6_scan_options() - options.ping_timeout = ipv6_options["ping_timeout"] options.min_rate = ipv6_options["min_rate"] options.max_rate = ipv6_options["max_rate"] options.max_retries = ipv6_options["max_retries"] diff --git a/tests/nmap_agent_test.py b/tests/nmap_agent_test.py index c6306aac..60e6c36c 100644 --- a/tests/nmap_agent_test.py +++ b/tests/nmap_agent_test.py @@ -1015,7 +1015,7 @@ def testNmapAgent_whenIpv6AboveLimit_agentShouldLogError( nmap_test_agent.process(ipv6_msg_above_limit) assert ( - f"IPv6 subnet mask below {nmap_agent.IPV6_CIDR_LIMIT} is not supported" + f"IPv6 subnet mask below {nmap_agent.CIDR_LIMIT} is not supported" in caplog.text ) From f9aee09b5328e2b8b88bb74ecfb06bb488a54f4e Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Tue, 3 Dec 2024 14:42:09 +0100 Subject: [PATCH 5/7] Resolved Comments. --- agent/nmap_agent.py | 46 ++++++------------- agent/nmap_options.py | 25 ++--------- tests/nmap_agent_test.py | 96 ++-------------------------------------- 3 files changed, 19 insertions(+), 148 deletions(-) diff --git a/agent/nmap_agent.py b/agent/nmap_agent.py index 38be0d08..70212fbc 100644 --- a/agent/nmap_agent.py +++ b/agent/nmap_agent.py @@ -44,7 +44,7 @@ DEFAULT_MASK_IPV6 = 128 # scan up to 65536 host -CIDR_LIMIT = 112 +IPV6_CIDR_LIMIT = 112 # More IPv6-specific constants IPV6_MIN_PREFIX = 8 # Minimum safe prefix length for IPv6 # IPV6_DEFAULT_PING_TIMEOUT = 1000 # ms @@ -78,7 +78,9 @@ def __init__( self._scope_domain_regex: Optional[str] = self.args.get("scope_domain_regex") self._vpn_config: Optional[str] = self.args.get("vpn_config") self._dns_config: Optional[str] = self.args.get("dns_config") - self._validate_ipv6_settings() + self._ipv6_cidr_limit: int = int( + self.args.get("ipv6_cidr_limit", IPV6_CIDR_LIMIT) + ) def _validate_ipv6_settings(self) -> None: """Validate IPv6-specific settings.""" @@ -86,23 +88,6 @@ def _validate_ipv6_settings(self) -> None: if max_mask < IPV6_MIN_PREFIX or max_mask > 128: raise ValueError(f"IPv6 prefix must be between {IPV6_MIN_PREFIX} and 128") - def _normalize_ipv6_address(self, address: str) -> str: - """Normalize IPv6 address format.""" - try: - return str(ipaddress.IPv6Address(address).compressed) - except ipaddress.AddressValueError as e: - logger.error("Invalid IPv6 address: %s", e) - raise - - def _get_ipv6_scan_options(self) -> dict[str, Any]: - """Get IPv6-specific scan options.""" - return { - "min_rate": self.args.get("ipv6_min_rate", 100), - "max_rate": self.args.get("ipv6_max_rate", 1000), - "max_retries": self.args.get("ipv6_max_retries", 2), - "fragment_mtu": self.args.get("ipv6_fragment_mtu", 1280), - } - def start(self) -> None: if self._vpn_config is not None and self._dns_config is not None: self._connect_to_vpn() @@ -130,15 +115,17 @@ def process(self, message: msg.Message) -> None: else: hosts = [(host, mask)] elif "v6" in message.selector: - # Handle IPv6 address normalization - normalized_host = self._normalize_ipv6_address(host) - mask = int(message.data.get("mask", DEFAULT_MASK_IPV6)) - if mask < CIDR_LIMIT: - logger.error("IPv6 subnet mask below %s is not supported", CIDR_LIMIT) + if mask < IPV6_CIDR_LIMIT: + logger.error( + "IPv6 subnet mask below %s is not supported", IPV6_CIDR_LIMIT + ) return - # Validate the mask + # Normalize IPv6 address + ip = ipaddress.IPv6Address(host) + normalized_host = str(ip.exploded) + max_mask = int(self.args.get("max_network_mask_ipv6", "128")) if mask < max_mask: for subnet in ipaddress.ip_network( @@ -207,16 +194,9 @@ def _scan_host(self, host: str, mask: int) -> Tuple[Dict[str, Any], str]: scripts=self.args.get("scripts"), script_default=self.args.get("script_default", False), version_detection=self.args.get("version_info", False), + ipv6_enabled=is_ipv6, ) - if is_ipv6 is True: - # Add IPv6-specific options - ipv6_options = self._get_ipv6_scan_options() - options.min_rate = ipv6_options["min_rate"] - options.max_rate = ipv6_options["max_rate"] - options.max_retries = ipv6_options["max_retries"] - options.fragment_mtu = ipv6_options["fragment_mtu"] - client = nmap_wrapper.NmapWrapper(options) logger.info("scanning target %s/%s with options %s", host, mask, options) diff --git a/agent/nmap_options.py b/agent/nmap_options.py index 7d82c2a9..496eb237 100644 --- a/agent/nmap_options.py +++ b/agent/nmap_options.py @@ -62,12 +62,7 @@ class NmapOptions: ) no_ping: bool = True privileged: Optional[bool] = None - - ping_timeout: int | None = None # Example: Timeout for pings (in milliseconds) - min_rate: int | None = None # Minimum number of packets per second - max_rate: int | None = None # Maximum number of packets per second - max_retries: int | None = None # Maximum number of retries for probes - fragment_mtu: int | None = None # MTU for fragmented packets + ipv6_enabled: bool = False def _set_os_detection_option(self) -> List[str]: """Appends the os detection option to the list of nmap options.""" @@ -157,25 +152,12 @@ def _run_scripts_command(self, scripts: List[str]) -> List[str]: return command - def _set_additional_options(self) -> List[str]: - """Appends additional options for fine-tuning Nmap behavior.""" - command_options = [] - if self.ping_timeout is not None: - command_options.append(f"--host-timeout {self.ping_timeout}ms") - if self.min_rate is not None: - command_options.append(f"--min-rate {self.min_rate}") - if self.max_rate is not None: - command_options.append(f"--max-rate {self.max_rate}") - if self.max_retries is not None: - command_options.append(f"--max-retries {self.max_retries}") - if self.fragment_mtu is not None: - command_options.append(f"--mtu {self.fragment_mtu}") - return command_options - @property def command_options(self) -> List[str]: """Computes the list of nmap options.""" command_options = [] + if self.ipv6_enabled is True: + command_options.append("-6") command_options.extend(self._set_os_detection_option()) command_options.extend(self._set_version_detection_option()) command_options.extend(self._set_dns_resolution_option()) @@ -186,5 +168,4 @@ def command_options(self) -> List[str]: command_options.extend(self._set_privileged()) command_options.extend(self._set_scripts()) command_options.extend(self._set_script_default()) - command_options.extend(self._set_additional_options()) return command_options diff --git a/tests/nmap_agent_test.py b/tests/nmap_agent_test.py index 60e6c36c..6cbe150e 100644 --- a/tests/nmap_agent_test.py +++ b/tests/nmap_agent_test.py @@ -3,9 +3,10 @@ import ipaddress import json import logging -from typing import List, Dict, Union import subprocess +from typing import List, Dict, Union +import pytest import requests_mock as rq_mock from ostorlab.agent.message import message from ostorlab.utils import defintions as utils_definitions @@ -13,7 +14,6 @@ from agent import nmap_agent from agent import nmap_options -import pytest SCAN_RESULT_HOST_AS_LIST = { "nmaprun": { @@ -265,7 +265,6 @@ def testNmapAgentLifecycle_whenIpv6WithHostBits_shouldReport( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv6_msg: message.Message, mocker: plugin.MockerFixture, ) -> None: @@ -304,7 +303,6 @@ def testNmapAgent_whenInvalidIpv6_shouldRaiseAnError( def testNmapAgent_whenIpv6SubnetBelowLimit_shouldReportNothing( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - mocker: plugin.MockerFixture, ) -> None: """Test handling of IPv6 subnets below the allowed limit.""" msg = message.Message.from_data( @@ -316,11 +314,6 @@ def testNmapAgent_whenIpv6SubnetBelowLimit_shouldReportNothing( }, ) - # Mock _normalize_ipv6_address to return valid address - mocker.patch.object( - nmap_test_agent, "_normalize_ipv6_address", return_value="2600:3c01:224a:6e00::" - ) - nmap_test_agent.process(msg) assert len(agent_mock) == 0 # No messages should be emitted for subnet below limit @@ -328,7 +321,6 @@ def testNmapAgent_whenIpv6SubnetBelowLimit_shouldReportNothing( def testNmapAgent_whenLargeIpv6Subnet_shouldPerformBatchProcessing( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], large_subnet_ipv6_msg: message.Message, mocker: plugin.MockerFixture, ) -> None: @@ -346,62 +338,9 @@ def testNmapAgent_whenLargeIpv6Subnet_shouldPerformBatchProcessing( ) -def testNmapAgent_whenIpv6SpecificOptions_shouldBeSetProperly( - nmap_test_agent: nmap_agent.NmapAgent, - agent_mock: List[message.Message], - mocker: plugin.MockerFixture, -) -> None: - """Test IPv6-specific scan options are properly set.""" - msg = message.Message.from_data( - selector="v3.asset.ip.v6", - data={ - "version": 6, - "host": "2600:3c01:224a:6e00::", - "mask": "128", - }, - ) - - mock_wrapper = mocker.patch("agent.nmap_wrapper.NmapWrapper") - mock_wrapper.return_value.scan_hosts.return_value = ( - IPV6_JSON_OUTPUT, - IPV6_HUMAN_OUTPUT, - ) - - mocker.patch.object( - nmap_test_agent, "_normalize_ipv6_address", return_value="2600:3c01:224a:6e00::" - ) - - mocker.patch.object(nmap_test_agent, "add_ip_network", return_value=True) - - nmap_test_agent.process(msg) - - mock_wrapper.assert_called_once() - options = mock_wrapper.call_args[0][0] - assert hasattr(options, "ping_timeout") - assert hasattr(options, "fragment_mtu") - assert options.fragment_mtu == 1280 # Default IPv6 MTU - - -def testNmapAgent_whenIpv6_addressNormalizationShouldBeDone( - nmap_test_agent: nmap_agent.NmapAgent, - mocker: plugin.MockerFixture, -) -> None: - """Test IPv6 address normalization.""" - test_cases = [ - ("2001:db8:0:0:0:0:0:1", "2001:db8::1"), # Compressed format - ("2001:0db8:0000:0000:0000:0000:0000:0001", "2001:db8::1"), # Full format - ("2001:DB8::1", "2001:db8::1"), # Mixed case - ] - - for input_addr, expected_addr in test_cases: - normalized = nmap_test_agent._normalize_ipv6_address(input_addr) - assert normalized == expected_addr - - def testAgentLifecycle_whenScanRunsWithoutErrors_emitsBackMessagesAndVulnerability( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv4_msg: message.Message, mocker: plugin.MockerFixture, ) -> None: @@ -431,7 +370,6 @@ def testAgentLifecycle_whenScanRunsWithoutErrors_emitsBackMessagesAndVulnerabili def testAgentLifecycle_whenScanRunsWithoutErrors_emitsBackVulnerabilityMsg( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv4_msg: message.Message, mocker: plugin.MockerFixture, ) -> None: @@ -455,7 +393,6 @@ def testAgentLifecycle_whenScanRunsWithoutErrors_emitsBackVulnerabilityMsg( def testAgentLifecycle_whenLinkAssetAndScanRunsWithoutErrors_emitsBackMessagesAndVulnerability( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], link_msg: message.Message, mocker: plugin.MockerFixture, ) -> None: @@ -492,7 +429,6 @@ def testAgentLifecycle_whenLinkAssetAndScanRunsWithoutErrors_emitsBackMessagesAn def testAgentEmitBanner_whenScanRunsWithoutErrors_emitsMsgWithBanner( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv4_msg: message.Message, mocker: plugin.MockerFixture, fake_output: None | Dict[str, str], @@ -526,7 +462,6 @@ def testAgentEmitBanner_whenScanRunsWithoutErrors_emitsMsgWithBanner( def testAgentEmitBannerScanDomain_whenScanRunsWithoutErrors_emitsMsgWithBanner( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], domain_msg: message.Message, mocker: plugin.MockerFixture, fake_output: None | Dict[str, str], @@ -550,7 +485,6 @@ def testAgentEmitBannerScanDomain_whenScanRunsWithoutErrors_emitsMsgWithBanner( def testAgentScanDomain_whenScanRunsWithoutErrors_emitsDomainService( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], domain_msg: message.Message, mocker: plugin.MockerFixture, fake_output: None | Dict[str, str], @@ -582,7 +516,6 @@ def testAgentNmap_whenUrlsScriptsGivent_RunScan( nmap_test_agent_with_scripts_arg: nmap_agent.NmapAgent, requests_mock: rq_mock.mocker.Mocker, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], domain_msg: message.Message, mocker: plugin.MockerFixture, fake_output: None | Dict[str, str], @@ -617,8 +550,6 @@ def testAgentNmap_whenUrlsScriptsGivent_RunScan( def testAgentNmapOptions_whenUrlsScriptsGivent_RunScan( nmap_test_agent_with_scripts_arg: nmap_agent.NmapAgent, requests_mock: rq_mock.mocker.Mocker, - agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], mocker: plugin.MockerFixture, fake_output: None | Dict[str, str], ) -> None: @@ -654,8 +585,6 @@ def testAgentNmapOptions_whenUrlsScriptsGivent_RunScan( def testAgentNmapOptions_whenUrlsScriptsGivent_RunScan2( nmap_test_agent_with_scripts_arg: nmap_agent.NmapAgent, requests_mock: rq_mock.mocker.Mocker, - agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], mocker: plugin.MockerFixture, fake_output_range: None | Dict[str, str], ) -> None: @@ -691,7 +620,6 @@ def testAgentNmapOptions_whenUrlsScriptsGivent_RunScan2( def testEmitFingerprints_whenScanFindsBanner_emitsFingerprint( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], domain_msg: message.Message, mocker: plugin.MockerFixture, fake_output: None | Dict[str, str], @@ -721,7 +649,6 @@ def testEmitFingerprints_whenScanFindsBanner_emitsFingerprint( def testAgentNmapOptions_withMaxNetworkMask_scansEachSubnet( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv4_msg2: message.Message, mocker: plugin.MockerFixture, fake_output: None | Dict[str, str], @@ -757,7 +684,6 @@ def testAgentNmapOptions_withMaxNetworkMask_scansEachSubnet( def testAgentProcessMessage_whenASubnetIsTargetdAfterABiggerRangeIsPreviouslyScanned_subnetIsNotScanned( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv4_msg_with_mask: message.Message, mocker: plugin.MockerFixture, ) -> None: @@ -786,7 +712,6 @@ def testAgentProcessMessage_whenASubnetIsTargetdAfterABiggerRangeIsPreviouslySca def testAgentEmitBannerScanDomain_withMultiplehostnames_reportVulnerabilities( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], domain_msg: message.Message, mocker: plugin.MockerFixture, fake_output: None | Dict[str, str], @@ -831,7 +756,6 @@ def testNmapAgent_withDomainScopeArgAndLinkMessageNotInScope_targetShouldNotBeSc def testAgentNmapOptions_whenServiceHasProduct_reportsFingerprint( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], domain_msg: message.Message, mocker: plugin.MockerFixture, fake_output_product: None | Dict[str, str], @@ -883,7 +807,6 @@ def testAgentNmapOptions_whenServiceHasProduct_reportsFingerprint( def testNmapAgent_whenHostIsNotUp_shouldNotRaisAnError( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv4_msg: message.Message, mocker: plugin.MockerFixture, fake_output_with_down_host: None | Dict[str, str], @@ -904,7 +827,6 @@ def testNmapAgent_whenHostIsNotUp_shouldNotRaisAnError( def testNmapAgent_whenDomainIsNotUp_shouldNotRaisAnError( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], domain_is_down_msg: message.Message, mocker: plugin.MockerFixture, fake_output_with_down_host: None | Dict[str, str], @@ -924,8 +846,6 @@ def testNmapAgent_whenDomainIsNotUp_shouldNotRaisAnError( def testAgentLifecycle_whenScanRunsWithVpn_shouldConnectToVPN( nmap_agent_with_vpn_config_arg: nmap_agent.NmapAgent, - agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv4_msg: message.Message, mocker: plugin.MockerFixture, ) -> None: @@ -946,8 +866,6 @@ def testAgentLifecycle_whenScanRunsWithVpn_shouldConnectToVPN( def testAgentNmap_whenNoHost_agentShouldNotCrash( nmap_test_agent: nmap_agent.NmapAgent, - agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], junk_msg: message.Message, mocker: plugin.MockerFixture, ) -> None: @@ -963,7 +881,6 @@ def testAgentNmap_whenNoHost_agentShouldNotCrash( def testNmapAgentLifecycle_whenIpv6WithHostBits_agentShouldNotCrash( nmap_test_agent: nmap_agent.NmapAgent, agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv6_msg: message.Message, mocker: plugin.MockerFixture, ) -> None: @@ -1000,22 +917,15 @@ def testNmapAgent_whenIpv6WithoutMask_agentShouldNotGetStuck( def testNmapAgent_whenIpv6AboveLimit_agentShouldLogError( nmap_test_agent: nmap_agent.NmapAgent, - agent_mock: List[message.Message], - agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], ipv6_msg_above_limit: message.Message, - mocker: plugin.MockerFixture, caplog: pytest.LogCaptureFixture, ) -> None: """Unit test for IPv6 subnet mask validation.""" - mocker.patch.object( - nmap_test_agent, "_normalize_ipv6_address", return_value="2600:3c01:224a:6e00::" - ) - with caplog.at_level(logging.ERROR): nmap_test_agent.process(ipv6_msg_above_limit) assert ( - f"IPv6 subnet mask below {nmap_agent.CIDR_LIMIT} is not supported" + f"IPv6 subnet mask below {nmap_agent.IPV6_CIDR_LIMIT} is not supported" in caplog.text ) From 043b4e013a1d4c70f6582cd1163dd94c67e7429b Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Tue, 3 Dec 2024 14:44:10 +0100 Subject: [PATCH 6/7] Resolved Comments. --- agent/nmap_agent.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/agent/nmap_agent.py b/agent/nmap_agent.py index 70212fbc..e11f3f0a 100644 --- a/agent/nmap_agent.py +++ b/agent/nmap_agent.py @@ -43,12 +43,8 @@ DNS_RESOLV_CONFIG_PATH = "/etc/resolv.conf" DEFAULT_MASK_IPV6 = 128 -# scan up to 65536 host IPV6_CIDR_LIMIT = 112 -# More IPv6-specific constants IPV6_MIN_PREFIX = 8 # Minimum safe prefix length for IPv6 -# IPV6_DEFAULT_PING_TIMEOUT = 1000 # ms -MAX_BATCH_SIZE = 100 # Maximum number of IPv6 addresses to scan in one batch BLACKLISTED_SERVICES = ["tcpwrapped"] From a7eaf3c2f151b2f87d78ff2f92e929e7b1011c90 Mon Sep 17 00:00:00 2001 From: nmasdoufi-ol Date: Tue, 3 Dec 2024 17:57:41 +0100 Subject: [PATCH 7/7] Remove unnecessary code --- agent/nmap_agent.py | 12 ------------ agent/nmap_options.py | 3 --- 2 files changed, 15 deletions(-) diff --git a/agent/nmap_agent.py b/agent/nmap_agent.py index e11f3f0a..485a148c 100644 --- a/agent/nmap_agent.py +++ b/agent/nmap_agent.py @@ -74,15 +74,6 @@ def __init__( self._scope_domain_regex: Optional[str] = self.args.get("scope_domain_regex") self._vpn_config: Optional[str] = self.args.get("vpn_config") self._dns_config: Optional[str] = self.args.get("dns_config") - self._ipv6_cidr_limit: int = int( - self.args.get("ipv6_cidr_limit", IPV6_CIDR_LIMIT) - ) - - def _validate_ipv6_settings(self) -> None: - """Validate IPv6-specific settings.""" - max_mask = int(self.args.get("max_network_mask_ipv6", "128")) - if max_mask < IPV6_MIN_PREFIX or max_mask > 128: - raise ValueError(f"IPv6 prefix must be between {IPV6_MIN_PREFIX} and 128") def start(self) -> None: if self._vpn_config is not None and self._dns_config is not None: @@ -177,8 +168,6 @@ def process(self, message: msg.Message) -> None: logger.error("Neither host or domain are set.") def _scan_host(self, host: str, mask: int) -> Tuple[Dict[str, Any], str]: - is_ipv6 = ":" in host # Simple check for IPv6 address - options = nmap_options.NmapOptions( dns_resolution=False, ports=self.args.get("ports"), @@ -190,7 +179,6 @@ def _scan_host(self, host: str, mask: int) -> Tuple[Dict[str, Any], str]: scripts=self.args.get("scripts"), script_default=self.args.get("script_default", False), version_detection=self.args.get("version_info", False), - ipv6_enabled=is_ipv6, ) client = nmap_wrapper.NmapWrapper(options) diff --git a/agent/nmap_options.py b/agent/nmap_options.py index 496eb237..3a4c31b0 100644 --- a/agent/nmap_options.py +++ b/agent/nmap_options.py @@ -62,7 +62,6 @@ class NmapOptions: ) no_ping: bool = True privileged: Optional[bool] = None - ipv6_enabled: bool = False def _set_os_detection_option(self) -> List[str]: """Appends the os detection option to the list of nmap options.""" @@ -156,8 +155,6 @@ def _run_scripts_command(self, scripts: List[str]) -> List[str]: def command_options(self) -> List[str]: """Computes the list of nmap options.""" command_options = [] - if self.ipv6_enabled is True: - command_options.append("-6") command_options.extend(self._set_os_detection_option()) command_options.extend(self._set_version_detection_option()) command_options.extend(self._set_dns_resolution_option())