Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FR] Adding Support for missing_field_strategy Field in Alert Suppression #3201

Merged
merged 13 commits into from
Oct 19, 2023
Merged
13 changes: 11 additions & 2 deletions detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ class AlertSuppressionDuration:
value: int

group_by: List[definitions.NonEmptyStr]
duration: Optional[AlertSuppressionDuration] = field(metadata=dict(metadata=dict(min_compat="8.7")))
duration: Optional[AlertSuppressionDuration]
missing_fields_strategy: definitions.AlertSuppressionMissing


@dataclass(frozen=True)
Expand All @@ -247,7 +248,6 @@ class RelatedIntegrations:
integration: Optional[definitions.NonEmptyStr]

actions: Optional[list]
alert_suppression: Optional[AlertSuppressionMapping] = field(metadata=dict(metadata=dict(min_compat="8.6")))
author: List[str]
building_block_type: Optional[definitions.BuildingBlockType]
description: str
Expand Down Expand Up @@ -561,6 +561,7 @@ class QueryRuleData(BaseRuleData):
index: Optional[List[str]]
query: str
language: definitions.FilterLanguages
alert_suppression: Optional[AlertSuppressionMapping] = field(metadata=dict(metadata=dict(min_compat="8.8")))
terrancedejesus marked this conversation as resolved.
Show resolved Hide resolved

@cached_property
def validator(self) -> Optional[QueryValidator]:
Expand Down Expand Up @@ -592,6 +593,14 @@ def get_required_fields(self, index: str) -> List[dict]:
if validator is not None:
return validator.get_required_fields(index or [])

@validates_schema
def validate_exceptions(self, data, **kwargs):
"""Custom validation for query rule type and subclasses."""

# alert suppression is only valid for query rule type and not any of its subclasses
if data.get('alert_suppression') and data['type'] != 'query':
raise ValidationError("Alert suppression is only valid for query rule type.")


@dataclass(frozen=True)
class MachineLearningRuleData(BaseRuleData):
Expand Down
2 changes: 2 additions & 0 deletions detection_rules/schemas/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@

MACHINE_LEARNING_PACKAGES = ['LMD', 'DGA', 'DED', 'ProblemChild', 'Beaconing']

AlertSuppressionMissing = NewType('AlertSuppressionMissing', str,
validate=validate.OneOf(['suppress', 'doNotSuppress']))
NonEmptyStr = NewType('NonEmptyStr', str, validate=validate.Length(min=1))
TimeUnits = Literal['s', 'm', 'h']
BranchVer = NewType('BranchVer', str, validate=validate.Regexp(BRANCH_PATTERN))
Expand Down
40 changes: 5 additions & 35 deletions tests/test_all_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -1276,22 +1276,22 @@ def test_if_plugins_explicitly_defined(self):
class TestAlertSuppression(BaseRuleTest):
"""Test rule alert suppression."""

@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.6.0"),
@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.8.0"),
"Test only applicable to 8.6+ stacks for rule alert suppression feature.")
def test_group_length(self):
"""Test to ensure the rule alert suppression group_by does not exceed 3 elements."""
for rule in self.production_rules:
if rule.contents.data.alert_suppression:
if rule.contents.data.get('alert_suppression'):
group_length = len(rule.contents.data.alert_suppression.group_by)
if group_length > 3:
self.fail(f'{self.rule_str(rule)} has rule alert suppression with more than 3 elements.')

@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.6.0"),
"Test only applicable to 8.6+ stacks for rule alert suppression feature.")
@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.8.0"),
"Test only applicable to 8.8+ stacks for rule alert suppression feature.")
def test_group_field_in_schemas(self):
"""Test to ensure the fields are defined is in ECS/Beats/Integrations schema."""
for rule in self.production_rules:
if rule.contents.data.alert_suppression:
if rule.contents.data.get('alert_suppression'):
group_by_fields = rule.contents.data.alert_suppression.group_by
min_stack_version = rule.contents.metadata.get("min_stack_version")
if min_stack_version is None:
Expand All @@ -1316,33 +1316,3 @@ def test_group_field_in_schemas(self):
if fld not in schema.keys():
self.fail(f"{self.rule_str(rule)} alert suppression field {fld} not \
found in ECS, Beats, or non-ecs schemas")

@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.6.0"),
"Test only applicable to 8.6+ stacks for rule alert suppression feature.")
def test_stack_version(self):
"""Test to ensure the stack version is 8.6+"""
for rule in self.production_rules:
if rule.contents.data.alert_suppression:
per_time = rule.contents.data.alert_suppression.get("duration", None)
min_stack_version = rule.contents.metadata.get("min_stack_version")
if min_stack_version is None:
min_stack_version = Version.parse(load_current_package_version(), optional_minor_and_patch=True)
else:
min_stack_version = Version.parse(min_stack_version)
if not per_time and min_stack_version < Version.parse("8.6.0"):
self.fail(f'{self.rule_str(rule)} has rule alert suppression but \
min_stack is not 8.6+')
elif per_time and min_stack_version < Version.parse("8.7.0"):
self.fail(f'{self.rule_str(rule)} has rule alert suppression with \
per time but min_stack is not 8.7+')

@unittest.skipIf(PACKAGE_STACK_VERSION < Version.parse("8.6.0"),
"Test only applicable to 8.6+ stacks for rule alert suppression feature.")
def test_query_type(self):
"""Test to ensure the query type is KQL only."""
for rule in self.production_rules:
if rule.contents.data.alert_suppression:
rule_type = rule.contents.data.language
if rule_type != 'kuery':
self.fail(f'{self.rule_str(rule)} has rule alert suppression with \
but query language is not KQL')
Loading