Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat / import exploits #12

Merged
merged 9 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent/asteroid_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from agent import targets_preparer
from agent import exploits_registry
from agent import definitions
from agent import exploits

logging.basicConfig(
format="%(message)s",
Expand Down Expand Up @@ -44,6 +45,7 @@ def __init__(
"""Initialize The Agent instance."""

super().__init__(agent_definition, agent_settings)
exploits.import_all()
self.exploits: list[
definitions.Exploit
] = exploits_registry.ExploitsRegistry.values()
Expand Down
4 changes: 4 additions & 0 deletions agent/exploits/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Importer module."""
from agent import importer

import_all = importer.importer_for(__file__, __name__)
2 changes: 2 additions & 0 deletions agent/exploits/cve_2021_22941.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from requests import exceptions as requests_exceptions

from agent import definitions
from agent import exploits_registry

VULNERABILITY_TITLE = (
"Improper Access Control in Citrix ShareFile storage zones controller"
Expand All @@ -31,6 +32,7 @@ def _encode_multipart_formdata(files: dict[str, str]) -> tuple[str, str]:
return body, content_type


@exploits_registry.register
class CVE20222941Exploit(definitions.Exploit):
"""
CVE-2021-22941: Improper Access Control in Citrix ShareFile storage zones controller
Expand Down
2 changes: 2 additions & 0 deletions agent/exploits/cve_2023_27997.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from scipy import stats

from agent import definitions
from agent import exploits_registry

VULNERABILITY_TITLE = "A heap-based buffer overflow vulnerability in FortiOS"
VULNERABILITY_REFERENCE = "CVE-2023-27997"
Expand Down Expand Up @@ -63,6 +64,7 @@ def _check_stats(regular: list[int], overflow: list[int]) -> tuple[int, int, Any
return len(overflow), len(regular), t_stat


@exploits_registry.register
class CVE202327997Exploit(definitions.Exploit):
"""
CVE-2021-22941: A heap-based buffer overflow vulnerability in FortiOS
Expand Down
23 changes: 23 additions & 0 deletions agent/exploits_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ def register_ref(
cls.registry[cls.__name__][getattr(obj, key)] = obj
return obj

@classmethod
def unregister_ref(
cls,
obj: definitions.Exploit,
key: str = "__key__",
) -> None:
cls.registry[cls.__name__].pop(getattr(obj, key))

@classmethod
def values(
cls,
Expand All @@ -42,3 +50,18 @@ def register(
"""
ExploitsRegistry.register_ref(obj=f())
return f


def unregister(
f: Type[definitions.Exploit],
) -> None:
"""
To be used for unregistering an exploit class.

Args:
f: The class which its object will be unregistered.

Returns:
None.
"""
ExploitsRegistry.unregister_ref(obj=f())
53 changes: 53 additions & 0 deletions agent/importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""Dynamic importer to enable registry pattern."""
import logging
import sys
import traceback
import os
import pkgutil
from typing import Callable

logger = logging.getLogger(__name__)


def importer_for(path: str, prefix: str) -> Callable[[], None]:
"""
Creates a function for importing all Python files in a specified folder and adding them as attributes
to a given module.
Args:
path (str): the path to the folder containing the Python files to import
prefix (str): the prefix to use when adding the imported files as attributes to the module
Returns:
A function that can be called to perform the import and attribute assignment. The function takes two
optional arguments:
- file_path (str): the path to the folder containing the Python files to import (default is `path`)
- stop_on_error (bool): whether to stop execution if an error occurs during import (default is `True`)
"""

def import_all(path: str = path, stop_on_error: bool = True) -> None:
"""
Imports all Python files in a specified folder and adds them as attributes to a given module.
Args:
file_path (str): the path to the folder containing the Python files to import
(default is the value passed to `importer_for`)
stop_on_error (bool): whether to stop execution if an error occurs during import (default is `True`)
"""
folder = os.path.dirname(path)
module = sys.modules[prefix]
for importer, name, _ in pkgutil.iter_modules([folder]):
absname = prefix + "." + name
if absname in sys.modules:
continue
loader = importer.find_module(absname) # type: ignore[call-arg]
try:
if loader is not None:
submod = loader.load_module(absname)
except ImportError as e:
if stop_on_error:
raise
# This is for debugging to print the full trace and pinpoint to source of the exception.
traceback.print_exc()
logger.warning("Cannot load REbus plugin [%s]. Root cause: %s", name, e)
else:
setattr(module, name, submod)

return import_all
9 changes: 5 additions & 4 deletions tests/asteroid_agent_test.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""Unit tests for AsteroidAgent."""
from typing import Type
from agent import asteroid_agent
from agent import definitions
from typing import Type, Generator

from ostorlab.agent.message import message as m

from agent import asteroid_agent
from agent import definitions


def testAsteroidAgent_whenExploitCheckDetectVulnz_EmitsVulnerabilityReport(
exploit_instance_with_report: Type[definitions.Exploit],
exploit_instance_with_report: Generator[Type[definitions.Exploit], None, None],
asteroid_agent_instance: asteroid_agent.AsteroidAgent,
agent_mock: list[m.Message],
scan_message_domain_name: m.Message,
Expand Down
86 changes: 37 additions & 49 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,20 @@
"""Pytest fixtures for agent Asteroid"""
import pathlib
import random
from typing import Type
from typing import Type, Generator

import pytest
from ostorlab.agent import definitions as agent_definitions
from ostorlab.agent.message import message
from ostorlab.runtimes import definitions as runtime_definitions

from ostorlab.agent.mixins import agent_report_vulnerability_mixin as vuln_mixin
from ostorlab.agent.kb import kb
from agent import asteroid_agent
from agent import exploits_registry
from agent import definitions


@pytest.fixture()
def exploit_instance() -> Type[definitions.Exploit]:
class TestExploit(definitions.Exploit):
"""test class Exploit."""

def accept(self, target: definitions.Target) -> bool:
return False

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
return []

return TestExploit


@pytest.fixture()
def exploit_instance_with_report() -> Type[definitions.Exploit]:
@exploits_registry.register
class TestExploit(definitions.Exploit):
"""test class Exploit."""

def accept(self, target: definitions.Target) -> bool:
return True

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
return [
definitions.Vulnerability(
technical_detail="test",
entry=kb.Entry(
title="test",
risk_rating="INFO",
short_description="test purposes",
description="test purposes",
recommendation="",
references={},
security_issue=False,
privacy_issue=False,
has_public_exploit=False,
targeted_by_malware=False,
targeted_by_ransomware=False,
targeted_by_nation_state=False,
),
risk_rating=vuln_mixin.RiskRating.HIGH,
)
]

return TestExploit


@pytest.fixture()
def scan_message_domain_name() -> message.Message:
"""Creates a message of type v3.asset.domain_name.service to be used by the agent for testing purposes."""
Expand Down Expand Up @@ -112,3 +65,38 @@ def asteroid_agent_instance() -> asteroid_agent.AsteroidAgent:
)

return asteroid_agent.AsteroidAgent(definition, settings)


@pytest.fixture()
def exploit_instance_with_report() -> Generator[Type[definitions.Exploit], None, None]:
@exploits_registry.register
class TestExploit(definitions.Exploit):
"""test class Exploit."""

def accept(self, target: definitions.Target) -> bool:
return True

def check(self, target: definitions.Target) -> list[definitions.Vulnerability]:
return [
definitions.Vulnerability(
technical_detail="test",
entry=kb.Entry(
title="test",
risk_rating="INFO",
short_description="test purposes",
description="test purposes",
recommendation="",
references={},
security_issue=False,
privacy_issue=False,
has_public_exploit=False,
targeted_by_malware=False,
targeted_by_ransomware=False,
targeted_by_nation_state=False,
),
risk_rating=vuln_mixin.RiskRating.HIGH,
)
]

yield TestExploit
exploits_registry.unregister(TestExploit)
29 changes: 21 additions & 8 deletions tests/exploits_registry_test.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
"""Unit tests for the exploits' registry."""
from typing import Type
from agent import definitions
import collections

from agent import exploits_registry
from agent import exploits


def testExploitsRegistry_importingAllExploits_registerAll() -> None:
"""Ensure the exploits_registry registers all exploits."""

exploits.import_all()

def testExploitsRegistry_whenRegisteringClassDirectly_shouldLoadClass(
exploit_instance: Type[definitions.Exploit],
) -> None:
"""Ensure the exploits_registry registers instances of the exploits."""
exploits_registry.register(exploit_instance)
registered_exploits = exploits_registry.ExploitsRegistry.values()

assert len(registered_exploits) >= 1
assert len(registered_exploits) == 2


def testExploitsRegistry_allExploits_mustBeRegisteredOnce() -> None:
"""Ensure that there are no exploits registered more than once."""

exploits.import_all()

cnt = collections.Counter(exploits_registry.ExploitsRegistry().values())

assert all(
v == 1 for v in cnt.values()
), f"Found {[(k, v) for k, v in cnt.items() if v > 1]}"
Loading