Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support dict merging in fieldmanager and add_fields_to method #737

Merged
merged 10 commits into from
Jan 3, 2025
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
the list is now fixed inside the packaged logprep
* remove SQL feature from `generic_adder`, fields can only be added from rule config or from file
* use a single rule tree instead of a generic and a specific rule tree
* replace the `extend_target_list` parameter with `merge_with_target` for improved naming clarity
and functionality across `FieldManager` based processors (e.g., `FieldManager`, `Clusterer`,
`GenericAdder`).

### Features

Expand All @@ -34,6 +37,7 @@ the list is now fixed inside the packaged logprep
* fix wrong documentation for `timestamp_differ`
* add container signatures to images build in ci pipeline
* add sbom to images build in ci pipeline
* `FieldManager` supports merging dictionaries

### Bugfix

Expand Down
4 changes: 2 additions & 2 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ def _write_target_field(self, event: dict, rule: "Rule", result: any) -> None:
add_fields_to(
event,
fields={rule.target_field: result},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
merge_with_target=rule.merge_with_target,
overwrite_target=rule.overwrite_target,
)

def setup(self):
Expand Down
2 changes: 1 addition & 1 deletion logprep/processor/calculator/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class Config(FieldManagerRule.Config):
"""The calculation expression. Fields from the event can be used by
surrounding them with :code:`${` and :code:`}`."""
source_fields: list = field(factory=list, init=False, repr=False, eq=False)
extend_target_list: bool = field(validator=validators.instance_of(bool), default=False)
merge_with_target: bool = field(validator=validators.instance_of(bool), default=False)
"""If the target field exists and is a list, the list will be extended with the values
of the source fields.
"""
Expand Down
4 changes: 2 additions & 2 deletions logprep/processor/clusterer/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ def _cluster(self, event: dict, rule: ClustererRule):
add_fields_to(
event,
fields={self._config.output_field_name: cluster_signature},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
merge_with_target=rule.merge_with_target,
overwrite_target=rule.overwrite_target,
)
self._last_non_extracted_signature = sig_text

Expand Down
4 changes: 1 addition & 3 deletions logprep/processor/dissector/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ def _apply_convert_datatype(self, event, rule):
for target_field, converter in rule.convert_actions:
try:
target_value = converter(get_dotted_field_value(event, target_field))
add_fields_to(
event, {target_field: target_value}, rule, overwrite_target_field=True
)
add_fields_to(event, {target_field: target_value}, rule, overwrite_target=True)
except ValueError as error:
self._handle_warning_error(event, rule, error)
2 changes: 1 addition & 1 deletion logprep/processor/domain_label_extractor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def _apply_rules(self, event, rule: DomainLabelExtractorRule):
f"{rule.target_field}.top_level_domain": labels.suffix,
f"{rule.target_field}.subdomain": labels.subdomain,
}
add_fields_to(event, fields, rule, overwrite_target_field=rule.overwrite_target)
add_fields_to(event, fields, rule, overwrite_target=rule.overwrite_target)
else:
tagging_field.append(f"invalid_domain_in_{rule.source_fields[0].replace('.', '_')}")
add_and_overwrite(
Expand Down
2 changes: 1 addition & 1 deletion logprep/processor/domain_resolver/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,4 @@ def _store_debug_infos(self, event, requires_storing):
"cache_size": len(self._domain_ip_map.keys()),
}
}
add_fields_to(event, event_dbg, overwrite_target_field=True)
add_fields_to(event, event_dbg, overwrite_target=True)
88 changes: 20 additions & 68 deletions logprep/processor/field_manager/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@
.. automodule:: logprep.processor.field_manager.rule
"""

from collections import namedtuple

from logprep.abc.processor import Processor
from logprep.processor.field_manager.rule import FieldManagerRule
from logprep.util.helper import (
add_and_overwrite,
add_fields_to,
get_dotted_field_value,
pop_dotted_field_value,
Expand All @@ -49,7 +46,7 @@ def _apply_rules(self, event, rule):
rule.source_fields,
rule.target_field,
rule.mapping,
rule.extend_target_list,
rule.merge_with_target,
rule.overwrite_target,
)
if rule.mapping:
Expand All @@ -58,17 +55,17 @@ def _apply_rules(self, event, rule):
self._apply_single_target_processing(event, rule, rule_args)

def _apply_single_target_processing(self, event, rule, rule_args):
source_fields, target_field, _, extend_target_list, overwrite_target = rule_args
source_fields, target_field, _, merge_with_target, overwrite_target = rule_args
source_field_values = self._get_field_values(event, rule.source_fields)
self._handle_missing_fields(event, rule, source_fields, source_field_values)
source_field_values = list(filter(lambda x: x is not None, source_field_values))
if not source_field_values:
return
args = (event, target_field, source_field_values)
self._write_to_single_target(args, extend_target_list, overwrite_target, rule)
self._write_to_single_target(args, merge_with_target, overwrite_target, rule)

def _apply_mapping(self, event, rule, rule_args):
source_fields, _, mapping, extend_target_list, overwrite_target = rule_args
source_fields, _, mapping, merge_with_target, overwrite_target = rule_args
source_fields, targets = list(zip(*mapping.items()))
source_field_values = self._get_field_values(event, mapping.keys())
self._handle_missing_fields(event, rule, source_fields, source_field_values)
Expand All @@ -79,75 +76,30 @@ def _apply_mapping(self, event, rule, rule_args):
event,
dict(zip(targets, source_field_values)),
rule,
extend_target_list,
merge_with_target,
overwrite_target,
)
if rule.delete_source_fields:
for dotted_field in source_fields:
pop_dotted_field_value(event, dotted_field)

def _write_to_single_target(self, args, extend_target_list, overwrite_target, rule):
def _write_to_single_target(self, args, merge_with_target, overwrite_target, rule):
event, target_field, source_fields_values = args
target_field_value = get_dotted_field_value(event, target_field)
State = namedtuple(
"State",
["overwrite", "extend", "single_source_element", "target_is_list", "target_is_none"],
)
state = State(
overwrite=overwrite_target,
extend=extend_target_list,
single_source_element=len(source_fields_values) == 1,
target_is_list=isinstance(target_field_value, list),
target_is_none=target_field_value is None,
)
if state.single_source_element and not state.extend:
if len(source_fields_values) == 1 and not merge_with_target:
source_fields_values = source_fields_values.pop()

match state:
case State(
extend=True, overwrite=True, single_source_element=False, target_is_list=False
):
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case State(
extend=True,
overwrite=False,
single_source_element=False,
target_is_list=False,
target_is_none=True,
):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*flattened_source_fields]
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case State(extend=True, overwrite=False, target_is_list=False, target_is_none=True):
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case State(extend=True, overwrite=False, target_is_list=False):
source_fields_values = [target_field_value, *source_fields_values]
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case State(
extend=True, overwrite=False, single_source_element=False, target_is_list=True
):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*target_field_value, *flattened_source_fields]
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case State(overwrite=True, extend=True):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*flattened_source_fields]
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case _:
field = {target_field: source_fields_values}
add_fields_to(event, field, rule, state.extend, state.overwrite)
if not merge_with_target:
field = {target_field: source_fields_values}
add_fields_to(event, field, rule, merge_with_target, overwrite_target)
return
new_values = self._overwrite_from_source_values(source_fields_values)
if all(isinstance(element, dict) for element in new_values):
# merge a list of dicts into one dict
new_values = {key: value for dict_ in new_values for key, value in dict_.items()}
if overwrite_target:
# source values are already included in new_values, add_fields_to also does not
# allow overwrite_target=True and marge_with_target=True at the same time
merge_with_target = False
add_fields_to(event, {target_field: new_values}, rule, merge_with_target, overwrite_target)

def _overwrite_from_source_values(self, source_fields_values):
duplicates = []
Expand Down
15 changes: 9 additions & 6 deletions logprep/processor/field_manager/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
- server.nat.ip
- client.nat.ip
target_field: related.ip
extend_target_list: True
merge_with_target: True
description: '...'

.. code-block:: json
Expand Down Expand Up @@ -120,11 +120,14 @@ class Config(Rule.Config):
"""Whether to delete all the source fields or not. Defaults to :code:`False`"""
overwrite_target: bool = field(validator=validators.instance_of(bool), default=False)
"""Overwrite the target field value if exists. Defaults to :code:`False`"""
extend_target_list: bool = field(validator=validators.instance_of(bool), default=False)
merge_with_target: bool = field(validator=validators.instance_of(bool), default=False)
"""If the target field exists and is a list, the list will be extended with the values
of the source fields. If the source field is a list, the lists will be merged.
of the source fields. If the source field is a list, the lists will be merged by appending
the source fields list to the target list. If the source field is a dict, the dict will be
merged with the target dict. If the source keys exist in the target dict, the values will be
overwritten. So this is not e deep merge.
If the target field does not exist, a new field will be added with the
source field value as list. Defaults to :code:`False`.
source field value as list or dict. Defaults to :code:`False`.
"""
ignore_missing_fields: bool = field(validator=validators.instance_of(bool), default=False)
"""If set to :code:`True` missing fields will be ignored, no warning is logged and the event
Expand Down Expand Up @@ -162,8 +165,8 @@ def overwrite_target(self):
return self._config.overwrite_target

@property
def extend_target_list(self):
return self._config.extend_target_list
def merge_with_target(self):
return self._config.merge_with_target

@property
def ignore_missing_fields(self):
Expand Down
2 changes: 1 addition & 1 deletion logprep/processor/generic_adder/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ class GenericAdder(Processor):
def _apply_rules(self, event: dict, rule: GenericAdderRule):
items_to_add = rule.add
if items_to_add:
add_fields_to(event, items_to_add, rule, rule.extend_target_list, rule.overwrite_target)
add_fields_to(event, items_to_add, rule, rule.merge_with_target, rule.overwrite_target)
2 changes: 1 addition & 1 deletion logprep/processor/generic_adder/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class GenericAdderRule(FieldManagerRule):
class Config(FieldManagerRule.Config):
"""Config for GenericAdderRule"""

extend_target_list: bool = field(validator=validators.instance_of(bool), default=False)
merge_with_target: bool = field(validator=validators.instance_of(bool), default=False)
"""If the target field exists and is a list, the list will be extended with the values
of the source fields.
"""
Expand Down
10 changes: 5 additions & 5 deletions logprep/processor/generic_resolver/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
"""

from functools import cached_property, lru_cache

from typing import Optional

from attrs import define, field, validators

from logprep.abc.processor import Processor
Expand All @@ -52,7 +52,7 @@ class Config(Processor.Config):
validator=validators.optional(validators.instance_of(int)), default=1
)
"""(Optional) Cache metrics won't be updated immediately.
Instead updating is skipped for a number of events before it's next update.
Instead updating is skipped for a number of events before it's next update.
:code:`cache_metrics_interval` sets the number of events between updates (default: 1)."""

@define(kw_only=True)
Expand Down Expand Up @@ -130,15 +130,15 @@ def _apply_rules(self, event, rule):
current_content = get_dotted_field_value(event, target_field)
if isinstance(current_content, list) and content in current_content:
continue
if rule.extend_target_list and current_content is None:
if rule.merge_with_target and current_content is None:
content = [content]
try:
add_fields_to(
event,
fields={target_field: content},
rule=rule,
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
merge_with_target=rule.merge_with_target,
overwrite_target=rule.overwrite_target,
)
except FieldExistsWarning as error:
conflicting_fields.extend(error.skipped_fields)
Expand Down
4 changes: 2 additions & 2 deletions logprep/processor/geoip_enricher/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,6 @@ def _apply_rules(self, event, rule):
event,
fields,
rule=rule,
extends_lists=False,
overwrite_target_field=rule.overwrite_target,
merge_with_target=False,
overwrite_target=rule.overwrite_target,
)
4 changes: 2 additions & 2 deletions logprep/processor/grokker/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ def _apply_rules(self, event: dict, rule: GrokkerRule):
event,
result,
rule=rule,
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
merge_with_target=rule.merge_with_target,
overwrite_target=rule.overwrite_target,
)
if self._handle_missing_fields(event, rule, rule.actions.keys(), source_values):
return
Expand Down
6 changes: 3 additions & 3 deletions logprep/processor/hyperscan_resolver/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@ def _apply_rules(self, event: dict, rule: HyperscanResolverRule):
current_content = get_dotted_field_value(event, resolve_target)
if isinstance(current_content, list) and dest_val in current_content:
continue
if rule.extend_target_list and current_content is None:
if rule.merge_with_target and current_content is None:
dest_val = [dest_val]
try:
add_fields_to(
event,
fields={resolve_target: dest_val},
rule=rule,
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
merge_with_target=rule.merge_with_target,
overwrite_target=rule.overwrite_target,
)
except FieldExistsWarning as error:
conflicting_fields.extend(error.skipped_fields)
Expand Down
4 changes: 2 additions & 2 deletions logprep/processor/labeler/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ def setup(self):
def _apply_rules(self, event, rule):
"""Applies the rule to the current event"""
fields = {key: value for key, value in rule.prefixed_label.items()}
add_fields_to(event, fields, rule=rule, extends_lists=True)
add_fields_to(event, fields, rule=rule, merge_with_target=True)
# convert sets into sorted lists
fields = {
key: sorted(set(get_dotted_field_value(event, key)))
for key, _ in rule.prefixed_label.items()
}
add_fields_to(event, fields, rule=rule, overwrite_target_field=True)
add_fields_to(event, fields, rule=rule, overwrite_target=True)
2 changes: 1 addition & 1 deletion logprep/processor/labeler/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def label(self) -> dict:

@property
def prefixed_label(self) -> dict:
return {f"label.{key}": value for key, value in self.label.items()}
return {f"label.{key}": list(value) for key, value in self.label.items()}

def conforms_to_schema(self, schema: LabelingSchema) -> bool:
"""Check if labels are valid."""
Expand Down
2 changes: 1 addition & 1 deletion logprep/processor/list_comparison/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _apply_rules(self, event, rule):
comparison_result, comparison_key = self._list_comparison(rule, event)
if comparison_result is not None:
fields = {f"{rule.target_field}.{comparison_key}": comparison_result}
add_fields_to(event, fields, rule=rule, extends_lists=True)
add_fields_to(event, fields, rule=rule, merge_with_target=True)

def _list_comparison(self, rule: ListComparisonRule, event: dict):
"""
Expand Down
2 changes: 1 addition & 1 deletion logprep/processor/pseudonymizer/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def _apply_rules(self, event: dict, rule: PseudonymizerRule):
else:
field_value = self._pseudonymize_field(rule, dotted_field, regex, field_value)
add_fields_to(
event, fields={dotted_field: field_value}, rule=rule, overwrite_target_field=True
event, fields={dotted_field: field_value}, rule=rule, overwrite_target=True
)
if "@timestamp" in event:
for pseudonym, _ in self.result.data:
Expand Down
6 changes: 3 additions & 3 deletions logprep/processor/requester/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ def _handle_response(self, event, rule, response):
event,
fields={rule.target_field: self._get_result(response)},
rule=rule,
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
merge_with_target=rule.merge_with_target,
overwrite_target=rule.overwrite_target,
)
except FieldExistsWarning as error:
conflicting_fields.extend(error.skipped_fields)
Expand All @@ -86,7 +86,7 @@ def _handle_response(self, event, rule, response):
event,
dict(zip(targets, contents)),
rule,
rule.extend_target_list,
rule.merge_with_target,
rule.overwrite_target,
)
except FieldExistsWarning as error:
Expand Down
Loading
Loading