Skip to content

Commit

Permalink
Merge pull request #44 from Ostorlab/feat_update_cidr_limit
Browse files Browse the repository at this point in the history
Feat : Add CIDR Limit
  • Loading branch information
benyissa authored Dec 19, 2023
2 parents 6e7c248 + 591e487 commit ee5768a
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 3 deletions.
18 changes: 16 additions & 2 deletions agent/virus_total_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

logger = logging.getLogger(__name__)

IPV4_CIDR_LIMIT = 16
IPV6_CIDR_LIMIT = 112


class VirusTotalAgent(
agent.Agent, agent_report_vulnerability_mixin.AgentReportVulnMixin
Expand Down Expand Up @@ -106,10 +109,21 @@ def _prepare_targets(self, message: msg.Message) -> list[str]:
from the config."""
if message.data.get("host") is not None:
host = str(message.data.get("host"))
if message.data.get("mask") is None:
mask = message.data.get("mask")
if mask is None:
ip_network = ipaddress.ip_network(host)
else:
mask = message.data.get("mask")
version = message.data.get("version")
if version not in (4, 6):
raise ValueError(f"Incorrect ip version {version}.")
elif version == 4 and int(mask) < IPV4_CIDR_LIMIT:
raise ValueError(
f"Subnet mask below {IPV4_CIDR_LIMIT} is not supported."
)
elif version == 6 and int(mask) < IPV6_CIDR_LIMIT:
raise ValueError(
f"Subnet mask below {IPV6_CIDR_LIMIT} is not supported."
)
ip_network = ipaddress.ip_network(f"{host}/{mask}", strict=False)
return [str(h) for h in ip_network.hosts()]

Expand Down
2 changes: 1 addition & 1 deletion ostorlab.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
kind: Agent
name: virustotal
version: 0.2.0
version: 0.2.1
image: images/logo.png
description: |
This repository is an implementation of the VirusTotal agent.
Expand Down
52 changes: 52 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,55 @@ def message_without_path() -> msg.Message:
selector = "v3.asset.file"
msg_data = {"content": file_content}
return msg.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv4_with_mask8() -> msg.Message:
"""Creates a message of type v3.asset.ip.v4 to be used by the agent for testing purposes."""
selector = "v3.asset.ip.v4"
msg_data = {"host": "192.168.1.17", "mask": "8", "version": 4}
return msg.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv4_with_mask16() -> msg.Message:
"""Creates a message of type v3.asset.ip.v4 to be used by the agent for testing purposes."""
selector = "v3.asset.ip.v4"
msg_data = {"host": "192.168.1.17", "mask": "16", "version": 4}
return msg.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv6_with_mask64() -> msg.Message:
"""Creates a message of type v3.asset.ip.v6 to be used by the agent for testing purposes."""
selector = "v3.asset.ip.v6"
msg_data = {
"host": "2001:db8:3333:4444:5555:6666:7777:8888",
"mask": "64",
"version": 6,
}
return msg.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv6_with_mask112() -> msg.Message:
"""Creates a message of type v3.asset.ip.v6 to be used by the agent for testing purposes."""
selector = "v3.asset.ip.v6"
msg_data = {
"host": "2001:db8:3333:4444:5555:6666:7777:8888",
"mask": "112",
"version": 6,
}
return msg.Message.from_data(selector, data=msg_data)


@pytest.fixture()
def scan_message_ipv_with_incorrect_version() -> msg.Message:
"""Creates a message of type v3.asset.ip with an incorrect version."""
selector = "v3.asset.ip"
msg_data = {
"host": "0.0.0.0",
"mask": "32",
"version": 5,
}
return msg.Message.from_data(selector, data=msg_data)
56 changes: 56 additions & 0 deletions tests/virus_total_agent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from ostorlab.agent.message import message as msg
from pytest_mock import plugin
import pytest

from agent import virus_total_agent
from agent import virustotal
Expand Down Expand Up @@ -334,3 +335,58 @@ def testVirusTotalAgent_whenFileHasNoPath_shouldReportWithHash(
"Analysis of the target `44d88612fea8a8f36de82e1278abb02f`:\n|Package| Result | \n"
"|-------|----------| \n|Bkav |_Safe_ | \n|Elastic|_Malicous_| \n"
)


def testVirusTotalAgent_whenIPv4AssetReachCIDRLimit_raiseValueError(
scan_message_ipv4_with_mask8: msg.Message,
virustotal_agent: virus_total_agent.VirusTotalAgent,
) -> None:
"""Test the CIDR Limit in case IPV4 and the Limit is reached."""
with pytest.raises(ValueError, match="Subnet mask below 16 is not supported."):
virustotal_agent.process(scan_message_ipv4_with_mask8)


def testVirusTotalAgent_whenIPv4AssetDoesNotReachCIDRLimit_doesNotRaiseValueError(
mocker: plugin.MockerFixture,
scan_message_ipv4_with_mask16: msg.Message,
virustotal_agent: virus_total_agent.VirusTotalAgent,
) -> None:
"""Test the CIDR Limit in case IPV4 and the Limit is not reached."""
mocker.patch(
"agent.virustotal.scan_url_from_message",
return_value={},
)

virustotal_agent.process(scan_message_ipv4_with_mask16)


def testVirusTotalAgent_whenIPv6AssetReachCIDRLimit_raiseValueError(
scan_message_ipv6_with_mask64: msg.Message,
virustotal_agent: virus_total_agent.VirusTotalAgent,
) -> None:
"""Test the CIDR Limit in case IPV6 and the Limit is reached."""
with pytest.raises(ValueError, match="Subnet mask below 112 is not supported."):
virustotal_agent.process(scan_message_ipv6_with_mask64)


def testVirusTotalAgent_whenIPv6AssetDoesNotReachCIDRLimit_doesNotRaiseValueError(
mocker: plugin.MockerFixture,
scan_message_ipv6_with_mask112: msg.Message,
virustotal_agent: virus_total_agent.VirusTotalAgent,
) -> None:
"""Test the CIDR Limit in case IPV6 and the Limit is not reached."""
mocker.patch(
"agent.virustotal.scan_url_from_message",
return_value={},
)

virustotal_agent.process(scan_message_ipv6_with_mask112)


def testVirusTotalAgent_whenIPAssetHasIncorrectVersion_raiseValueError(
scan_message_ipv_with_incorrect_version: msg.Message,
virustotal_agent: virus_total_agent.VirusTotalAgent,
) -> None:
"""Test the CIDR Limit in case IP has incorrect version."""
with pytest.raises(ValueError, match="Incorrect ip version 5."):
virustotal_agent.process(scan_message_ipv_with_incorrect_version)

0 comments on commit ee5768a

Please sign in to comment.