-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FR] Add support for configurable tests and validation #4
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
|
||
# set the environment variable DETECTION_RULES_TEST_CONFIG | ||
|
||
# `bypass` and `test_only` are mutually exclusive and will cause an error if both are specified. | ||
# | ||
# tests can be defined by their full name or using glob-style patterns with the following notation | ||
# pattern:*rule* | ||
# the patterns are case sensitive | ||
|
||
unit_tests: | ||
# define tests to explicitly bypass, with all others being run | ||
# | ||
# to run all tests, set bypass to empty or leave this file commented out | ||
bypass: | ||
# - tests.test_all_rules.TestRuleMetadata.test_event_dataset | ||
# - tests.test_all_rules.TestRuleMetadata.test_integration_tag | ||
# - tests.test_gh_workflows.TestWorkflows.test_matrix_to_lock_version_defaults | ||
# - pattern:*rule* | ||
# - pattern:*kuery* | ||
|
||
# define tests to explicitly run, with all others being bypassed | ||
# | ||
# to bypass all tests, set test_only to empty | ||
test_only: | ||
# - tests.test_all_rules.TestRuleMetadata.test_event_dataset | ||
# - pattern:*rule* | ||
|
||
|
||
# `bypass` and `test_only` are mutually exclusive and will cause an error if both are specified. | ||
# | ||
# both variables require a list of rule_ids | ||
rule_validation: | ||
|
||
bypass: | ||
# - "34fde489-94b0-4500-a76f-b8a157cf9269" | ||
|
||
|
||
test_only: | ||
# - "34fde489-94b0-4500-a76f-b8a157cf9269" |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,9 @@ | |
from .schemas import all_versions, definitions, get_incompatible_fields, get_schema_file | ||
from .utils import Ndjson, get_path, get_etc_path, clear_caches, load_dump, load_rule_contents | ||
|
||
|
||
RULES_DIR = get_path('rules') | ||
ROOT_DIR = Path(RULES_DIR).parent | ||
RULES_CONFIG = parse_rules_config() | ||
|
||
|
||
|
@@ -415,8 +417,16 @@ def test_rules(ctx): | |
"""Run unit tests over all of the rules.""" | ||
import pytest | ||
|
||
rules_config = ctx.obj['rules_config'] | ||
test_config = rules_config.test_config | ||
tests, skipped = test_config.get_test_names(formatted=True) | ||
|
||
if skipped: | ||
click.echo(f'Tests skipped per config ({len(skipped)}):') | ||
click.echo('\n'.join(skipped)) | ||
|
||
clear_caches() | ||
ctx.exit(pytest.main(["-v"])) | ||
ctx.exit(pytest.main(['-v'] + tests)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I initially built a custom class around |
||
|
||
|
||
@root.group('typosquat') | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,14 +4,16 @@ | |
# 2.0. | ||
|
||
"""Misc support.""" | ||
import fnmatch | ||
import os | ||
import re | ||
import time | ||
import unittest | ||
import uuid | ||
from dataclasses import dataclass | ||
from pathlib import Path | ||
from functools import wraps | ||
from typing import Dict, NoReturn, Optional | ||
from functools import cached_property, wraps | ||
from typing import Dict, List, NoReturn, Optional | ||
|
||
import click | ||
import requests | ||
|
@@ -52,6 +54,7 @@ | |
""".strip().format("\n".join(' * ' + line for line in LICENSE_LINES)) | ||
|
||
|
||
ROOT_DIR = Path(__file__).parent.parent | ||
CUSTOM_RULES_DIR = os.getenv('CUSTOM_RULES_DIR', None) | ||
|
||
|
||
|
@@ -115,8 +118,8 @@ def nest_from_dot(dots, value): | |
|
||
nested = {fields.pop(): value} | ||
|
||
for field in reversed(fields): | ||
nested = {field: nested} | ||
for field_ in reversed(fields): | ||
nested = {field_: nested} | ||
|
||
return nested | ||
|
||
|
@@ -296,6 +299,127 @@ def parse_user_config(): | |
return config | ||
|
||
|
||
def discover_tests(start_dir: str = 'tests', pattern: str = 'test*.py', top_level_dir: Optional[str] = None): | ||
def list_tests(s, tests=None): | ||
if tests is None: | ||
tests = [] | ||
for test in s: | ||
if isinstance(test, unittest.TestSuite): | ||
list_tests(test, tests) | ||
else: | ||
tests.append(test.id()) | ||
return tests | ||
|
||
loader = unittest.defaultTestLoader | ||
suite = loader.discover(start_dir, pattern=pattern, top_level_dir=top_level_dir or str(ROOT_DIR)) | ||
return list_tests(suite) | ||
|
||
|
||
@dataclass | ||
class UnitTest: | ||
bypass: Optional[List[str]] = None | ||
test_only: Optional[List[str]] = None | ||
|
||
def __post_init__(self): | ||
assert not (self.bypass and self.test_only), 'Cannot use both test_only and bypass' | ||
|
||
|
||
@dataclass | ||
class RuleValidation: | ||
bypass: Optional[List[str]] = None | ||
test_only: Optional[List[str]] = None | ||
|
||
def __post_init__(self): | ||
assert not (self.bypass and self.test_only), 'Cannot use both test_only and bypass' | ||
brokensound77 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
@dataclass | ||
class TestConfig: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd recommend moving this class to a dedicated module since it seems like it has a mode dedicated purpose than miscellaneous methods/classes. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ++ totes, I'll make a config.py There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and move all of this |
||
"""Detection rules test config file""" | ||
|
||
@classmethod | ||
def from_dict(cls, test_file: Optional[Path] = None, unit_tests: Optional[dict] = None, | ||
rule_validation: Optional[dict] = None): | ||
return cls(test_file=test_file or None, unit_tests=UnitTest(**unit_tests or {}), | ||
rule_validation=RuleValidation(**rule_validation or {})) | ||
|
||
test_file: Optional[Path] = None | ||
unit_tests: Optional[UnitTest] = None | ||
rule_validation: Optional[RuleValidation] = None | ||
|
||
@cached_property | ||
def all_tests(self): | ||
"""Get the list of all test names.""" | ||
return discover_tests() | ||
|
||
def tests_by_patterns(self, *patterns: str) -> List[str]: | ||
"""Get the list of test names by patterns.""" | ||
tests = set() | ||
for pattern in patterns: | ||
tests.update(list(fnmatch.filter(self.all_tests, pattern))) | ||
brokensound77 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return sorted(tests) | ||
|
||
@staticmethod | ||
def parse_out_patterns(names: List[str]) -> (List[str], List[str]): | ||
"""Parse out test patterns from a list of test names.""" | ||
patterns = [] | ||
tests = [] | ||
for name in names: | ||
if name.startswith('pattern:') and '*' in name: | ||
patterns.append(name[len('pattern:'):]) | ||
else: | ||
tests.append(name) | ||
return patterns, tests | ||
Comment on lines
+363
to
+372
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't prefix lines in the config with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 that should work, yea. Since full name would have a single match only. I can't remember if there was any other reason why I broke them out though. If not, I'll simplify it with this |
||
|
||
def get_test_names(self, formatted: bool = False) -> (List[str], List[str]): | ||
"""Get the list of test names to run.""" | ||
patterns_t, tests_t = self.parse_out_patterns(self.unit_tests.test_only or []) | ||
patterns_b, tests_b = self.parse_out_patterns(self.unit_tests.bypass or []) | ||
tests = tests_t + tests_b | ||
patterns = patterns_t + patterns_b | ||
unknowns = sorted(set(tests) - set(self.all_tests)) | ||
assert not unknowns, f'Unrecognized test names in config ({self.test_file}): {unknowns}' | ||
|
||
combined_tests = sorted(set(tests + self.tests_by_patterns(*patterns))) | ||
|
||
if self.unit_tests.test_only: | ||
tests = combined_tests | ||
skipped = [t for t in self.all_tests if t not in tests] | ||
elif self.unit_tests.bypass: | ||
tests = [] | ||
skipped = [] | ||
for test in self.all_tests: | ||
if test not in combined_tests: | ||
tests.append(test) | ||
else: | ||
skipped.append(test) | ||
Comment on lines
+388
to
+395
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think with a few set / list operations we can condense this. basically all_tests - skipped should be tests right? |
||
else: | ||
tests = self.all_tests | ||
skipped = [] | ||
|
||
if formatted: | ||
def fmt_tests(lt) -> List[str]: | ||
raw = [t.rsplit('.', maxsplit=2) for t in lt] | ||
ft = [] | ||
for test in raw: | ||
path, clazz, method = test | ||
path = f'{path.replace(".", os.path.sep)}.py' | ||
ft.append('::'.join([path, clazz, method])) | ||
return ft | ||
|
||
return fmt_tests(tests), fmt_tests(skipped) | ||
else: | ||
return tests, skipped | ||
Comment on lines
+401
to
+412
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we break out this to a staticmethod of the class w/doc strings, renamed function arg, + doc string etc. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good idea 👍 |
||
|
||
def check_skip_by_rule_id(self, rule_id: str) -> bool: | ||
"""Check if a rule_id should be skipped.""" | ||
bypass = self.rule_validation.bypass | ||
test_only = self.rule_validation.test_only | ||
if not (bypass or test_only): | ||
return False | ||
return (bypass and rule_id in bypass) or (test_only and rule_id not in test_only) | ||
Comment on lines
+418
to
+420
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For readability, do we want to expand/refactor this? e.g. if self.rule_validation.bypass and rule_id in self.rule_validation.bypass:
return True
if self.rule_validation.test_only and rule_id not in self.rule_validation.test_only:
return True
return False or even, close to as-is bypass_tests = self.rule_validation.bypass
test_only_tests = self.rule_validation.test_only
if bypass_tests and rule_id in bypass_tests:
return True
if test_only_tests and rule_id not in test_only_tests:
return True
return False |
||
|
||
|
||
@dataclass | ||
class RulesConfig: | ||
"""Detection rules config file.""" | ||
|
@@ -307,27 +431,47 @@ class RulesConfig: | |
stack_schema_map: Dict[str, dict] | ||
version_lock_file: Path | ||
version_lock: Dict[str, dict] | ||
test_config: TestConfig | ||
|
||
|
||
@cached | ||
def parse_rules_config(path: Optional[Path] = None) -> RulesConfig: | ||
"""Parse the _config.yaml file for default or custom rules.""" | ||
if path: | ||
assert path.exists(), f'rules config file does not exist: {path}' | ||
loaded = yaml.safe_load(path.read_text())['files'] | ||
loaded = yaml.safe_load(path.read_text()) | ||
elif CUSTOM_RULES_DIR: | ||
path = Path(CUSTOM_RULES_DIR) / '_config.yaml' | ||
assert path.exists(), f'_config.yaml file missing in {CUSTOM_RULES_DIR}' | ||
loaded = yaml.safe_load(path.read_text())['files'] | ||
loaded = yaml.safe_load(path.read_text()) | ||
else: | ||
path = Path(get_etc_path('_config.yaml')) | ||
loaded = load_etc_dump('_config.yaml')['files'] | ||
loaded = load_etc_dump('_config.yaml') | ||
|
||
base_dir = path.resolve().parent | ||
files = {f'{k}_file': base_dir.joinpath(v) for k, v in loaded.items()} | ||
contents = {k: load_dump(str(base_dir.joinpath(v))) for k, v in loaded.items()} | ||
|
||
# precedence to the environment variable | ||
# environment variable is absolute path and config file is relative to the _config.yaml file | ||
test_config_ev = os.getenv('DETECTION_RULES_TEST_CONFIG', None) | ||
if test_config_ev: | ||
test_config_path = Path(test_config_ev) | ||
else: | ||
test_config_file = loaded.get('testing', {}).get('config') | ||
if test_config_file: | ||
test_config_path = base_dir.joinpath(test_config_file) | ||
else: | ||
test_config_path = None | ||
|
||
if test_config_path: | ||
test_config_data = yaml.safe_load(test_config_path.read_text()) | ||
test_config = TestConfig.from_dict(test_file=test_config_path, **test_config_data) | ||
else: | ||
test_config = TestConfig.from_dict() | ||
|
||
files = {f'{k}_file': base_dir.joinpath(v) for k, v in loaded['files'].items()} | ||
contents = {k: load_dump(str(base_dir.joinpath(v))) for k, v in loaded['files'].items()} | ||
contents.update(**files) | ||
rules_config = RulesConfig(**contents) | ||
rules_config = RulesConfig(test_config=test_config, **contents) | ||
return rules_config | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may need to specify a custom test path