Skip to content

Commit

Permalink
Merge pull request #80 from Ostorlab/feature/top_ports_flag
Browse files Browse the repository at this point in the history
Add top ports to agent nmap
  • Loading branch information
3asm authored Dec 6, 2023
2 parents 3525222 + fe6e65d commit c1868d4
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Supported agent flags:

* `fast_mode` (`-F`): Fast mode scans fewer ports than the default mode.
* `ports` (`-p`): List of ports to scan.
* `top_ports` (`--top-ports`): Top ports to scan.
* `no_ping` (`-Pn`): Treat all hosts as online, skip host discovery.
* `version_info` (`-sV`): Probe open ports to determine service/version info.
* `timing_template` (`-Tx`): Template of timing settings (T0, T1, ... T5)..
Expand Down
2 changes: 2 additions & 0 deletions agent/nmap_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def _scan_host(self, host: str, mask: int) -> Tuple[Dict[str, Any], str]:
options = nmap_options.NmapOptions(
dns_resolution=False,
ports=self.args.get("ports"),
top_ports=self.args.get("top_ports"),
fast_mode=self.args.get("fast_mode", False),
no_ping=self.args.get("no_ping", False),
timing_template=nmap_options.TimingTemplate[self.args["timing_template"]],
Expand All @@ -172,6 +173,7 @@ def _scan_domain(self, domain_name: str) -> Tuple[Dict[str, Any], str]:
options = nmap_options.NmapOptions(
dns_resolution=False,
ports=self.args.get("ports"),
top_ports=self.args.get("top_ports"),
fast_mode=self.args.get("fast_mode", False),
no_ping=self.args.get("no_ping", False),
timing_template=nmap_options.TimingTemplate[self.args["timing_template"]],
Expand Down
5 changes: 4 additions & 1 deletion agent/nmap_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class NmapOptions:

dns_resolution: bool = True
dns_servers: List[str] | None = None
ports: Optional[str] | None = None
ports: Optional[str] = None
top_ports: Optional[int] = None
fast_mode: bool = False
timing_template: TimingTemplate = TimingTemplate.T3
script_default: bool = False
Expand Down Expand Up @@ -97,6 +98,8 @@ def _set_ports_option(self) -> List[str]:
"""Appends the ports option to the list of nmap options."""
if self.fast_mode is True:
return ["-F"]
elif self.top_ports is not None:
return ["--top-ports", str(self.top_ports)]
elif self.ports is not None:
return ["-p", self.ports]
else:
Expand Down
4 changes: 2 additions & 2 deletions agent/nmap_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, options: nmap_options.NmapOptions) -> None:
"""
self._options = options

def _construct_command_host(self, host: str, mask: int) -> List[str]:
def construct_command_host(self, host: str, mask: int) -> List[str]:
"""
Construct the Nmap command to be run.
Expand Down Expand Up @@ -96,7 +96,7 @@ def scan_hosts(self, hosts: str, mask: int) -> Tuple[Dict[str, Any], str]:
result of the scan.
"""
logger.info("running the nmap scan")
command = self._construct_command_host(hosts, mask)
command = self.construct_command_host(hosts, mask)

subprocess.run(command, check=True)

Expand Down
4 changes: 4 additions & 0 deletions ostorlab.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ description: |
* `fast_mode` (`-F`): Fast mode scans fewer ports than the default mode.
* `ports` (`-p`): List of ports to scan.
* `top_ports` (`--top-ports`): Top ports to scan.
* `no_ping` (`-Pn`): Treat all hosts as online, skip host discovery.
* `version_info` (`-sV`): Probe open ports to determine service/version info.
* `timing_template` (`-Tx`): Template of timing settings (T0, T1, ... T5)..
Expand Down Expand Up @@ -84,6 +85,9 @@ args:
type: "string"
description: "List of ports to scan."
value: "0-65535"
- name: "top_ports"
type: "int"
description: "Top ports to scan."
- name: "no_ping"
description: "Treat all hosts as online, skip host discovery."
type: "boolean"
Expand Down
91 changes: 91 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,94 @@ def ipv6_msg_above_limit() -> message.Message:
"mask": "64",
},
)


@pytest.fixture(scope="function")
def nmap_agent_fast_mode(
agent_mock: List[message.Message],
agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]],
) -> nmap_agent.NmapAgent:
"""Fixture of the Nmap Agent to be used for testing purposes."""
del agent_persist_mock
with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o:
definition = agent_definitions.AgentDefinition.from_yaml(yaml_o)
settings = runtime_definitions.AgentSettings(
key="agent/ostorlab/nmap_agent",
bus_url="NA",
bus_exchange_topic="NA",
args=[
utils_definitions.Arg(
name="fast_mode",
type="boolean",
value=json.dumps(True).encode(),
)
],
healthcheck_port=5301,
redis_url="redis://guest:guest@localhost:6379",
)

agent = nmap_agent.NmapAgent(definition, settings)
return agent


@pytest.fixture(scope="function")
def nmap_agent_top_ports(
request: Any,
agent_mock: List[message.Message],
agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]],
) -> nmap_agent.NmapAgent:
"""Fixture of the Nmap Agent to be used for testing purposes."""
del agent_persist_mock
with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o:
definition = agent_definitions.AgentDefinition.from_yaml(yaml_o)
settings = runtime_definitions.AgentSettings(
key="agent/ostorlab/nmap_agent",
bus_url="NA",
bus_exchange_topic="NA",
args=[
utils_definitions.Arg(
name="fast_mode",
type="boolean",
value=json.dumps(False).encode(),
),
utils_definitions.Arg(
name="top_ports",
type="int",
value=json.dumps("420").encode(),
),
],
healthcheck_port=5301,
redis_url="redis://guest:guest@localhost:6379",
)

agent = nmap_agent.NmapAgent(definition, settings)
return agent


@pytest.fixture(scope="function")
def nmap_agent_all_ports(
request: Any,
agent_mock: List[message.Message],
agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]],
) -> nmap_agent.NmapAgent:
"""Fixture of the Nmap Agent to be used for testing purposes."""
del agent_persist_mock
with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o:
definition = agent_definitions.AgentDefinition.from_yaml(yaml_o)
settings = runtime_definitions.AgentSettings(
key="agent/ostorlab/nmap_agent",
bus_url="NA",
bus_exchange_topic="NA",
args=[
utils_definitions.Arg(
name="fast_mode",
type="boolean",
value=json.dumps(False).encode(),
)
],
healthcheck_port=5301,
redis_url="redis://guest:guest@localhost:6379",
)

agent = nmap_agent.NmapAgent(definition, settings)
return agent
118 changes: 118 additions & 0 deletions tests/nmap_wrapper_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Nmap wrapper unit tests"""

import agent.nmap_agent
from agent import nmap_options
from agent import nmap_wrapper


def testNmapWrapper_whenFastMode_returnCommand(
nmap_agent_fast_mode: agent.nmap_agent.NmapAgent,
) -> None:
args = nmap_agent_fast_mode.args
options = nmap_options.NmapOptions(
dns_resolution=False,
ports=args.get("ports"),
top_ports=args.get("top_ports"),
fast_mode=args.get("fast_mode", False),
no_ping=args.get("no_ping", False),
timing_template=nmap_options.TimingTemplate[args["timing_template"]],
scripts=args.get("scripts"),
script_default=args.get("script_default", False),
version_detection=args.get("version_info", False),
)
client = nmap_wrapper.NmapWrapper(options)

command = client.construct_command_host("127.0.0.1", 24)

assert command == [
"nmap",
"-sV",
"-n",
"-F",
"-T3",
"-sT",
"--script",
"banner",
"-sC",
"-oX",
"/tmp/xmloutput",
"-oN",
"/tmp/normal",
"127.0.0.1/24",
]


def testNmapWrapper_whenTopPortsUsed_returnCommand(
nmap_agent_top_ports: agent.nmap_agent.NmapAgent,
) -> None:
args = nmap_agent_top_ports.args
options = nmap_options.NmapOptions(
dns_resolution=False,
ports=args.get("ports"),
top_ports=args.get("top_ports"),
fast_mode=args.get("fast_mode", False),
no_ping=args.get("no_ping", False),
timing_template=nmap_options.TimingTemplate[args["timing_template"]],
scripts=args.get("scripts"),
script_default=args.get("script_default", False),
version_detection=args.get("version_info", False),
)
client = nmap_wrapper.NmapWrapper(options)

command = client.construct_command_host("127.0.0.1", 24)

assert command == [
"nmap",
"-sV",
"-n",
"--top-ports",
"420",
"-T3",
"-sT",
"--script",
"banner",
"-sC",
"-oX",
"/tmp/xmloutput",
"-oN",
"/tmp/normal",
"127.0.0.1/24",
]


def testNmapWrapper_whenAllTopPortsUsed_returnCommand(
nmap_agent_all_ports: agent.nmap_agent.NmapAgent,
) -> None:
args = nmap_agent_all_ports.args
options = nmap_options.NmapOptions(
dns_resolution=False,
ports=args.get("ports"),
top_ports=args.get("top_ports"),
fast_mode=args.get("fast_mode", False),
no_ping=args.get("no_ping", False),
timing_template=nmap_options.TimingTemplate[args["timing_template"]],
scripts=args.get("scripts"),
script_default=args.get("script_default", False),
version_detection=args.get("version_info", False),
)
client = nmap_wrapper.NmapWrapper(options)

command = client.construct_command_host("127.0.0.1", 24)

assert command == [
"nmap",
"-sV",
"-n",
"-p",
"0-65535",
"-T3",
"-sT",
"--script",
"banner",
"-sC",
"-oX",
"/tmp/xmloutput",
"-oN",
"/tmp/normal",
"127.0.0.1/24",
]

0 comments on commit c1868d4

Please sign in to comment.