diff --git a/README.md b/README.md index 411d2206..1b95ef0d 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ Supported agent flags: * `fast_mode` (`-F`): Fast mode scans fewer ports than the default mode. * `ports` (`-p`): List of ports to scan. +* `top_ports` (`--top-ports`): Top ports to scan. * `no_ping` (`-Pn`): Treat all hosts as online, skip host discovery. * `version_info` (`-sV`): Probe open ports to determine service/version info. * `timing_template` (`-Tx`): Template of timing settings (T0, T1, ... T5).. diff --git a/agent/nmap_agent.py b/agent/nmap_agent.py index 7bc234cf..cf933deb 100644 --- a/agent/nmap_agent.py +++ b/agent/nmap_agent.py @@ -155,6 +155,7 @@ def _scan_host(self, host: str, mask: int) -> Tuple[Dict[str, Any], str]: options = nmap_options.NmapOptions( dns_resolution=False, ports=self.args.get("ports"), + top_ports=self.args.get("top_ports"), fast_mode=self.args.get("fast_mode", False), no_ping=self.args.get("no_ping", False), timing_template=nmap_options.TimingTemplate[self.args["timing_template"]], @@ -172,6 +173,7 @@ def _scan_domain(self, domain_name: str) -> Tuple[Dict[str, Any], str]: options = nmap_options.NmapOptions( dns_resolution=False, ports=self.args.get("ports"), + top_ports=self.args.get("top_ports"), fast_mode=self.args.get("fast_mode", False), no_ping=self.args.get("no_ping", False), timing_template=nmap_options.TimingTemplate[self.args["timing_template"]], diff --git a/agent/nmap_options.py b/agent/nmap_options.py index 799e9242..f1235668 100644 --- a/agent/nmap_options.py +++ b/agent/nmap_options.py @@ -44,7 +44,8 @@ class NmapOptions: dns_resolution: bool = True dns_servers: List[str] | None = None - ports: Optional[str] | None = None + ports: Optional[str] = None + top_ports: Optional[int] = None fast_mode: bool = False timing_template: TimingTemplate = TimingTemplate.T3 script_default: bool = False @@ -97,6 +98,8 @@ def _set_ports_option(self) -> List[str]: """Appends the ports option to the list of nmap options.""" if self.fast_mode is True: return ["-F"] + elif self.top_ports is not None: + return ["--top-ports", str(self.top_ports)] elif self.ports is not None: return ["-p", self.ports] else: diff --git a/agent/nmap_wrapper.py b/agent/nmap_wrapper.py index a8599c3e..c296202f 100644 --- a/agent/nmap_wrapper.py +++ b/agent/nmap_wrapper.py @@ -39,7 +39,7 @@ def __init__(self, options: nmap_options.NmapOptions) -> None: """ self._options = options - def _construct_command_host(self, host: str, mask: int) -> List[str]: + def construct_command_host(self, host: str, mask: int) -> List[str]: """ Construct the Nmap command to be run. @@ -96,7 +96,7 @@ def scan_hosts(self, hosts: str, mask: int) -> Tuple[Dict[str, Any], str]: result of the scan. """ logger.info("running the nmap scan") - command = self._construct_command_host(hosts, mask) + command = self.construct_command_host(hosts, mask) subprocess.run(command, check=True) diff --git a/ostorlab.yaml b/ostorlab.yaml index 3581e206..3d2e203b 100755 --- a/ostorlab.yaml +++ b/ostorlab.yaml @@ -19,6 +19,7 @@ description: | * `fast_mode` (`-F`): Fast mode scans fewer ports than the default mode. * `ports` (`-p`): List of ports to scan. + * `top_ports` (`--top-ports`): Top ports to scan. * `no_ping` (`-Pn`): Treat all hosts as online, skip host discovery. * `version_info` (`-sV`): Probe open ports to determine service/version info. * `timing_template` (`-Tx`): Template of timing settings (T0, T1, ... T5).. @@ -84,6 +85,9 @@ args: type: "string" description: "List of ports to scan." value: "0-65535" + - name: "top_ports" + type: "int" + description: "Top ports to scan." - name: "no_ping" description: "Treat all hosts as online, skip host discovery." type: "boolean" diff --git a/tests/conftest.py b/tests/conftest.py index 8a9a0fd8..365ac01b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -280,3 +280,94 @@ def ipv6_msg_above_limit() -> message.Message: "mask": "64", }, ) + + +@pytest.fixture(scope="function") +def nmap_agent_fast_mode( + agent_mock: List[message.Message], + agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], +) -> nmap_agent.NmapAgent: + """Fixture of the Nmap Agent to be used for testing purposes.""" + del agent_persist_mock + with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o: + definition = agent_definitions.AgentDefinition.from_yaml(yaml_o) + settings = runtime_definitions.AgentSettings( + key="agent/ostorlab/nmap_agent", + bus_url="NA", + bus_exchange_topic="NA", + args=[ + utils_definitions.Arg( + name="fast_mode", + type="boolean", + value=json.dumps(True).encode(), + ) + ], + healthcheck_port=5301, + redis_url="redis://guest:guest@localhost:6379", + ) + + agent = nmap_agent.NmapAgent(definition, settings) + return agent + + +@pytest.fixture(scope="function") +def nmap_agent_top_ports( + request: Any, + agent_mock: List[message.Message], + agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], +) -> nmap_agent.NmapAgent: + """Fixture of the Nmap Agent to be used for testing purposes.""" + del agent_persist_mock + with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o: + definition = agent_definitions.AgentDefinition.from_yaml(yaml_o) + settings = runtime_definitions.AgentSettings( + key="agent/ostorlab/nmap_agent", + bus_url="NA", + bus_exchange_topic="NA", + args=[ + utils_definitions.Arg( + name="fast_mode", + type="boolean", + value=json.dumps(False).encode(), + ), + utils_definitions.Arg( + name="top_ports", + type="int", + value=json.dumps("420").encode(), + ), + ], + healthcheck_port=5301, + redis_url="redis://guest:guest@localhost:6379", + ) + + agent = nmap_agent.NmapAgent(definition, settings) + return agent + + +@pytest.fixture(scope="function") +def nmap_agent_all_ports( + request: Any, + agent_mock: List[message.Message], + agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], +) -> nmap_agent.NmapAgent: + """Fixture of the Nmap Agent to be used for testing purposes.""" + del agent_persist_mock + with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o: + definition = agent_definitions.AgentDefinition.from_yaml(yaml_o) + settings = runtime_definitions.AgentSettings( + key="agent/ostorlab/nmap_agent", + bus_url="NA", + bus_exchange_topic="NA", + args=[ + utils_definitions.Arg( + name="fast_mode", + type="boolean", + value=json.dumps(False).encode(), + ) + ], + healthcheck_port=5301, + redis_url="redis://guest:guest@localhost:6379", + ) + + agent = nmap_agent.NmapAgent(definition, settings) + return agent diff --git a/tests/nmap_wrapper_test.py b/tests/nmap_wrapper_test.py new file mode 100644 index 00000000..850643b0 --- /dev/null +++ b/tests/nmap_wrapper_test.py @@ -0,0 +1,118 @@ +"""Nmap wrapper unit tests""" + +import agent.nmap_agent +from agent import nmap_options +from agent import nmap_wrapper + + +def testNmapWrapper_whenFastMode_returnCommand( + nmap_agent_fast_mode: agent.nmap_agent.NmapAgent, +) -> None: + args = nmap_agent_fast_mode.args + options = nmap_options.NmapOptions( + dns_resolution=False, + ports=args.get("ports"), + top_ports=args.get("top_ports"), + fast_mode=args.get("fast_mode", False), + no_ping=args.get("no_ping", False), + timing_template=nmap_options.TimingTemplate[args["timing_template"]], + scripts=args.get("scripts"), + script_default=args.get("script_default", False), + version_detection=args.get("version_info", False), + ) + client = nmap_wrapper.NmapWrapper(options) + + command = client.construct_command_host("127.0.0.1", 24) + + assert command == [ + "nmap", + "-sV", + "-n", + "-F", + "-T3", + "-sT", + "--script", + "banner", + "-sC", + "-oX", + "/tmp/xmloutput", + "-oN", + "/tmp/normal", + "127.0.0.1/24", + ] + + +def testNmapWrapper_whenTopPortsUsed_returnCommand( + nmap_agent_top_ports: agent.nmap_agent.NmapAgent, +) -> None: + args = nmap_agent_top_ports.args + options = nmap_options.NmapOptions( + dns_resolution=False, + ports=args.get("ports"), + top_ports=args.get("top_ports"), + fast_mode=args.get("fast_mode", False), + no_ping=args.get("no_ping", False), + timing_template=nmap_options.TimingTemplate[args["timing_template"]], + scripts=args.get("scripts"), + script_default=args.get("script_default", False), + version_detection=args.get("version_info", False), + ) + client = nmap_wrapper.NmapWrapper(options) + + command = client.construct_command_host("127.0.0.1", 24) + + assert command == [ + "nmap", + "-sV", + "-n", + "--top-ports", + "420", + "-T3", + "-sT", + "--script", + "banner", + "-sC", + "-oX", + "/tmp/xmloutput", + "-oN", + "/tmp/normal", + "127.0.0.1/24", + ] + + +def testNmapWrapper_whenAllTopPortsUsed_returnCommand( + nmap_agent_all_ports: agent.nmap_agent.NmapAgent, +) -> None: + args = nmap_agent_all_ports.args + options = nmap_options.NmapOptions( + dns_resolution=False, + ports=args.get("ports"), + top_ports=args.get("top_ports"), + fast_mode=args.get("fast_mode", False), + no_ping=args.get("no_ping", False), + timing_template=nmap_options.TimingTemplate[args["timing_template"]], + scripts=args.get("scripts"), + script_default=args.get("script_default", False), + version_detection=args.get("version_info", False), + ) + client = nmap_wrapper.NmapWrapper(options) + + command = client.construct_command_host("127.0.0.1", 24) + + assert command == [ + "nmap", + "-sV", + "-n", + "-p", + "0-65535", + "-T3", + "-sT", + "--script", + "banner", + "-sC", + "-oX", + "/tmp/xmloutput", + "-oN", + "/tmp/normal", + "127.0.0.1/24", + ]