Skip to content

Commit

Permalink
Merge pull request #51 from Ostorlab/fix/fix_segregation_and_exclude_…
Browse files Browse the repository at this point in the history
…rule

Fix segregation and exclude rule
  • Loading branch information
3asm authored Mar 6, 2024
2 parents 08acc88 + 3c3b60b commit 5d2b871
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 83 deletions.
52 changes: 28 additions & 24 deletions agent/process_scans.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,12 @@

from typing import Any

from ostorlab.agent.mixins import agent_report_vulnerability_mixin

from agent import markdown


def get_risk_rating(
scans: dict[str, Any],
) -> agent_report_vulnerability_mixin.RiskRating:
"""Assign risk level based on scanned file report.
Returns:
'HIGH' if at least one anti-virus detected the file as a virus, else Secure.
"""
for scanner_result in scans.values():
if scanner_result["detected"] is True:
return agent_report_vulnerability_mixin.RiskRating.HIGH
return agent_report_vulnerability_mixin.RiskRating.SECURE
# These are tools used by VirusTotal to scan files that report a big
# number of false positives, as a consequence their reports are excluded.
EXCLUDED_SCANNERS = ["K7GW", "TrendMicro-HouseCall"]


def get_technical_details(scans: dict[str, Any], target: str | None) -> str:
Expand All @@ -40,16 +29,31 @@ def get_technical_details(scans: dict[str, Any], target: str | None) -> str:
return technical_detail


def split_scans_by_result(
scans: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
secure_scans: dict[str, Any] = {}
vulnerable_scans: dict[str, Any] = {}
def is_scan_malicious(scans: dict[str, Any]) -> bool:
"""Checks if any scanner reports the target as malicious.
Args:
scans : Dictionary of the scans.
for scan_type, scan_result in scans.items():
Returns:
is_malicious : True if the target is reported as malicious false otherwise.
"""
for scan_result in scans.values():
if scan_result["detected"] is True:
vulnerable_scans[scan_type] = scan_result
else:
secure_scans[scan_type] = scan_result
return True

return False


def exclude_unreliable_scans(scans: dict[str, Any]) -> dict[str, Any]:
"""Excludes unreliable reports from the scans.
Args:
scans : Dictionary of the scans.
Returns:
scans: Dictionary of the scans with only reliable reports.
"""
for scanner in EXCLUDED_SCANNERS:
scans.pop(scanner, None)

return secure_scans, vulnerable_scans
return scans
27 changes: 9 additions & 18 deletions agent/virus_total_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,30 +88,21 @@ def _process_response(self, response: dict[str, Any], target: str | None) -> Non
scans = virustotal.get_scans(response)
try:
if scans is not None:
(
secure_scan_report,
vulnerable_scan_report,
) = process_scans.split_scans_by_result(scans)

if len(secure_scan_report) > 0:
technical_detail = process_scans.get_technical_details(
secure_scan_report, target
)
self.report_vulnerability(
entry=kb.KB.SECURE_VIRUSTOTAL_SCAN,
technical_detail=technical_detail,
risk_rating=agent_report_vulnerability_mixin.RiskRating.SECURE,
)
scans = process_scans.exclude_unreliable_scans(scans)
technical_detail = process_scans.get_technical_details(scans, target)

if len(vulnerable_scan_report) > 0:
technical_detail = process_scans.get_technical_details(
vulnerable_scan_report, target
)
if process_scans.is_scan_malicious(scans) is True:
self.report_vulnerability(
entry=kb.KB.INSECURE_VIRUSTOTAL_SCAN,
technical_detail=technical_detail,
risk_rating=agent_report_vulnerability_mixin.RiskRating.HIGH,
)
else:
self.report_vulnerability(
entry=kb.KB.SECURE_VIRUSTOTAL_SCAN,
technical_detail=technical_detail,
risk_rating=agent_report_vulnerability_mixin.RiskRating.SECURE,
)

except NameError:
logger.error("The scans list is empty.")
Expand Down
193 changes: 152 additions & 41 deletions tests/virus_total_agent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,72 @@
from agent import virus_total_agent
from agent import virustotal

SECURE_VALID_RESPONSE = {
"results": {
"scans": {
"Bkav": {
"detected": False,
"version": "1.3.0.9899",
"result": None,
"update": "20220107",
},
"Elastic": {
"detected": False,
"version": "4.0.32",
"result": "eicar",
"update": "20211223",
},
},
"scan_id": "ID42",
"sha1": "some_sha1",
"resource": "some_ressource_id",
"response_code": 1,
},
"response_code": 200,
}

UNRELIABLE_SCANNERS_RESPONSE = {
"results": {
"scans": {
"TrendMicro-HouseCall": {
"detected": True,
"result": None,
"update": "20240305",
"version": "2.0.0.8",
},
"K7GW": {
"detected": True,
"result": None,
"update": "20240305",
"version": "23.9.8494.0",
},
"Acronis": {
"detected": False,
"result": None,
"update": "20230828",
"version": "1.2.0.121",
},
"AhnLab-V3": {
"detected": False,
"result": None,
"update": "20240305",
"version": "3.25.1.10473",
},
"Alibaba": {
"detected": False,
"result": None,
"update": "20190527",
"version": "0.3.0.5",
},
},
"scan_id": "ID42",
"sha1": "some_sha1",
"resource": "some_ressource_id",
"response_code": 1,
},
"response_code": 200,
}


def virustotal_url_valid_response(url: str, timeout: int) -> dict[str, Any]:
"""Method for mocking the Virus Total public API valid response."""
Expand Down Expand Up @@ -89,16 +155,11 @@ def testVirusTotalAgent_whenVirusTotalApiReturnsValidResponse_noExceptionRaised(
)
virustotal_agent.process(message)

assert len(agent_mock) == 2
assert len(agent_mock) == 1
assert all(msg.selector == "v3.report.vulnerability" for msg in agent_mock)
assert agent_mock[0].data["risk_rating"] == "SECURE"
assert agent_mock[1].data["risk_rating"] == "HIGH"
assert agent_mock[0].data["risk_rating"] == "HIGH"
assert (
agent_mock[0].data["title"]
== "Secure Virustotal malware analysis (MD5 based search)"
)
assert (
agent_mock[1].data["title"]
== "VirusTotal scan flagged malicious asset(s) (MD5 based search)"
)
assert isinstance(agent_mock[0].data["technical_detail"], str)
Expand Down Expand Up @@ -159,16 +220,11 @@ def testVirusTotalAgent_whenLinkReceived_virusTotalApiReturnsValidResponse(

virustotal_agent.process(url_message)

assert len(agent_mock) == 2
assert len(agent_mock) == 1
assert all(msg.selector == "v3.report.vulnerability" for msg in agent_mock)
assert agent_mock[0].data["risk_rating"] == "SECURE"
assert agent_mock[1].data["risk_rating"] == "HIGH"
assert agent_mock[0].data["risk_rating"] == "HIGH"
assert (
agent_mock[0].data["title"]
== "Secure Virustotal malware analysis (MD5 based search)"
)
assert (
agent_mock[1].data["title"]
== "VirusTotal scan flagged malicious asset(s) (MD5 based search)"
)
assert isinstance(agent_mock[0].data["technical_detail"], str)
Expand Down Expand Up @@ -199,16 +255,11 @@ def testVirusTotalAgent_whenDomainReceived_virusTotalApiReturnsValidResponse(

virustotal_agent.process(create_domain_message)

assert len(agent_mock) == 2
assert len(agent_mock) == 1
assert all(msg.selector == "v3.report.vulnerability" for msg in agent_mock)
assert agent_mock[0].data["risk_rating"] == "SECURE"
assert agent_mock[1].data["risk_rating"] == "HIGH"
assert agent_mock[0].data["risk_rating"] == "HIGH"
assert (
agent_mock[0].data["title"]
== "Secure Virustotal malware analysis (MD5 based search)"
)
assert (
agent_mock[1].data["title"]
== "VirusTotal scan flagged malicious asset(s) (MD5 based search)"
)
assert isinstance(agent_mock[0].data["technical_detail"], str)
Expand Down Expand Up @@ -239,21 +290,9 @@ def testVirusTotalAgent_whenApisReceived_virusTotalApiReturnsValidResponse(

virustotal_agent.process(create_network_range_message)

assert len(agent_mock) == 28
assert len(agent_mock) == 14
assert agent_mock[0].selector == "v3.report.vulnerability"
assert len([msg for msg in agent_mock if msg.data["risk_rating"] == "SECURE"]) == 14
assert len([msg for msg in agent_mock if msg.data["risk_rating"] == "HIGH"]) == 14
assert (
len(
[
msg
for msg in agent_mock
if msg.data["title"]
== "Secure Virustotal malware analysis (MD5 based search)"
]
)
== 14
)
assert (
len(
[
Expand Down Expand Up @@ -370,14 +409,10 @@ def testVirusTotalAgent_whenFileHasNoPath_shouldReportWithHash(

virustotal_agent.process(message_without_path)

assert len(agent_mock) == 2
assert len(agent_mock) == 1
assert agent_mock[0].data["technical_detail"] == (
"Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n|Package|Result|"
" \n|-------|------| \n|Bkav |_Safe_| \n"
)
assert agent_mock[1].data["technical_detail"] == (
"Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n|Package| Result"
" | \n|-------|----------| \n|Elastic|_Malicous_| \n"
"Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n|Package| Result |"
" \n|-------|----------| \n|Bkav |_Safe_ | \n|Elastic|_Malicous_| \n"
)


Expand Down Expand Up @@ -434,3 +469,79 @@ def testVirusTotalAgent_whenIPAssetHasIncorrectVersion_raiseValueError(
"""Test the CIDR Limit in case IP has incorrect version."""
with pytest.raises(ValueError, match="Incorrect ip version 5."):
virustotal_agent.process(scan_message_ipv_with_incorrect_version)


def testVirusTotalAgent_whenReportIsSecure_shouldReportAsSecure(
mocker: plugin.MockerFixture,
virustotal_agent: virus_total_agent.VirusTotalAgent,
message_without_path: msg.Message,
agent_mock: list[msg.Message],
) -> None:
"""Test that the agent report secure reports with correct kb entry."""

def virustotal_secure_valid_response(message: msg.Message) -> dict[str, Any]:
"""Method for mocking the Virus Total public API valid response."""
del message
return SECURE_VALID_RESPONSE

mocker.patch(
"virus_total_apis.PublicApi.get_file_report",
side_effect=virustotal_secure_valid_response,
)

virustotal_agent.process(message_without_path)

assert len(agent_mock) == 1
assert agent_mock[0].data["risk_rating"] == "SECURE"
assert agent_mock[0].selector == "v3.report.vulnerability"
assert agent_mock[0].data["technical_detail"] == (
"Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n|Package|Result|"
" \n|-------|------| \n|Bkav |_Safe_| \n|Elastic|_Safe_| \n"
)
assert (
agent_mock[0].data["title"]
== "Secure Virustotal malware analysis (MD5 based search)"
)
assert agent_mock[0].data["short_description"] == "VirusTotal Malware analysis."
assert agent_mock[0].data["references"] == [
{"title": "Virustotal", "url": "https://www.virustotal.com/"}
]


def testVirusTotalAgent_whenScannerIsExcluded_shouldNotBeConsidered(
mocker: plugin.MockerFixture,
virustotal_agent: virus_total_agent.VirusTotalAgent,
message_without_path: msg.Message,
agent_mock: list[msg.Message],
) -> None:
"""Test that the agent exclude the unreliable scanners specified."""

def virustotal_unreliable_scanner_response(message: None) -> dict[str, Any]:
"""Method for mocking the Virus Total public API unreliable scanner response."""
del message
return UNRELIABLE_SCANNERS_RESPONSE

mocker.patch(
"virus_total_apis.PublicApi.get_file_report",
side_effect=virustotal_unreliable_scanner_response,
)

virustotal_agent.process(message_without_path)

assert len(agent_mock) == 1
assert agent_mock[0].data["risk_rating"] == "SECURE"
assert agent_mock[0].selector == "v3.report.vulnerability"
assert "K7GW" not in agent_mock[0].data["technical_detail"]
assert "TrendMicro-HouseCall" not in agent_mock[0].data["technical_detail"]
assert agent_mock[0].data["technical_detail"] == (
"Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n| Package |Result|"
" \n|---------|------| \n|Acronis |_Safe_| \n|AhnLab-V3|_Safe_| \n|Alibaba |_Safe_| \n"
)
assert (
agent_mock[0].data["title"]
== "Secure Virustotal malware analysis (MD5 based search)"
)
assert agent_mock[0].data["short_description"] == "VirusTotal Malware analysis."
assert agent_mock[0].data["references"] == [
{"title": "Virustotal", "url": "https://www.virustotal.com/"}
]

0 comments on commit 5d2b871

Please sign in to comment.