From 522c8624fe83d4a810af2d02bf3ecf6e10d0fa80 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Mon, 31 Jan 2022 14:11:15 +0100 Subject: [PATCH] Implement `DeepUpdate` on event and use it in `add_fields` processor (#30092) We have a couple of special cases when we set fields on the event based on target field keys: 1. `@timestamp` must be set directly to the `Timestamp` property of the event 2. `@metadata.*` must be put in the `Meta` map instead of `Fields` 3. The rest is put in `Fields` Currently the `Event` struct has the `PutValue` function to account for these special cases, however, some of the processors use `event.Fields.DeepUpdate` which would not handle the special cases. To make it easier to migrate those processors, the new function `event.DeepUpdate` (that can be safely used with the special cases) is introduced. As an example, the `add_fields` processor is migrated to using this new function and its tests have been updated to assert the correct behaviour. --- CHANGELOG.next.asciidoc | 1 + libbeat/beat/event.go | 117 +++++++++++-- libbeat/beat/event_test.go | 162 ++++++++++++++++++ libbeat/processors/actions/add_fields.go | 14 +- libbeat/processors/actions/add_fields_test.go | 52 +++--- libbeat/processors/actions/add_labels_test.go | 20 +-- libbeat/processors/actions/add_tags_test.go | 32 ++-- libbeat/processors/actions/common_test.go | 18 +- 8 files changed, 344 insertions(+), 72 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index eb30942af8d..0eab8d3a464 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -33,6 +33,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Enrich kubernetes metadata with node annotations. {pull}29605[29605] - Allign kubernetes configuration settings. {pull}29908[29908] - Remove legacy support for SSLv3. {pull}30071[30071] +- `add_fields` processor is now able to set metadata in events {pull}30092[30092] *Auditbeat* diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go index 4ef56042039..7fd016a14dc 100644 --- a/libbeat/beat/event.go +++ b/libbeat/beat/event.go @@ -28,6 +28,11 @@ import ( // FlagField fields used to keep information or errors when events are parsed. const FlagField = "log.flags" +const ( + timestampFieldKey = "@timestamp" + metadataFieldKey = "@metadata" +) + // Event is the common event format shared by all beats. // Every event must have a timestamp and provide encodable Fields in `Fields`. // The `Meta`-fields can be used to pass additional meta-data to the outputs. @@ -55,7 +60,7 @@ func (e *Event) SetID(id string) { } func (e *Event) GetValue(key string) (interface{}, error) { - if key == "@timestamp" { + if key == timestampFieldKey { return e.Timestamp, nil } else if subKey, ok := metadataKey(key); ok { if subKey == "" || e.Meta == nil { @@ -66,17 +71,103 @@ func (e *Event) GetValue(key string) (interface{}, error) { return e.Fields.GetValue(key) } -func (e *Event) PutValue(key string, v interface{}) (interface{}, error) { - if key == "@timestamp" { - switch ts := v.(type) { - case time.Time: - e.Timestamp = ts - case common.Time: - e.Timestamp = time.Time(ts) - default: - return nil, errNoTimestamp +// DeepUpdate recursively copies the key-value pairs from `d` to various properties of the event. +// When the key equals `@timestamp` it's set as the `Timestamp` property of the event. +// When the key equals `@metadata` the update is routed into the `Meta` map instead of `Fields` +// The rest of the keys are set to the `Fields` map. +// If the key is present and the value is a map as well, the sub-map will be updated recursively +// via `DeepUpdate`. +// `DeepUpdateNoOverwrite` is a version of this function that does not +// overwrite existing values. +func (e *Event) DeepUpdate(d common.MapStr) { + e.deepUpdate(d, true) +} + +// DeepUpdateNoOverwrite recursively copies the key-value pairs from `d` to various properties of the event. +// The `@timestamp` update is ignored due to "no overwrite" behavior. +// When the key equals `@metadata` the update is routed into the `Meta` map instead of `Fields`. +// The rest of the keys are set to the `Fields` map. +// If the key is present and the value is a map as well, the sub-map will be updated recursively +// via `DeepUpdateNoOverwrite`. +// `DeepUpdate` is a version of this function that overwrites existing values. +func (e *Event) DeepUpdateNoOverwrite(d common.MapStr) { + e.deepUpdate(d, false) +} + +func (e *Event) deepUpdate(d common.MapStr, overwrite bool) { + if len(d) == 0 { + return + } + fieldsUpdate := d.Clone() // so we can delete redundant keys + + var metaUpdate common.MapStr + + for fieldKey, value := range d { + switch fieldKey { + + // one of the updates is the timestamp which is not a part of the event fields + case timestampFieldKey: + if overwrite { + _ = e.setTimestamp(value) + } + delete(fieldsUpdate, fieldKey) + + // some updates are addressed for the metadata not the fields + case metadataFieldKey: + switch meta := value.(type) { + case common.MapStr: + metaUpdate = meta + case map[string]interface{}: + metaUpdate = common.MapStr(meta) + } + + delete(fieldsUpdate, fieldKey) } - return nil, nil + } + + if metaUpdate != nil { + if e.Meta == nil { + e.Meta = common.MapStr{} + } + if overwrite { + e.Meta.DeepUpdate(metaUpdate) + } else { + e.Meta.DeepUpdateNoOverwrite(metaUpdate) + } + } + + if len(fieldsUpdate) == 0 { + return + } + + if e.Fields == nil { + e.Fields = common.MapStr{} + } + + if overwrite { + e.Fields.DeepUpdate(fieldsUpdate) + } else { + e.Fields.DeepUpdateNoOverwrite(fieldsUpdate) + } +} + +func (e *Event) setTimestamp(v interface{}) error { + switch ts := v.(type) { + case time.Time: + e.Timestamp = ts + case common.Time: + e.Timestamp = time.Time(ts) + default: + return errNoTimestamp + } + + return nil +} + +func (e *Event) PutValue(key string, v interface{}) (interface{}, error) { + if key == timestampFieldKey { + err := e.setTimestamp(v) + return nil, err } else if subKey, ok := metadataKey(key); ok { if subKey == "" { switch meta := v.(type) { @@ -111,11 +202,11 @@ func (e *Event) Delete(key string) error { } func metadataKey(key string) (string, bool) { - if !strings.HasPrefix(key, "@metadata") { + if !strings.HasPrefix(key, metadataFieldKey) { return "", false } - subKey := key[len("@metadata"):] + subKey := key[len(metadataFieldKey):] if subKey == "" { return "", true } diff --git a/libbeat/beat/event_test.go b/libbeat/beat/event_test.go index 384ece4d1ae..2f4f226ecf9 100644 --- a/libbeat/beat/event_test.go +++ b/libbeat/beat/event_test.go @@ -48,6 +48,168 @@ func TestEventPutGetTimestamp(t *testing.T) { assert.Nil(t, evt.Fields["@timestamp"]) } +func TestDeepUpdate(t *testing.T) { + ts := time.Now() + + cases := []struct { + name string + event *Event + update common.MapStr + overwrite bool + expected *Event + }{ + { + name: "does nothing if no update", + event: &Event{}, + update: common.MapStr{}, + expected: &Event{}, + }, + { + name: "updates timestamp", + event: &Event{}, + update: common.MapStr{ + timestampFieldKey: ts, + }, + overwrite: true, + expected: &Event{ + Timestamp: ts, + }, + }, + { + name: "does not overwrite timestamp", + event: &Event{ + Timestamp: ts, + }, + update: common.MapStr{ + timestampFieldKey: time.Now().Add(time.Hour), + }, + overwrite: false, + expected: &Event{ + Timestamp: ts, + }, + }, + { + name: "initializes metadata if nil", + event: &Event{}, + update: common.MapStr{ + metadataFieldKey: common.MapStr{ + "first": "new", + "second": 42, + }, + }, + expected: &Event{ + Meta: common.MapStr{ + "first": "new", + "second": 42, + }, + }, + }, + { + name: "updates metadata but does not overwrite", + event: &Event{ + Meta: common.MapStr{ + "first": "initial", + }, + }, + update: common.MapStr{ + metadataFieldKey: common.MapStr{ + "first": "new", + "second": 42, + }, + }, + overwrite: false, + expected: &Event{ + Meta: common.MapStr{ + "first": "initial", + "second": 42, + }, + }, + }, + { + name: "updates metadata and overwrites", + event: &Event{ + Meta: common.MapStr{ + "first": "initial", + }, + }, + update: common.MapStr{ + metadataFieldKey: common.MapStr{ + "first": "new", + "second": 42, + }, + }, + overwrite: true, + expected: &Event{ + Meta: common.MapStr{ + "first": "new", + "second": 42, + }, + }, + }, + { + name: "updates fields but does not overwrite", + event: &Event{ + Fields: common.MapStr{ + "first": "initial", + }, + }, + update: common.MapStr{ + "first": "new", + "second": 42, + }, + overwrite: false, + expected: &Event{ + Fields: common.MapStr{ + "first": "initial", + "second": 42, + }, + }, + }, + { + name: "updates metadata and overwrites", + event: &Event{ + Fields: common.MapStr{ + "first": "initial", + }, + }, + update: common.MapStr{ + "first": "new", + "second": 42, + }, + overwrite: true, + expected: &Event{ + Fields: common.MapStr{ + "first": "new", + "second": 42, + }, + }, + }, + { + name: "initializes fields if nil", + event: &Event{}, + update: common.MapStr{ + "first": "new", + "second": 42, + }, + expected: &Event{ + Fields: common.MapStr{ + "first": "new", + "second": 42, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tc.event.deepUpdate(tc.update, tc.overwrite) + assert.Equal(t, tc.expected.Timestamp, tc.event.Timestamp) + assert.Equal(t, tc.expected.Fields, tc.event.Fields) + assert.Equal(t, tc.expected.Meta, tc.event.Meta) + }) + } +} + func TestEventMetadata(t *testing.T) { const id = "123" newMeta := func() common.MapStr { return common.MapStr{"_id": id} } diff --git a/libbeat/processors/actions/add_fields.go b/libbeat/processors/actions/add_fields.go index d2dd2d6e867..42fc2f2b443 100644 --- a/libbeat/processors/actions/add_fields.go +++ b/libbeat/processors/actions/add_fields.go @@ -72,17 +72,19 @@ func NewAddFields(fields common.MapStr, shared bool, overwrite bool) processors. } func (af *addFields) Run(event *beat.Event) (*beat.Event, error) { + if event == nil || len(af.fields) == 0 { + return event, nil + } + fields := af.fields - if af.shared || event.Fields == nil { + if af.shared { fields = fields.Clone() } - if event.Fields == nil { - event.Fields = fields - } else if af.overwrite { - event.Fields.DeepUpdate(fields) + if af.overwrite { + event.DeepUpdate(fields) } else { - event.Fields.DeepUpdateNoOverwrite(fields) + event.DeepUpdateNoOverwrite(fields) } return event, nil diff --git a/libbeat/processors/actions/add_fields_test.go b/libbeat/processors/actions/add_fields_test.go index 41a0e75d5c9..44bebb949b8 100644 --- a/libbeat/processors/actions/add_fields_test.go +++ b/libbeat/processors/actions/add_fields_test.go @@ -29,38 +29,48 @@ func TestAddFields(t *testing.T) { testProcessors(t, map[string]testCase{ "add field": { - event: common.MapStr{}, - want: common.MapStr{ + eventFields: common.MapStr{}, + wantFields: common.MapStr{ "fields": common.MapStr{"field": "test"}, }, cfg: single(`{add_fields: {fields: {field: test}}}`), }, "custom target": { - event: common.MapStr{}, - want: common.MapStr{ + eventFields: common.MapStr{}, + wantFields: common.MapStr{ "my": common.MapStr{"field": "test"}, }, cfg: single(`{add_fields: {target: my, fields: {field: test}}}`), }, "overwrite existing field": { - event: common.MapStr{ + eventFields: common.MapStr{ "fields": common.MapStr{"field": "old"}, }, - want: common.MapStr{"fields": common.MapStr{"field": "test"}}, - cfg: single(`{add_fields: {fields: {field: test}}}`), + wantFields: common.MapStr{"fields": common.MapStr{"field": "test"}}, + cfg: single(`{add_fields: {fields: {field: test}}}`), + }, + "merge with existing meta": { + eventMeta: common.MapStr{ + "_id": "unique", + }, + wantMeta: common.MapStr{ + "_id": "unique", + "op_type": "index", + }, + cfg: single(`{add_fields: {target: "@metadata", fields: {op_type: "index"}}}`), }, "merge with existing fields": { - event: common.MapStr{ + eventFields: common.MapStr{ "fields": common.MapStr{"existing": "a"}, }, - want: common.MapStr{ + wantFields: common.MapStr{ "fields": common.MapStr{"existing": "a", "field": "test"}, }, cfg: single(`{add_fields: {fields: {field: test}}}`), }, "combine 2 processors": { - event: common.MapStr{}, - want: common.MapStr{ + eventFields: common.MapStr{}, + wantFields: common.MapStr{ "fields": common.MapStr{ "l1": "a", "l2": "b", @@ -72,8 +82,8 @@ func TestAddFields(t *testing.T) { ), }, "different targets": { - event: common.MapStr{}, - want: common.MapStr{ + eventFields: common.MapStr{}, + wantFields: common.MapStr{ "a": common.MapStr{"l1": "a"}, "b": common.MapStr{"l2": "b"}, }, @@ -83,8 +93,8 @@ func TestAddFields(t *testing.T) { ), }, "under root": { - event: common.MapStr{}, - want: common.MapStr{ + eventFields: common.MapStr{}, + wantFields: common.MapStr{ "a": common.MapStr{"b": "test"}, }, cfg: single( @@ -92,10 +102,10 @@ func TestAddFields(t *testing.T) { ), }, "merge under root": { - event: common.MapStr{ + eventFields: common.MapStr{ "a": common.MapStr{"old": "value"}, }, - want: common.MapStr{ + wantFields: common.MapStr{ "a": common.MapStr{"old": "value", "new": "test"}, }, cfg: single( @@ -103,10 +113,10 @@ func TestAddFields(t *testing.T) { ), }, "overwrite existing under root": { - event: common.MapStr{ + eventFields: common.MapStr{ "a": common.MapStr{"keep": "value", "change": "a"}, }, - want: common.MapStr{ + wantFields: common.MapStr{ "a": common.MapStr{"keep": "value", "change": "b"}, }, cfg: single( @@ -114,8 +124,8 @@ func TestAddFields(t *testing.T) { ), }, "add fields to nil event": { - event: nil, - want: common.MapStr{ + eventFields: nil, + wantFields: common.MapStr{ "fields": common.MapStr{"field": "test"}, }, cfg: single(`{add_fields: {fields: {field: test}}}`), diff --git a/libbeat/processors/actions/add_labels_test.go b/libbeat/processors/actions/add_labels_test.go index ca9dd22a528..2b7b30aab15 100644 --- a/libbeat/processors/actions/add_labels_test.go +++ b/libbeat/processors/actions/add_labels_test.go @@ -29,29 +29,29 @@ func TestAddLabels(t *testing.T) { testProcessors(t, map[string]testCase{ "add label": { - event: common.MapStr{}, - want: common.MapStr{ + eventFields: common.MapStr{}, + wantFields: common.MapStr{ "labels": common.MapStr{"label": "test"}, }, cfg: single(`{add_labels: {labels: {label: test}}}`), }, "add dotted label": { - event: common.MapStr{}, - want: common.MapStr{ + eventFields: common.MapStr{}, + wantFields: common.MapStr{ "labels": common.MapStr{"a.b": "test"}, }, cfg: single(`{add_labels: {labels: {a.b: test}}}`), }, "add nested labels": { - event: common.MapStr{}, - want: common.MapStr{ + eventFields: common.MapStr{}, + wantFields: common.MapStr{ "labels": common.MapStr{"a.b": "test", "a.c": "test2"}, }, cfg: single(`{add_labels: {labels: {a: {b: test, c: test2}}}}`), }, "merge labels": { - event: common.MapStr{}, - want: common.MapStr{ + eventFields: common.MapStr{}, + wantFields: common.MapStr{ "labels": common.MapStr{"l1": "a", "l2": "b", "lc": "b"}, }, cfg: multi( @@ -60,8 +60,8 @@ func TestAddLabels(t *testing.T) { ), }, "add array": { - event: common.MapStr{}, - want: common.MapStr{ + eventFields: common.MapStr{}, + wantFields: common.MapStr{ "labels": common.MapStr{ "array.0": "foo", "array.1": "bar", diff --git a/libbeat/processors/actions/add_tags_test.go b/libbeat/processors/actions/add_tags_test.go index 9ec78b927b1..920fde65306 100644 --- a/libbeat/processors/actions/add_tags_test.go +++ b/libbeat/processors/actions/add_tags_test.go @@ -29,40 +29,40 @@ func TestAddTags(t *testing.T) { testProcessors(t, map[string]testCase{ "create tags": { - event: common.MapStr{}, - want: common.MapStr{"tags": []string{"t1", "t2"}}, - cfg: single(`{add_tags: {tags: [t1, t2]}}`), + eventFields: common.MapStr{}, + wantFields: common.MapStr{"tags": []string{"t1", "t2"}}, + cfg: single(`{add_tags: {tags: [t1, t2]}}`), }, "append to tags": { - event: common.MapStr{"tags": []string{"t1"}}, - want: common.MapStr{"tags": []string{"t1", "t2", "t3"}}, - cfg: single(`{add_tags: {tags: [t2, t3]}}`), + eventFields: common.MapStr{"tags": []string{"t1"}}, + wantFields: common.MapStr{"tags": []string{"t1", "t2", "t3"}}, + cfg: single(`{add_tags: {tags: [t2, t3]}}`), }, "combine from 2 processors": { - event: common.MapStr{}, - want: common.MapStr{"tags": []string{"t1", "t2", "t3", "t4"}}, + eventFields: common.MapStr{}, + wantFields: common.MapStr{"tags": []string{"t1", "t2", "t3", "t4"}}, cfg: multi( `{add_tags: {tags: [t1, t2]}}`, `{add_tags: {tags: [t3, t4]}}`, ), }, "with custom target": { - event: common.MapStr{}, - want: common.MapStr{"custom": []string{"t1", "t2"}}, - cfg: single(`{add_tags: {tags: [t1, t2], target: custom}}`), + eventFields: common.MapStr{}, + wantFields: common.MapStr{"custom": []string{"t1", "t2"}}, + cfg: single(`{add_tags: {tags: [t1, t2], target: custom}}`), }, "different targets": { - event: common.MapStr{}, - want: common.MapStr{"tags1": []string{"t1"}, "tags2": []string{"t2"}}, + eventFields: common.MapStr{}, + wantFields: common.MapStr{"tags1": []string{"t1"}, "tags2": []string{"t2"}}, cfg: multi( `{add_tags: {target: tags1, tags: [t1]}}`, `{add_tags: {target: tags2, tags: [t2]}}`, ), }, "single tag config without array notation": { - event: common.MapStr{}, - want: common.MapStr{"tags": []string{"t1"}}, - cfg: single(`{add_tags: {tags: t1}}`), + eventFields: common.MapStr{}, + wantFields: common.MapStr{"tags": []string{"t1"}}, + cfg: single(`{add_tags: {tags: t1}}`), }, }) } diff --git a/libbeat/processors/actions/common_test.go b/libbeat/processors/actions/common_test.go index f733c90450a..b4d30bfa9cd 100644 --- a/libbeat/processors/actions/common_test.go +++ b/libbeat/processors/actions/common_test.go @@ -28,9 +28,11 @@ import ( ) type testCase struct { - event common.MapStr - want common.MapStr - cfg []string + eventFields common.MapStr + eventMeta common.MapStr + wantFields common.MapStr + wantMeta common.MapStr + cfg []string } func testProcessors(t *testing.T, cases map[string]testCase) { @@ -51,8 +53,11 @@ func testProcessors(t *testing.T, cases map[string]testCase) { } current := &beat.Event{} - if test.event != nil { - current.Fields = test.event.Clone() + if test.eventFields != nil { + current.Fields = test.eventFields.Clone() + } + if test.eventMeta != nil { + current.Meta = test.eventMeta.Clone() } for i, processor := range ps { var err error @@ -65,7 +70,8 @@ func testProcessors(t *testing.T, cases map[string]testCase) { } } - assert.Equal(t, test.want, current.Fields) + assert.Equal(t, test.wantFields, current.Fields) + assert.Equal(t, test.wantMeta, current.Meta) }) } }