diff --git a/rapidast.py b/rapidast.py index ffe803f..ae1353b 100755 --- a/rapidast.py +++ b/rapidast.py @@ -100,7 +100,11 @@ def run_scanner(name, config, args, scan_exporter): return 1 # Part 2: setup the environment (e.g.: spawn a server) - scanner.setup() + try: + scanner.setup() + except Exception as excp: # pylint: disable=W0718 + logging.error(f"Failed to set up the scanner: {excp}") + scanner.state = scanners.State.ERROR logging.debug(scanner) diff --git a/scanners/zap/zap.py b/scanners/zap/zap.py index 71f90bd..20f624d 100644 --- a/scanners/zap/zap.py +++ b/scanners/zap/zap.py @@ -6,8 +6,10 @@ import re import shutil import tarfile +import xml.etree.ElementTree as ET from base64 import urlsafe_b64encode from collections import namedtuple +from pathlib import Path import yaml @@ -637,6 +639,10 @@ def _setup_active_scan(self): if not job["parameters"].get("policy"): job["parameters"]["policy"] = "API-scan-minimal" + validate_active_scan_policy( + policy_path=Path(MODULE_DIR) / "policies" / f"{job['parameters']['policy']}.policy", + ) + self.automation_config["jobs"].append(job) def _construct_report_af(self, report_format): @@ -1026,3 +1032,57 @@ def ensure_list(entry): automation_config["env"]["contexts"] = [] automation_config["env"]["contexts"].append({"name": context}) return ensure_default(automation_config["env"]["contexts"][-1]) + + +class PolicyFileNotFoundError(FileNotFoundError): + """Raised when the policy file is not found.""" + + +class MissingConfigurationNodeError(RuntimeError): + """Raised when the root node is missing""" + + +class MissingPolicyNodeError(RuntimeError): + """Raised when the node inside is missing""" + + +class MismatchedPolicyNameError(RuntimeError): + """Raised when the node content does not match the filename""" + + +class InvalidXMLFileError(RuntimeError): + """Raised when the policy file is not a valid XML""" + + +def validate_active_scan_policy(policy_path: Path): + policy_name = policy_path.stem + + logging.info(f"Starting validation of ZAP active scan policy: '{policy_path}'") + + if not policy_path.is_file(): + raise PolicyFileNotFoundError( + f"Policy '{policy_name}' not found in '{policy_path.parent}' directory. " + f"Please check the policy name in the configuration" + ) + + try: + tree = ET.parse(policy_path) + root = tree.getroot() + + if not root.tag or root.tag != "configuration": + raise MissingConfigurationNodeError(f"Missing node in '{policy_name}.policy'") + + policy_node = root.find("policy") + if policy_node is None: + raise MissingPolicyNodeError(f"Missing node inside in '{policy_name}.policy'") + + if policy_node.text.strip() != policy_name: + raise MismatchedPolicyNameError( + f"The node in '{policy_name}' does not match the filename. " + f"Expected '{policy_name}', but found '{policy_node.text.strip()}'" + ) + + except ET.ParseError as exc: + raise InvalidXMLFileError(f"Policy file '{policy_path}' is not a valid XML file") from exc + + logging.info(f"Validation successful for policy file: '{policy_path}'") diff --git a/tests/scanners/zap/test_setup.py b/tests/scanners/zap/test_setup.py index 9c076d6..e98b007 100644 --- a/tests/scanners/zap/test_setup.py +++ b/tests/scanners/zap/test_setup.py @@ -1,11 +1,15 @@ import os from pathlib import Path +from unittest.mock import MagicMock +from unittest.mock import patch import pytest import requests import configmodel.converter +import scanners from scanners.zap.zap import find_context +from scanners.zap.zap import MODULE_DIR from scanners.zap.zap_none import ZapNone # from pytest_mock import mocker @@ -232,9 +236,9 @@ def test_setup_include_urls(test_config): assert "def" in find_context(test_zap.automation_config)["includePaths"] -def test_setup_active_scan(test_config): +@patch("scanners.zap.zap.validate_active_scan_policy") +def test_setup_active_scan(mock_validate_active_scan_policy, test_config): test_config.set("scanners.zap.activeScan.maxRuleDurationInMins", 10) - test_zap = ZapNone(config=test_config) test_zap.setup() @@ -244,6 +248,10 @@ def test_setup_active_scan(test_config): assert item["parameters"]["maxRuleDurationInMins"] == 10 assert item["parameters"]["context"] == "Default Context" assert item["parameters"]["user"] == "" + mock_validate_active_scan_policy.assert_called_once_with( + policy_path=Path(f"{MODULE_DIR}/policies/API-scan-minimal.policy") + ) + break else: assert False diff --git a/tests/scanners/zap/test_setup_activescan_policy_validatio.py b/tests/scanners/zap/test_setup_activescan_policy_validatio.py new file mode 100644 index 0000000..115fcd0 --- /dev/null +++ b/tests/scanners/zap/test_setup_activescan_policy_validatio.py @@ -0,0 +1,85 @@ +from pathlib import Path + +import pytest + +from scanners.zap.zap import InvalidXMLFileError +from scanners.zap.zap import MismatchedPolicyNameError +from scanners.zap.zap import MissingConfigurationNodeError +from scanners.zap.zap import MissingPolicyNodeError +from scanners.zap.zap import PolicyFileNotFoundError +from scanners.zap.zap import validate_active_scan_policy + + +@pytest.fixture +def valid_policy_file(tmp_path): + policy_name = "policy1" + valid_xml_content = """ + policy1 + 100 + """ + file_path = tmp_path / f"{policy_name}.policy" + file_path.write_text(valid_xml_content) + return file_path + + +@pytest.fixture +def invalid_xml_file(tmp_path): + policy_name = "policy1" + invalid_xml_content = """ + policy2 + """ # Mismatched policy name + file_path = tmp_path / f"{policy_name}.policy" + file_path.write_text(invalid_xml_content) + return file_path + + +@pytest.fixture +def missing_policy_file(): + return Path("/non/existent/path/policy1.policy") + + +def test_valid_policy(valid_policy_file): + validate_active_scan_policy(valid_policy_file) + + +def test_missing_policy_file(missing_policy_file): + with pytest.raises(PolicyFileNotFoundError): + validate_active_scan_policy(missing_policy_file) + + +def test_invalid_xml_file(invalid_xml_file): + with pytest.raises(MismatchedPolicyNameError): + validate_active_scan_policy(invalid_xml_file) + + +def test_invalid_xml_parse(invalid_xml_file): + invalid_xml_content = """ + policy1 + + policy1 + """ + file_path = invalid_xml_file + file_path.write_text(invalid_xml_content) + + with pytest.raises(MissingConfigurationNodeError): + validate_active_scan_policy(file_path) + + +def test_missing_policy_node(invalid_xml_file): + invalid_xml_content = """ + policy1 + """ + file_path = invalid_xml_file + file_path.write_text(invalid_xml_content) + + with pytest.raises(MissingPolicyNodeError): + validate_active_scan_policy(file_path) diff --git a/tests/test_rapidast_run_scanner.py b/tests/test_rapidast_run_scanner.py new file mode 100644 index 0000000..cf05a37 --- /dev/null +++ b/tests/test_rapidast_run_scanner.py @@ -0,0 +1,61 @@ +from unittest.mock import MagicMock +from unittest.mock import patch + +import rapidast +from rapidast import scanners + + +@patch("rapidast.scanners.str_to_scanner") +def test_run_scanner_setup_failure(mock_str_to_scanner): + """ + Test that if an exception occurs during `scanner.setup`, the `run_scanner` method + catches the exception, returns 1, and updates the scanner's state to 'ERROR' + """ + + mock_config = MagicMock() + mock_args = MagicMock() + mock_scan_exporter = MagicMock() + + mock_scanner = MagicMock() + mock_str_to_scanner.return_value = lambda config, name: mock_scanner + + mock_scanner.setup.side_effect = Exception("Setup failed") + + result = rapidast.run_scanner("mock_name", mock_config, mock_args, mock_scan_exporter) + + assert result == 1 + mock_scanner.setup.assert_called_once() + assert mock_scanner.state == scanners.State.ERROR + + +@patch("rapidast.scanners.str_to_scanner") +def test_run_scanner_setup_success(mock_str_to_scanner): + """ + Test that if `scanner.setup` is successful, `run_scanner` continues as expected. + Subsequent actions are mocked to focus on ensuring `run_scanner` returns a successful + result (0) + """ + + def update_state(state): + mock_scanner.state = state + + def update_state_ready(): + update_state(scanners.State.READY) + + def update_state_processed(): + update_state(scanners.State.PROCESSED) + + mock_config = MagicMock() + mock_args = MagicMock() + mock_scan_exporter = MagicMock() + + mock_scanner = MagicMock() + mock_str_to_scanner.return_value = lambda config, name: mock_scanner + + mock_scanner.setup.side_effect = update_state_ready + mock_scanner.postprocess.side_effect = update_state_processed + + result = rapidast.run_scanner("mock_name", mock_config, mock_args, mock_scan_exporter) + + assert result == 0 + mock_scanner.setup.assert_called_once()