diff --git a/agent/nmap_agent.py b/agent/nmap_agent.py index a23641f3..2ef65eef 100644 --- a/agent/nmap_agent.py +++ b/agent/nmap_agent.py @@ -374,16 +374,26 @@ def _emit_fingerprints( raise ValueError(f"Incorrect ip version {version}") if host.get("os", {}).get("osmatch") is not None: - os_match_highest = host.get("os").get("osmatch", {})[0] + os_match = host.get("os").get("osmatch") + if len(os_match) > 0: + os_match_highest = os_match[0] + else: + continue + + if isinstance(os_match_highest, list) and len(os_match_highest) > 0: + os_match_highest = os_match_highest[0] + os_class = os_match_highest.get("osclass", {}) + + if isinstance(os_class, list) and len(os_class) > 0: + os_class = os_class[0] + elif os_class == []: + continue + fingerprint_data = { "host": host.get("address", {}).get("@addr"), "library_type": "OS", - "library_name": os_match_highest.get("osclass", {}).get( - "@osfamily" - ), - "library_version": os_match_highest.get("osclass").get( - "@osgen" - ), + "library_name": os_class.get("@osfamily"), + "library_version": os_class.get("@osgen"), "detail": os_match_highest.get("@name"), } self.emit(selector, fingerprint_data) diff --git a/tests/nmap_agent_test.py b/tests/nmap_agent_test.py index e99add96..413d2476 100644 --- a/tests/nmap_agent_test.py +++ b/tests/nmap_agent_test.py @@ -751,7 +751,7 @@ def testAgent_whenHostHaveOs_fingerprintMessageShouldHaveOs( agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], mocker: plugin.MockerFixture, ) -> None: - """Ensure the agents emits the detected library name with its version.""" + """Ensure the agent fingerprint OS.""" del agent_persist_mock product_fake_output = { "nmaprun": { @@ -810,3 +810,176 @@ def testAgent_whenHostHaveOs_fingerprintMessageShouldHaveOs( assert fingerprint_msg.data["host"] == "127.0.0.1" assert fingerprint_msg.data["library_name"] == "Windows" assert fingerprint_msg.data["library_version"] == "10" + + +def testAgent_whenOsClassIsList_fingerprintMessageShouldHaveOs( + nmap_test_agent: nmap_agent.NmapAgent, + agent_mock: List[message.Message], + ipv4_msg: message.Message, + agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], + mocker: plugin.MockerFixture, +) -> None: + """Ensure the agent handel osclass when it's a list.""" + del agent_persist_mock + product_fake_output = { + "nmaprun": { + "host": { + "address": {"@addr": "127.0.0.1", "@addrtype": "ipv4"}, + "ports": { + "port": { + "@portid": "22", + "@protocol": "tcp", + "state": { + "@state": "open", + "@reason": "syn-ack", + "@reason_ttl": "0", + }, + "service": { + "@name": "ssh", + "@product": "OpenSSH", + "@version": "7.4", + "cpe": "cpe:/a:openbsd:openssh:7.4", + }, + } + }, + "os": { + "osmatch": [ + [ + { + "@name": "Microsoft Windows 10 1511", + "@accuracy": "88", + "@line": "69505", + "osclass": [ + { + "@type": "specialized", + "@vendor": "Microsoft", + "@osfamily": "Windows", + "@osgen": "10", + "@accuracy": "88", + "cpe": "cpe:/o:microsoft:windows_10:1511", + } + ], + } + ] + ] + }, + } + } + } + + mocker.patch( + "agent.nmap_wrapper.NmapWrapper.scan_hosts", + return_value=(product_fake_output, ""), + ) + + nmap_test_agent.process(ipv4_msg) + + assert len(agent_mock) == 4 + assert agent_mock[0].selector == "v3.asset.ip.v4.port.service" + assert agent_mock[1].selector == "v3.report.vulnerability" + fingerprint_msg = agent_mock[2] + assert fingerprint_msg.selector == "v3.fingerprint.ip.v4.service.library" + assert fingerprint_msg.data["host"] == "127.0.0.1" + assert fingerprint_msg.data["library_name"] == "Windows" + assert fingerprint_msg.data["library_version"] == "10" + + +def testAgent_whenOsMatchIsEmptyList_fingerprintMessageShouldHaveOs( + nmap_test_agent: nmap_agent.NmapAgent, + agent_mock: List[message.Message], + ipv4_msg: message.Message, + agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], + mocker: plugin.MockerFixture, +) -> None: + """Ensure the agent handel osmatch when it's an empty list.""" + del agent_persist_mock + product_fake_output = { + "nmaprun": { + "host": { + "address": {"@addr": "127.0.0.1", "@addrtype": "ipv4"}, + "ports": { + "port": { + "@portid": "22", + "@protocol": "tcp", + "state": { + "@state": "open", + "@reason": "syn-ack", + "@reason_ttl": "0", + }, + "service": { + "@name": "ssh", + "@product": "OpenSSH", + "@version": "7.4", + "cpe": "cpe:/a:openbsd:openssh:7.4", + }, + } + }, + "os": {"osmatch": []}, + } + } + } + + mocker.patch( + "agent.nmap_wrapper.NmapWrapper.scan_hosts", + return_value=(product_fake_output, ""), + ) + + nmap_test_agent.process(ipv4_msg) + + assert len(agent_mock) == 2 + + +def testAgent_whenOsMatchIsList_fingerprintMessageShouldHaveOs( + nmap_test_agent: nmap_agent.NmapAgent, + agent_mock: List[message.Message], + ipv4_msg: message.Message, + agent_persist_mock: Dict[Union[str, bytes], Union[str, bytes]], + mocker: plugin.MockerFixture, +) -> None: + """Ensure the agent handel osmatch when it's a list.""" + del agent_persist_mock + product_fake_output = { + "nmaprun": { + "host": { + "address": {"@addr": "127.0.0.1", "@addrtype": "ipv4"}, + "ports": { + "port": { + "@portid": "22", + "@protocol": "tcp", + "state": { + "@state": "open", + "@reason": "syn-ack", + "@reason_ttl": "0", + }, + "service": { + "@name": "ssh", + "@product": "OpenSSH", + "@version": "7.4", + "cpe": "cpe:/a:openbsd:openssh:7.4", + }, + } + }, + "os": { + "osmatch": [ + [ + { + "@name": "Microsoft Windows 10 1511", + "@accuracy": "88", + "@line": "69505", + "osclass": [], + } + ] + ] + }, + } + } + } + + mocker.patch( + "agent.nmap_wrapper.NmapWrapper.scan_hosts", + return_value=(product_fake_output, ""), + ) + + nmap_test_agent.process(ipv4_msg) + + assert len(agent_mock) == 2