From 3b698510d64377fc728c646ad31e6546408507ea Mon Sep 17 00:00:00 2001 From: ErebusZ Date: Wed, 6 Mar 2024 14:17:12 +0100 Subject: [PATCH 1/6] Fix the segregation --- agent/process_scans.py | 15 ++--- agent/virus_total_agent.py | 26 +++----- tests/virus_total_agent_test.py | 111 ++++++++++++++++++++------------ 3 files changed, 82 insertions(+), 70 deletions(-) diff --git a/agent/process_scans.py b/agent/process_scans.py index 6c30386..a54b067 100644 --- a/agent/process_scans.py +++ b/agent/process_scans.py @@ -40,16 +40,9 @@ def get_technical_details(scans: dict[str, Any], target: str | None) -> str: return technical_detail -def split_scans_by_result( - scans: dict[str, Any], -) -> tuple[dict[str, Any], dict[str, Any]]: - secure_scans: dict[str, Any] = {} - vulnerable_scans: dict[str, Any] = {} - - for scan_type, scan_result in scans.items(): +def is_scan_malicious(scans: dict[str, Any]) -> bool: + for scan_result in scans.values(): if scan_result["detected"] is True: - vulnerable_scans[scan_type] = scan_result - else: - secure_scans[scan_type] = scan_result + return True - return secure_scans, vulnerable_scans + return False diff --git a/agent/virus_total_agent.py b/agent/virus_total_agent.py index 7a4c483..20fcf81 100644 --- a/agent/virus_total_agent.py +++ b/agent/virus_total_agent.py @@ -88,30 +88,20 @@ def _process_response(self, response: dict[str, Any], target: str | None) -> Non scans = virustotal.get_scans(response) try: if scans is not None: - ( - secure_scan_report, - vulnerable_scan_report, - ) = process_scans.split_scans_by_result(scans) - - if len(secure_scan_report) > 0: - technical_detail = process_scans.get_technical_details( - secure_scan_report, target - ) - self.report_vulnerability( - entry=kb.KB.SECURE_VIRUSTOTAL_SCAN, - technical_detail=technical_detail, - risk_rating=agent_report_vulnerability_mixin.RiskRating.SECURE, - ) + technical_detail = process_scans.get_technical_details(scans, target) - if len(vulnerable_scan_report) > 0: - technical_detail = process_scans.get_technical_details( - vulnerable_scan_report, target - ) + if process_scans.is_scan_malicious(scans) is True: self.report_vulnerability( entry=kb.KB.INSECURE_VIRUSTOTAL_SCAN, technical_detail=technical_detail, risk_rating=agent_report_vulnerability_mixin.RiskRating.HIGH, ) + else: + self.report_vulnerability( + entry=kb.KB.SECURE_VIRUSTOTAL_SCAN, + technical_detail=technical_detail, + risk_rating=agent_report_vulnerability_mixin.RiskRating.SECURE, + ) except NameError: logger.error("The scans list is empty.") diff --git a/tests/virus_total_agent_test.py b/tests/virus_total_agent_test.py index df289d0..89a8b1f 100644 --- a/tests/virus_total_agent_test.py +++ b/tests/virus_total_agent_test.py @@ -71,6 +71,35 @@ def virustotal_valid_response(message: msg.Message) -> dict[str, Any]: return response +def virustotal_secure_valid_response(message: msg.Message) -> dict[str, Any]: + """Method for mocking the Virus Total public API valid response.""" + del message + response = { + "results": { + "scans": { + "Bkav": { + "detected": False, + "version": "1.3.0.9899", + "result": None, + "update": "20220107", + }, + "Elastic": { + "detected": False, + "version": "4.0.32", + "result": "eicar", + "update": "20211223", + }, + }, + "scan_id": "ID42", + "sha1": "some_sha1", + "resource": "some_ressource_id", + "response_code": 1, + }, + "response_code": 200, + } + return response + + def testVirusTotalAgent_whenVirusTotalApiReturnsValidResponse_noExceptionRaised( mocker: plugin.MockerFixture, agent_mock: list[msg.Message], @@ -89,16 +118,11 @@ def testVirusTotalAgent_whenVirusTotalApiReturnsValidResponse_noExceptionRaised( ) virustotal_agent.process(message) - assert len(agent_mock) == 2 + assert len(agent_mock) == 1 assert all(msg.selector == "v3.report.vulnerability" for msg in agent_mock) - assert agent_mock[0].data["risk_rating"] == "SECURE" - assert agent_mock[1].data["risk_rating"] == "HIGH" + assert agent_mock[0].data["risk_rating"] == "HIGH" assert ( agent_mock[0].data["title"] - == "Secure Virustotal malware analysis (MD5 based search)" - ) - assert ( - agent_mock[1].data["title"] == "VirusTotal scan flagged malicious asset(s) (MD5 based search)" ) assert isinstance(agent_mock[0].data["technical_detail"], str) @@ -159,16 +183,11 @@ def testVirusTotalAgent_whenLinkReceived_virusTotalApiReturnsValidResponse( virustotal_agent.process(url_message) - assert len(agent_mock) == 2 + assert len(agent_mock) == 1 assert all(msg.selector == "v3.report.vulnerability" for msg in agent_mock) - assert agent_mock[0].data["risk_rating"] == "SECURE" - assert agent_mock[1].data["risk_rating"] == "HIGH" + assert agent_mock[0].data["risk_rating"] == "HIGH" assert ( agent_mock[0].data["title"] - == "Secure Virustotal malware analysis (MD5 based search)" - ) - assert ( - agent_mock[1].data["title"] == "VirusTotal scan flagged malicious asset(s) (MD5 based search)" ) assert isinstance(agent_mock[0].data["technical_detail"], str) @@ -199,16 +218,11 @@ def testVirusTotalAgent_whenDomainReceived_virusTotalApiReturnsValidResponse( virustotal_agent.process(create_domain_message) - assert len(agent_mock) == 2 + assert len(agent_mock) == 1 assert all(msg.selector == "v3.report.vulnerability" for msg in agent_mock) - assert agent_mock[0].data["risk_rating"] == "SECURE" - assert agent_mock[1].data["risk_rating"] == "HIGH" + assert agent_mock[0].data["risk_rating"] == "HIGH" assert ( agent_mock[0].data["title"] - == "Secure Virustotal malware analysis (MD5 based search)" - ) - assert ( - agent_mock[1].data["title"] == "VirusTotal scan flagged malicious asset(s) (MD5 based search)" ) assert isinstance(agent_mock[0].data["technical_detail"], str) @@ -239,21 +253,9 @@ def testVirusTotalAgent_whenApisReceived_virusTotalApiReturnsValidResponse( virustotal_agent.process(create_network_range_message) - assert len(agent_mock) == 28 + assert len(agent_mock) == 14 assert agent_mock[0].selector == "v3.report.vulnerability" - assert len([msg for msg in agent_mock if msg.data["risk_rating"] == "SECURE"]) == 14 assert len([msg for msg in agent_mock if msg.data["risk_rating"] == "HIGH"]) == 14 - assert ( - len( - [ - msg - for msg in agent_mock - if msg.data["title"] - == "Secure Virustotal malware analysis (MD5 based search)" - ] - ) - == 14 - ) assert ( len( [ @@ -370,14 +372,10 @@ def testVirusTotalAgent_whenFileHasNoPath_shouldReportWithHash( virustotal_agent.process(message_without_path) - assert len(agent_mock) == 2 + assert len(agent_mock) == 1 assert agent_mock[0].data["technical_detail"] == ( - "Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n|Package|Result|" - " \n|-------|------| \n|Bkav |_Safe_| \n" - ) - assert agent_mock[1].data["technical_detail"] == ( - "Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n|Package| Result" - " | \n|-------|----------| \n|Elastic|_Malicous_| \n" + "Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n|Package| Result |" + " \n|-------|----------| \n|Bkav |_Safe_ | \n|Elastic|_Malicous_| \n" ) @@ -434,3 +432,34 @@ def testVirusTotalAgent_whenIPAssetHasIncorrectVersion_raiseValueError( """Test the CIDR Limit in case IP has incorrect version.""" with pytest.raises(ValueError, match="Incorrect ip version 5."): virustotal_agent.process(scan_message_ipv_with_incorrect_version) + + +def testVirusTotalAgent_whenReportIsSecure_shouldReportAsSecure( + mocker: plugin.MockerFixture, + virustotal_agent: virus_total_agent.VirusTotalAgent, + message_without_path: msg.Message, + agent_mock: list[msg.Message], +) -> None: + """Test that the agent report secure reports with correct kb entry.""" + mocker.patch( + "virus_total_apis.PublicApi.get_file_report", + side_effect=virustotal_secure_valid_response, + ) + + virustotal_agent.process(message_without_path) + + assert len(agent_mock) == 1 + assert agent_mock[0].data["risk_rating"] == "SECURE" + assert agent_mock[0].selector == "v3.report.vulnerability" + assert agent_mock[0].data["technical_detail"] == ( + "Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n|Package|Result|" + " \n|-------|------| \n|Bkav |_Safe_| \n|Elastic|_Safe_| \n" + ) + assert ( + agent_mock[0].data["title"] + == "Secure Virustotal malware analysis (MD5 based search)" + ) + assert agent_mock[0].data["short_description"] == "VirusTotal Malware analysis." + assert agent_mock[0].data["references"] == [ + {"title": "Virustotal", "url": "https://www.virustotal.com/"} + ] From d60d819439a7e07083c13d659d49ea5a3c593e8c Mon Sep 17 00:00:00 2001 From: ErebusZ Date: Wed, 6 Mar 2024 14:46:57 +0100 Subject: [PATCH 2/6] Exclude unreliable scanners --- agent/process_scans.py | 22 ++++----- agent/virus_total_agent.py | 1 + tests/virus_total_agent_test.py | 80 +++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 13 deletions(-) diff --git a/agent/process_scans.py b/agent/process_scans.py index a54b067..a6a794c 100644 --- a/agent/process_scans.py +++ b/agent/process_scans.py @@ -6,19 +6,7 @@ from agent import markdown - -def get_risk_rating( - scans: dict[str, Any], -) -> agent_report_vulnerability_mixin.RiskRating: - """Assign risk level based on scanned file report. - - Returns: - 'HIGH' if at least one anti-virus detected the file as a virus, else Secure. - """ - for scanner_result in scans.values(): - if scanner_result["detected"] is True: - return agent_report_vulnerability_mixin.RiskRating.HIGH - return agent_report_vulnerability_mixin.RiskRating.SECURE +EXCLUDED_SCANNERS = ["K7GW", "TrendMicro-HouseCall"] def get_technical_details(scans: dict[str, Any], target: str | None) -> str: @@ -46,3 +34,11 @@ def is_scan_malicious(scans: dict[str, Any]) -> bool: return True return False + + +def exclude_unreliable_scans(scans: dict[str, Any]) -> None: + for scanner in EXCLUDED_SCANNERS: + try: + scans.pop(scanner) + except KeyError: + continue diff --git a/agent/virus_total_agent.py b/agent/virus_total_agent.py index 20fcf81..b44393e 100644 --- a/agent/virus_total_agent.py +++ b/agent/virus_total_agent.py @@ -88,6 +88,7 @@ def _process_response(self, response: dict[str, Any], target: str | None) -> Non scans = virustotal.get_scans(response) try: if scans is not None: + process_scans.exclude_unreliable_scans(scans) technical_detail = process_scans.get_technical_details(scans, target) if process_scans.is_scan_malicious(scans) is True: diff --git a/tests/virus_total_agent_test.py b/tests/virus_total_agent_test.py index 89a8b1f..958a80a 100644 --- a/tests/virus_total_agent_test.py +++ b/tests/virus_total_agent_test.py @@ -100,6 +100,53 @@ def virustotal_secure_valid_response(message: msg.Message) -> dict[str, Any]: return response +def virustotal_unreliable_scanner_response(message: msg.Message) -> dict[str, Any]: + """Method for mocking the Virus Total public API unreliable scanner response.""" + del message + response = { + "results": { + "scans": { + "TrendMicro-HouseCall": { + "detected": True, + "result": None, + "update": "20240305", + "version": "2.0.0.8", + }, + "K7GW": { + "detected": True, + "result": None, + "update": "20240305", + "version": "23.9.8494.0", + }, + "Acronis": { + "detected": False, + "result": None, + "update": "20230828", + "version": "1.2.0.121", + }, + "AhnLab-V3": { + "detected": False, + "result": None, + "update": "20240305", + "version": "3.25.1.10473", + }, + "Alibaba": { + "detected": False, + "result": None, + "update": "20190527", + "version": "0.3.0.5", + }, + }, + "scan_id": "ID42", + "sha1": "some_sha1", + "resource": "some_ressource_id", + "response_code": 1, + }, + "response_code": 200, + } + return response + + def testVirusTotalAgent_whenVirusTotalApiReturnsValidResponse_noExceptionRaised( mocker: plugin.MockerFixture, agent_mock: list[msg.Message], @@ -463,3 +510,36 @@ def testVirusTotalAgent_whenReportIsSecure_shouldReportAsSecure( assert agent_mock[0].data["references"] == [ {"title": "Virustotal", "url": "https://www.virustotal.com/"} ] + + +def testVirusTotalAgent_whenScannerIsExcluded_shouldNotBeConsidered( + mocker: plugin.MockerFixture, + virustotal_agent: virus_total_agent.VirusTotalAgent, + message_without_path: msg.Message, + agent_mock: list[msg.Message], +) -> None: + """Test that the agent exclude the unreliable scanners specified.""" + mocker.patch( + "virus_total_apis.PublicApi.get_file_report", + side_effect=virustotal_unreliable_scanner_response, + ) + + virustotal_agent.process(message_without_path) + + assert len(agent_mock) == 1 + assert agent_mock[0].data["risk_rating"] == "SECURE" + assert agent_mock[0].selector == "v3.report.vulnerability" + assert "K7GW" not in agent_mock[0].data["technical_detail"] + assert "TrendMicro-HouseCall" not in agent_mock[0].data["technical_detail"] + assert agent_mock[0].data["technical_detail"] == ( + "Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n| Package |Result|" + " \n|---------|------| \n|Acronis |_Safe_| \n|AhnLab-V3|_Safe_| \n|Alibaba |_Safe_| \n" + ) + assert ( + agent_mock[0].data["title"] + == "Secure Virustotal malware analysis (MD5 based search)" + ) + assert agent_mock[0].data["short_description"] == "VirusTotal Malware analysis." + assert agent_mock[0].data["references"] == [ + {"title": "Virustotal", "url": "https://www.virustotal.com/"} + ] From b83aa32ed47bfdae1f8869adb5286e962457c52a Mon Sep 17 00:00:00 2001 From: ErebusZ Date: Wed, 6 Mar 2024 15:41:36 +0100 Subject: [PATCH 3/6] ruff --fix --- agent/process_scans.py | 1 - 1 file changed, 1 deletion(-) diff --git a/agent/process_scans.py b/agent/process_scans.py index a6a794c..5fdc6dc 100644 --- a/agent/process_scans.py +++ b/agent/process_scans.py @@ -2,7 +2,6 @@ from typing import Any -from ostorlab.agent.mixins import agent_report_vulnerability_mixin from agent import markdown From 4d70b1645ed7956f406d91feacf69a83c4e8695f Mon Sep 17 00:00:00 2001 From: ErebusZ Date: Wed, 6 Mar 2024 15:50:07 +0100 Subject: [PATCH 4/6] Add docstring --- agent/process_scans.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/agent/process_scans.py b/agent/process_scans.py index 5fdc6dc..26f8384 100644 --- a/agent/process_scans.py +++ b/agent/process_scans.py @@ -28,6 +28,13 @@ def get_technical_details(scans: dict[str, Any], target: str | None) -> str: def is_scan_malicious(scans: dict[str, Any]) -> bool: + """Checks if any scanner reports the target as malicious. + Args: + scans : Dictionary of the scans. + + Returns: + is_malicious : True if the target is reported as malicious false otherwise. + """ for scan_result in scans.values(): if scan_result["detected"] is True: return True @@ -36,6 +43,11 @@ def is_scan_malicious(scans: dict[str, Any]) -> bool: def exclude_unreliable_scans(scans: dict[str, Any]) -> None: + """Excludes unreliable reports from the scans. + + Args: + scans : Dictionary of the scans. + """ for scanner in EXCLUDED_SCANNERS: try: scans.pop(scanner) From 55b1ad50cd11fd0ba96cd9fe37310f75ffaae479 Mon Sep 17 00:00:00 2001 From: ErebusZ Date: Wed, 6 Mar 2024 18:22:42 +0100 Subject: [PATCH 5/6] a more pythonic exclusion method and better tests --- agent/process_scans.py | 12 +-- agent/virus_total_agent.py | 2 +- tests/virus_total_agent_test.py | 154 ++++++++++++++++---------------- 3 files changed, 86 insertions(+), 82 deletions(-) diff --git a/agent/process_scans.py b/agent/process_scans.py index 26f8384..9b7702b 100644 --- a/agent/process_scans.py +++ b/agent/process_scans.py @@ -42,14 +42,16 @@ def is_scan_malicious(scans: dict[str, Any]) -> bool: return False -def exclude_unreliable_scans(scans: dict[str, Any]) -> None: +def exclude_unreliable_scans(scans: dict[str, Any]) -> dict[str, Any]: """Excludes unreliable reports from the scans. Args: scans : Dictionary of the scans. + + Returns: + scans: Dictionary of the scans with only reliable reports. """ for scanner in EXCLUDED_SCANNERS: - try: - scans.pop(scanner) - except KeyError: - continue + scans.pop(scanner, None) + + return scans diff --git a/agent/virus_total_agent.py b/agent/virus_total_agent.py index b44393e..a62aada 100644 --- a/agent/virus_total_agent.py +++ b/agent/virus_total_agent.py @@ -88,7 +88,7 @@ def _process_response(self, response: dict[str, Any], target: str | None) -> Non scans = virustotal.get_scans(response) try: if scans is not None: - process_scans.exclude_unreliable_scans(scans) + scans = process_scans.exclude_unreliable_scans(scans) technical_detail = process_scans.get_technical_details(scans, target) if process_scans.is_scan_malicious(scans) is True: diff --git a/tests/virus_total_agent_test.py b/tests/virus_total_agent_test.py index 958a80a..f00f424 100644 --- a/tests/virus_total_agent_test.py +++ b/tests/virus_total_agent_test.py @@ -12,6 +12,72 @@ from agent import virus_total_agent from agent import virustotal +SECURE_VALID_RESPONSE = { + "results": { + "scans": { + "Bkav": { + "detected": False, + "version": "1.3.0.9899", + "result": None, + "update": "20220107", + }, + "Elastic": { + "detected": False, + "version": "4.0.32", + "result": "eicar", + "update": "20211223", + }, + }, + "scan_id": "ID42", + "sha1": "some_sha1", + "resource": "some_ressource_id", + "response_code": 1, + }, + "response_code": 200, +} + +UNRELIABLE_SCANNERS_RESPONSE = { + "results": { + "scans": { + "TrendMicro-HouseCall": { + "detected": True, + "result": None, + "update": "20240305", + "version": "2.0.0.8", + }, + "K7GW": { + "detected": True, + "result": None, + "update": "20240305", + "version": "23.9.8494.0", + }, + "Acronis": { + "detected": False, + "result": None, + "update": "20230828", + "version": "1.2.0.121", + }, + "AhnLab-V3": { + "detected": False, + "result": None, + "update": "20240305", + "version": "3.25.1.10473", + }, + "Alibaba": { + "detected": False, + "result": None, + "update": "20190527", + "version": "0.3.0.5", + }, + }, + "scan_id": "ID42", + "sha1": "some_sha1", + "resource": "some_ressource_id", + "response_code": 1, + }, + "response_code": 200, +} + def virustotal_url_valid_response(url: str, timeout: int) -> dict[str, Any]: """Method for mocking the Virus Total public API valid response.""" @@ -71,82 +137,6 @@ def virustotal_valid_response(message: msg.Message) -> dict[str, Any]: return response -def virustotal_secure_valid_response(message: msg.Message) -> dict[str, Any]: - """Method for mocking the Virus Total public API valid response.""" - del message - response = { - "results": { - "scans": { - "Bkav": { - "detected": False, - "version": "1.3.0.9899", - "result": None, - "update": "20220107", - }, - "Elastic": { - "detected": False, - "version": "4.0.32", - "result": "eicar", - "update": "20211223", - }, - }, - "scan_id": "ID42", - "sha1": "some_sha1", - "resource": "some_ressource_id", - "response_code": 1, - }, - "response_code": 200, - } - return response - - -def virustotal_unreliable_scanner_response(message: msg.Message) -> dict[str, Any]: - """Method for mocking the Virus Total public API unreliable scanner response.""" - del message - response = { - "results": { - "scans": { - "TrendMicro-HouseCall": { - "detected": True, - "result": None, - "update": "20240305", - "version": "2.0.0.8", - }, - "K7GW": { - "detected": True, - "result": None, - "update": "20240305", - "version": "23.9.8494.0", - }, - "Acronis": { - "detected": False, - "result": None, - "update": "20230828", - "version": "1.2.0.121", - }, - "AhnLab-V3": { - "detected": False, - "result": None, - "update": "20240305", - "version": "3.25.1.10473", - }, - "Alibaba": { - "detected": False, - "result": None, - "update": "20190527", - "version": "0.3.0.5", - }, - }, - "scan_id": "ID42", - "sha1": "some_sha1", - "resource": "some_ressource_id", - "response_code": 1, - }, - "response_code": 200, - } - return response - - def testVirusTotalAgent_whenVirusTotalApiReturnsValidResponse_noExceptionRaised( mocker: plugin.MockerFixture, agent_mock: list[msg.Message], @@ -488,6 +478,12 @@ def testVirusTotalAgent_whenReportIsSecure_shouldReportAsSecure( agent_mock: list[msg.Message], ) -> None: """Test that the agent report secure reports with correct kb entry.""" + + def virustotal_secure_valid_response(message: msg.Message) -> dict[str, Any]: + """Method for mocking the Virus Total public API valid response.""" + del message + return SECURE_VALID_RESPONSE + mocker.patch( "virus_total_apis.PublicApi.get_file_report", side_effect=virustotal_secure_valid_response, @@ -519,6 +515,12 @@ def testVirusTotalAgent_whenScannerIsExcluded_shouldNotBeConsidered( agent_mock: list[msg.Message], ) -> None: """Test that the agent exclude the unreliable scanners specified.""" + + def virustotal_unreliable_scanner_response(message: None) -> dict[str, Any]: + """Method for mocking the Virus Total public API unreliable scanner response.""" + del message + return UNRELIABLE_SCANNERS_RESPONSE + mocker.patch( "virus_total_apis.PublicApi.get_file_report", side_effect=virustotal_unreliable_scanner_response, From 3c3b60bdebed454386af6d66c678c4240a313b9e Mon Sep 17 00:00:00 2001 From: ErebusZ Date: Wed, 6 Mar 2024 18:47:35 +0100 Subject: [PATCH 6/6] Add a comment for the excluded reports --- agent/process_scans.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agent/process_scans.py b/agent/process_scans.py index 9b7702b..c7eec5e 100644 --- a/agent/process_scans.py +++ b/agent/process_scans.py @@ -5,6 +5,8 @@ from agent import markdown +# These are tools used by VirusTotal to scan files that report a big +# number of false positives, as a consequence their reports are excluded. EXCLUDED_SCANNERS = ["K7GW", "TrendMicro-HouseCall"]