Skip to content

Commit

Permalink
Merge pull request #5 from brokensound77/decoupled-exceptions-and-act…
Browse files Browse the repository at this point in the history
…ions

[FR] Add support to decouple actions and exceptions
  • Loading branch information
brokensound77 authored Mar 13, 2024
2 parents 89c3220 + f319f78 commit 1c327bd
Show file tree
Hide file tree
Showing 6 changed files with 417 additions and 0 deletions.
62 changes: 62 additions & 0 deletions detection_rules/action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

"""Dataclasses for Action."""
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional

from .mixins import MarshmallowDataclassMixin
from .schemas import definitions


@dataclass(frozen=True)
class ActionMeta(MarshmallowDataclassMixin):
"""Data stored in an exception's [metadata] section of TOML."""
creation_date: definitions.Date
rule_id: definitions.UUIDString
rule_name: str
updated_date: definitions.Date

# Optional fields
deprecation_date: Optional[definitions.Date]
comments: Optional[str]
maturity: Optional[definitions.Maturity]


@dataclass
class Action(MarshmallowDataclassMixin):
"""Data object for rule Action."""
@dataclass
class ActionParams:
body: str

action_type_id: str
group: str
params: ActionParams
id: Optional[str]
frequency: Optional[dict]


@dataclass(frozen=True)
class TOMLActionContents(MarshmallowDataclassMixin):
"""Object for action from TOML file."""
metadata: ActionMeta
actions: List[Action]


@dataclass(frozen=True)
class TOMLAction:
"""Object for action from TOML file."""
contents: TOMLActionContents
path: Path

@property
def name(self):
return self.contents.metadata.rule_name

@property
def id(self):
return self.contents.metadata.rule_id
9 changes: 9 additions & 0 deletions detection_rules/etc/_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ files:
packages: packages.yml
stack_schema_map: stack-schema-map.yaml
version_lock: version.lock.json
# directories:
# actions_dir: exceptions
# exceptions_dir: actions

# to set up a custom rules directory, copy this file to the root of the custom rules directory, which is set
# using the environment variable DETECTION_RULES_DIR
Expand All @@ -18,6 +21,12 @@ files:
# ├── packages.yml
# ├── stack-schema-map.yaml
# └── version.lock.json
# └── actions
## ├── action_1.toml
## ├── action_2.toml
# └── exceptions
## ├── exception_1.toml
## ├── exception_2.toml
#
# update custom-rules/_config.yaml with:
# deprecated_rules: etc/deprecated_rules.json
Expand Down
143 changes: 143 additions & 0 deletions detection_rules/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Rule exceptions data."""
from dataclasses import dataclass
from pathlib import Path
from typing import List, Literal, Optional, Union

from marshmallow import validates_schema, ValidationError

from .mixins import MarshmallowDataclassMixin
from .schemas import definitions


# https://www.elastic.co/guide/en/security/current/exceptions-api-overview.html

@dataclass(frozen=True)
class ExceptionMeta(MarshmallowDataclassMixin):
"""Data stored in an exception's [metadata] section of TOML."""
creation_date: definitions.Date
rule_id: definitions.UUIDString
rule_name: str
updated_date: definitions.Date

# Optional fields
deprecation_date: Optional[definitions.Date]
comments: Optional[str]
maturity: Optional[definitions.Maturity]


@dataclass(frozen=True)
class BaseExceptionItemEntry(MarshmallowDataclassMixin):
field: str
type: definitions.ExceptionEntryType


@dataclass(frozen=True)
class NestedExceptionItemEntry(BaseExceptionItemEntry, MarshmallowDataclassMixin):
entries: List['ExceptionItemEntry']

@validates_schema
def validate_nested_entry(self, data: dict, **kwargs):
if data.get('list') is not None:
raise ValidationError('Nested entries cannot define a list')


@dataclass(frozen=True)
class ExceptionItemEntry(BaseExceptionItemEntry, MarshmallowDataclassMixin):
@dataclass(frozen=True)
class ListObject:
id: definitions.UUIDString
type: definitions.EsDataTypes

list: Optional[ListObject]
operator: definitions.ExceptionEntryOperator
value: Optional[Union[str, List[str]]]

@validates_schema
def validate_entry(self, data: dict, **kwargs):
value = data.get('value', '')
if data['type'] in ('exists', 'list') and value is not None:
raise ValidationError(f'Entry of type {data["type"]} cannot have a value')
elif data['type'] in ('match', 'wildcard') and not isinstance(value, str):
raise ValidationError(f'Entry of type {data["type"]} must have a string value')
elif data['type'] == 'match_any' and not isinstance(value, list):
raise ValidationError(f'Entry of type {data["type"]} must have a list of strings as a value')


@dataclass(frozen=True)
class ExceptionItem(MarshmallowDataclassMixin):
@dataclass(frozen=True)
class Comment:
comment: str

comments: List[Optional[Comment]]
description: str
entries: List[Union[ExceptionItemEntry, NestedExceptionItemEntry]]
list_id: str
item_id: Optional[str] # api sets field when not provided
meta: Optional[dict]
name: str
namespace_type: Optional[definitions.ExceptionNamespaceType] # defaults to "single" if not provided
tags: Optional[List[str]]
type: Literal['simple']


@dataclass(frozen=True)
class EndpointException(ExceptionItem, MarshmallowDataclassMixin):
_tags: List[definitions.ExceptionItemEndpointTags]

@validates_schema
def validate_endpoint(self, data: dict, **kwargs):
for entry in data['entries']:
if entry['operator'] == "excluded":
raise ValidationError("Endpoint exceptions cannot have an `excluded` operator")


@dataclass(frozen=True)
class DetectionException(ExceptionItem, MarshmallowDataclassMixin):
expire_time: Optional[str] # fields.DateTime] # maybe this is isoformat?


@dataclass(frozen=True)
class ExceptionContainer(MarshmallowDataclassMixin):
description: str
list_id: Optional[str]
meta: Optional[dict]
name: str
namespace_type: Optional[definitions.ExceptionNamespaceType]
tags: Optional[List[str]]
type: definitions.ExceptionContainerType

def to_rule_entry(self) -> dict:
"""Returns a dict of the format required in rule.exception_list."""
# requires KSO id to be consider valid structure
return dict(namespace_type=self.namespace_type, type=self.type, list_id=self.list_id)


@dataclass(frozen=True)
class Data(MarshmallowDataclassMixin):
container: ExceptionContainer
items: List[DetectionException] # Union[DetectionException, EndpointException]]


@dataclass(frozen=True)
class TOMLExceptionContents(MarshmallowDataclassMixin):
metadata: ExceptionMeta
exceptions: List[Data]


@dataclass(frozen=True)
class TOMLException:
contents: TOMLExceptionContents
path: Optional[Path] = None

@property
def name(self):
return self.contents.metadata.rule_name

@property
def id(self):
return self.contents.metadata.rule_id
Loading

0 comments on commit 1c327bd

Please sign in to comment.