Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add top ports to agent nmap #80

Merged
merged 6 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Supported agent flags:

* `fast_mode` (`-F`): Fast mode scans fewer ports than the default mode.
* `ports` (`-p`): List of ports to scan.
* `top_ports` (`--top-ports`): Top ports to scan.
* `no_ping` (`-Pn`): Treat all hosts as online, skip host discovery.
* `version_info` (`-sV`): Probe open ports to determine service/version info.
* `timing_template` (`-Tx`): Template of timing settings (T0, T1, ... T5)..
Expand Down
2 changes: 2 additions & 0 deletions agent/nmap_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def _scan_host(self, host: str, mask: int) -> Tuple[Dict[str, Any], str]:
options = nmap_options.NmapOptions(
dns_resolution=False,
ports=self.args.get("ports"),
top_ports=self.args.get("top_ports"),
fast_mode=self.args.get("fast_mode", False),
no_ping=self.args.get("no_ping", False),
timing_template=nmap_options.TimingTemplate[self.args["timing_template"]],
Expand All @@ -172,6 +173,7 @@ def _scan_domain(self, domain_name: str) -> Tuple[Dict[str, Any], str]:
options = nmap_options.NmapOptions(
dns_resolution=False,
ports=self.args.get("ports"),
top_ports=self.args.get("top_ports"),
fast_mode=self.args.get("fast_mode", False),
no_ping=self.args.get("no_ping", False),
timing_template=nmap_options.TimingTemplate[self.args["timing_template"]],
Expand Down
5 changes: 4 additions & 1 deletion agent/nmap_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class NmapOptions:

dns_resolution: bool = True
dns_servers: List[str] | None = None
ports: Optional[str] | None = None
ports: Optional[str] = None
top_ports: Optional[int] = None
fast_mode: bool = False
timing_template: TimingTemplate = TimingTemplate.T3
script_default: bool = False
Expand Down Expand Up @@ -97,6 +98,8 @@ def _set_ports_option(self) -> List[str]:
"""Appends the ports option to the list of nmap options."""
if self.fast_mode is True:
return ["-F"]
elif self.top_ports is not None:
return ["--top-ports", str(self.top_ports)]
3asm marked this conversation as resolved.
Show resolved Hide resolved
elif self.ports is not None:
return ["-p", self.ports]
else:
Expand Down
4 changes: 2 additions & 2 deletions agent/nmap_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, options: nmap_options.NmapOptions) -> None:
"""
self._options = options

def _construct_command_host(self, host: str, mask: int) -> List[str]:
def construct_command_host(self, host: str, mask: int) -> List[str]:
3asm marked this conversation as resolved.
Show resolved Hide resolved
"""
Construct the Nmap command to be run.

Expand Down Expand Up @@ -96,7 +96,7 @@ def scan_hosts(self, hosts: str, mask: int) -> Tuple[Dict[str, Any], str]:
result of the scan.
"""
logger.info("running the nmap scan")
command = self._construct_command_host(hosts, mask)
command = self.construct_command_host(hosts, mask)

subprocess.run(command, check=True)

Expand Down
4 changes: 4 additions & 0 deletions ostorlab.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ description: |

* `fast_mode` (`-F`): Fast mode scans fewer ports than the default mode.
* `ports` (`-p`): List of ports to scan.
* `top_ports` (`--top-ports`): Top ports to scan.
* `no_ping` (`-Pn`): Treat all hosts as online, skip host discovery.
* `version_info` (`-sV`): Probe open ports to determine service/version info.
* `timing_template` (`-Tx`): Template of timing settings (T0, T1, ... T5)..
Expand Down Expand Up @@ -84,6 +85,9 @@ args:
type: "string"
description: "List of ports to scan."
value: "0-65535"
- name: "top_ports"
type: "int"
description: "Top ports to scan."
- name: "no_ping"
description: "Treat all hosts as online, skip host discovery."
type: "boolean"
Expand Down
91 changes: 91 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,94 @@ def ipv6_msg_above_limit() -> message.Message:
"mask": "64",
},
)


@pytest.fixture(scope="function")
def nmap_agent_fast_mode(
agent_mock: List[message.Message],
agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]],
) -> nmap_agent.NmapAgent:
"""Fixture of the Nmap Agent to be used for testing purposes."""
del agent_persist_mock
with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o:
definition = agent_definitions.AgentDefinition.from_yaml(yaml_o)
settings = runtime_definitions.AgentSettings(
key="agent/ostorlab/nmap_agent",
bus_url="NA",
bus_exchange_topic="NA",
args=[
utils_definitions.Arg(
name="fast_mode",
type="boolean",
value=json.dumps(True).encode(),
)
],
healthcheck_port=5301,
redis_url="redis://guest:guest@localhost:6379",
)

agent = nmap_agent.NmapAgent(definition, settings)
return agent


@pytest.fixture(scope="function")
def nmap_agent_top_ports(
request: Any,
agent_mock: List[message.Message],
agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]],
) -> nmap_agent.NmapAgent:
"""Fixture of the Nmap Agent to be used for testing purposes."""
del agent_persist_mock
with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o:
definition = agent_definitions.AgentDefinition.from_yaml(yaml_o)
settings = runtime_definitions.AgentSettings(
key="agent/ostorlab/nmap_agent",
bus_url="NA",
bus_exchange_topic="NA",
args=[
utils_definitions.Arg(
name="fast_mode",
type="boolean",
value=json.dumps(False).encode(),
),
utils_definitions.Arg(
name="top_ports",
type="int",
value=json.dumps("420").encode(),
),
],
healthcheck_port=5301,
redis_url="redis://guest:guest@localhost:6379",
)

agent = nmap_agent.NmapAgent(definition, settings)
return agent


@pytest.fixture(scope="function")
def nmap_agent_all_ports(
request: Any,
agent_mock: List[message.Message],
agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]],
) -> nmap_agent.NmapAgent:
"""Fixture of the Nmap Agent to be used for testing purposes."""
del agent_persist_mock
with (pathlib.Path(__file__).parent.parent / "ostorlab.yaml").open() as yaml_o:
definition = agent_definitions.AgentDefinition.from_yaml(yaml_o)
settings = runtime_definitions.AgentSettings(
key="agent/ostorlab/nmap_agent",
bus_url="NA",
bus_exchange_topic="NA",
args=[
utils_definitions.Arg(
name="fast_mode",
type="boolean",
value=json.dumps(False).encode(),
)
],
healthcheck_port=5301,
redis_url="redis://guest:guest@localhost:6379",
)

agent = nmap_agent.NmapAgent(definition, settings)
return agent
118 changes: 118 additions & 0 deletions tests/nmap_wrapper_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Nmap wrapper unit tests"""

import agent.nmap_agent
from agent import nmap_options
from agent import nmap_wrapper


def testNmapWrapper_whenFastMode_returnCommand(
nmap_agent_fast_mode: agent.nmap_agent.NmapAgent,
) -> None:
args = nmap_agent_fast_mode.args
options = nmap_options.NmapOptions(
dns_resolution=False,
ports=args.get("ports"),
top_ports=args.get("top_ports"),
fast_mode=args.get("fast_mode", False),
no_ping=args.get("no_ping", False),
timing_template=nmap_options.TimingTemplate[args["timing_template"]],
scripts=args.get("scripts"),
script_default=args.get("script_default", False),
version_detection=args.get("version_info", False),
)
client = nmap_wrapper.NmapWrapper(options)

command = client.construct_command_host("127.0.0.1", 24)

assert command == [
"nmap",
"-sV",
"-n",
"-F",
"-T3",
"-sT",
"--script",
"banner",
"-sC",
"-oX",
"/tmp/xmloutput",
"-oN",
"/tmp/normal",
"127.0.0.1/24",
]


def testNmapWrapper_whenTopPortsUsed_returnCommand(
nmap_agent_top_ports: agent.nmap_agent.NmapAgent,
) -> None:
args = nmap_agent_top_ports.args
options = nmap_options.NmapOptions(
dns_resolution=False,
ports=args.get("ports"),
top_ports=args.get("top_ports"),
fast_mode=args.get("fast_mode", False),
no_ping=args.get("no_ping", False),
timing_template=nmap_options.TimingTemplate[args["timing_template"]],
scripts=args.get("scripts"),
script_default=args.get("script_default", False),
version_detection=args.get("version_info", False),
)
client = nmap_wrapper.NmapWrapper(options)

command = client.construct_command_host("127.0.0.1", 24)

assert command == [
"nmap",
"-sV",
"-n",
"--top-ports",
"420",
"-T3",
"-sT",
"--script",
"banner",
"-sC",
"-oX",
"/tmp/xmloutput",
"-oN",
"/tmp/normal",
"127.0.0.1/24",
]


def testNmapWrapper_whenAllTopPortsUsed_returnCommand(
nmap_agent_all_ports: agent.nmap_agent.NmapAgent,
) -> None:
args = nmap_agent_all_ports.args
options = nmap_options.NmapOptions(
dns_resolution=False,
ports=args.get("ports"),
top_ports=args.get("top_ports"),
fast_mode=args.get("fast_mode", False),
no_ping=args.get("no_ping", False),
timing_template=nmap_options.TimingTemplate[args["timing_template"]],
scripts=args.get("scripts"),
script_default=args.get("script_default", False),
version_detection=args.get("version_info", False),
)
client = nmap_wrapper.NmapWrapper(options)

command = client.construct_command_host("127.0.0.1", 24)

assert command == [
"nmap",
"-sV",
"-n",
"-p",
"0-65535",
"-T3",
"-sT",
"--script",
"banner",
"-sC",
"-oX",
"/tmp/xmloutput",
"-oN",
"/tmp/normal",
"127.0.0.1/24",
]
Loading