Skip to content

Commit

Permalink
Implement DeepUpdate on event and use it in add_fields processor (#…
Browse files Browse the repository at this point in the history
…30092)

We have a couple of special cases when we set fields on the event
based on target field keys:

1. `@timestamp` must be set directly to the `Timestamp` property of
the event
2. `@metadata.*` must be put in the `Meta` map instead of `Fields`
3. The rest is put in `Fields`

Currently the `Event` struct has the `PutValue` function to account for
these special cases, however, some of the processors use
`event.Fields.DeepUpdate` which would not handle the special cases.

To make it easier to migrate those processors, the new function
`event.DeepUpdate` (that can be safely used with the
special cases) is introduced.

As an example, the `add_fields` processor is migrated to using this
new function and its tests have been updated to assert the correct behaviour.
  • Loading branch information
rdner authored Jan 31, 2022
1 parent 16fded1 commit 522c862
Show file tree
Hide file tree
Showing 8 changed files with 344 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Enrich kubernetes metadata with node annotations. {pull}29605[29605]
- Allign kubernetes configuration settings. {pull}29908[29908]
- Remove legacy support for SSLv3. {pull}30071[30071]
- `add_fields` processor is now able to set metadata in events {pull}30092[30092]

*Auditbeat*

Expand Down
117 changes: 104 additions & 13 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import (
// FlagField fields used to keep information or errors when events are parsed.
const FlagField = "log.flags"

const (
timestampFieldKey = "@timestamp"
metadataFieldKey = "@metadata"
)

// Event is the common event format shared by all beats.
// Every event must have a timestamp and provide encodable Fields in `Fields`.
// The `Meta`-fields can be used to pass additional meta-data to the outputs.
Expand Down Expand Up @@ -55,7 +60,7 @@ func (e *Event) SetID(id string) {
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == "@timestamp" {
if key == timestampFieldKey {
return e.Timestamp, nil
} else if subKey, ok := metadataKey(key); ok {
if subKey == "" || e.Meta == nil {
Expand All @@ -66,17 +71,103 @@ func (e *Event) GetValue(key string) (interface{}, error) {
return e.Fields.GetValue(key)
}

func (e *Event) PutValue(key string, v interface{}) (interface{}, error) {
if key == "@timestamp" {
switch ts := v.(type) {
case time.Time:
e.Timestamp = ts
case common.Time:
e.Timestamp = time.Time(ts)
default:
return nil, errNoTimestamp
// DeepUpdate recursively copies the key-value pairs from `d` to various properties of the event.
// When the key equals `@timestamp` it's set as the `Timestamp` property of the event.
// When the key equals `@metadata` the update is routed into the `Meta` map instead of `Fields`
// The rest of the keys are set to the `Fields` map.
// If the key is present and the value is a map as well, the sub-map will be updated recursively
// via `DeepUpdate`.
// `DeepUpdateNoOverwrite` is a version of this function that does not
// overwrite existing values.
func (e *Event) DeepUpdate(d common.MapStr) {
e.deepUpdate(d, true)
}

// DeepUpdateNoOverwrite recursively copies the key-value pairs from `d` to various properties of the event.
// The `@timestamp` update is ignored due to "no overwrite" behavior.
// When the key equals `@metadata` the update is routed into the `Meta` map instead of `Fields`.
// The rest of the keys are set to the `Fields` map.
// If the key is present and the value is a map as well, the sub-map will be updated recursively
// via `DeepUpdateNoOverwrite`.
// `DeepUpdate` is a version of this function that overwrites existing values.
func (e *Event) DeepUpdateNoOverwrite(d common.MapStr) {
e.deepUpdate(d, false)
}

func (e *Event) deepUpdate(d common.MapStr, overwrite bool) {
if len(d) == 0 {
return
}
fieldsUpdate := d.Clone() // so we can delete redundant keys

var metaUpdate common.MapStr

for fieldKey, value := range d {
switch fieldKey {

// one of the updates is the timestamp which is not a part of the event fields
case timestampFieldKey:
if overwrite {
_ = e.setTimestamp(value)
}
delete(fieldsUpdate, fieldKey)

// some updates are addressed for the metadata not the fields
case metadataFieldKey:
switch meta := value.(type) {
case common.MapStr:
metaUpdate = meta
case map[string]interface{}:
metaUpdate = common.MapStr(meta)
}

delete(fieldsUpdate, fieldKey)
}
return nil, nil
}

if metaUpdate != nil {
if e.Meta == nil {
e.Meta = common.MapStr{}
}
if overwrite {
e.Meta.DeepUpdate(metaUpdate)
} else {
e.Meta.DeepUpdateNoOverwrite(metaUpdate)
}
}

if len(fieldsUpdate) == 0 {
return
}

if e.Fields == nil {
e.Fields = common.MapStr{}
}

if overwrite {
e.Fields.DeepUpdate(fieldsUpdate)
} else {
e.Fields.DeepUpdateNoOverwrite(fieldsUpdate)
}
}

func (e *Event) setTimestamp(v interface{}) error {
switch ts := v.(type) {
case time.Time:
e.Timestamp = ts
case common.Time:
e.Timestamp = time.Time(ts)
default:
return errNoTimestamp
}

return nil
}

func (e *Event) PutValue(key string, v interface{}) (interface{}, error) {
if key == timestampFieldKey {
err := e.setTimestamp(v)
return nil, err
} else if subKey, ok := metadataKey(key); ok {
if subKey == "" {
switch meta := v.(type) {
Expand Down Expand Up @@ -111,11 +202,11 @@ func (e *Event) Delete(key string) error {
}

func metadataKey(key string) (string, bool) {
if !strings.HasPrefix(key, "@metadata") {
if !strings.HasPrefix(key, metadataFieldKey) {
return "", false
}

subKey := key[len("@metadata"):]
subKey := key[len(metadataFieldKey):]
if subKey == "" {
return "", true
}
Expand Down
162 changes: 162 additions & 0 deletions libbeat/beat/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,168 @@ func TestEventPutGetTimestamp(t *testing.T) {
assert.Nil(t, evt.Fields["@timestamp"])
}

func TestDeepUpdate(t *testing.T) {
ts := time.Now()

cases := []struct {
name string
event *Event
update common.MapStr
overwrite bool
expected *Event
}{
{
name: "does nothing if no update",
event: &Event{},
update: common.MapStr{},
expected: &Event{},
},
{
name: "updates timestamp",
event: &Event{},
update: common.MapStr{
timestampFieldKey: ts,
},
overwrite: true,
expected: &Event{
Timestamp: ts,
},
},
{
name: "does not overwrite timestamp",
event: &Event{
Timestamp: ts,
},
update: common.MapStr{
timestampFieldKey: time.Now().Add(time.Hour),
},
overwrite: false,
expected: &Event{
Timestamp: ts,
},
},
{
name: "initializes metadata if nil",
event: &Event{},
update: common.MapStr{
metadataFieldKey: common.MapStr{
"first": "new",
"second": 42,
},
},
expected: &Event{
Meta: common.MapStr{
"first": "new",
"second": 42,
},
},
},
{
name: "updates metadata but does not overwrite",
event: &Event{
Meta: common.MapStr{
"first": "initial",
},
},
update: common.MapStr{
metadataFieldKey: common.MapStr{
"first": "new",
"second": 42,
},
},
overwrite: false,
expected: &Event{
Meta: common.MapStr{
"first": "initial",
"second": 42,
},
},
},
{
name: "updates metadata and overwrites",
event: &Event{
Meta: common.MapStr{
"first": "initial",
},
},
update: common.MapStr{
metadataFieldKey: common.MapStr{
"first": "new",
"second": 42,
},
},
overwrite: true,
expected: &Event{
Meta: common.MapStr{
"first": "new",
"second": 42,
},
},
},
{
name: "updates fields but does not overwrite",
event: &Event{
Fields: common.MapStr{
"first": "initial",
},
},
update: common.MapStr{
"first": "new",
"second": 42,
},
overwrite: false,
expected: &Event{
Fields: common.MapStr{
"first": "initial",
"second": 42,
},
},
},
{
name: "updates metadata and overwrites",
event: &Event{
Fields: common.MapStr{
"first": "initial",
},
},
update: common.MapStr{
"first": "new",
"second": 42,
},
overwrite: true,
expected: &Event{
Fields: common.MapStr{
"first": "new",
"second": 42,
},
},
},
{
name: "initializes fields if nil",
event: &Event{},
update: common.MapStr{
"first": "new",
"second": 42,
},
expected: &Event{
Fields: common.MapStr{
"first": "new",
"second": 42,
},
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
tc.event.deepUpdate(tc.update, tc.overwrite)
assert.Equal(t, tc.expected.Timestamp, tc.event.Timestamp)
assert.Equal(t, tc.expected.Fields, tc.event.Fields)
assert.Equal(t, tc.expected.Meta, tc.event.Meta)
})
}
}

func TestEventMetadata(t *testing.T) {
const id = "123"
newMeta := func() common.MapStr { return common.MapStr{"_id": id} }
Expand Down
14 changes: 8 additions & 6 deletions libbeat/processors/actions/add_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,19 @@ func NewAddFields(fields common.MapStr, shared bool, overwrite bool) processors.
}

func (af *addFields) Run(event *beat.Event) (*beat.Event, error) {
if event == nil || len(af.fields) == 0 {
return event, nil
}

fields := af.fields
if af.shared || event.Fields == nil {
if af.shared {
fields = fields.Clone()
}

if event.Fields == nil {
event.Fields = fields
} else if af.overwrite {
event.Fields.DeepUpdate(fields)
if af.overwrite {
event.DeepUpdate(fields)
} else {
event.Fields.DeepUpdateNoOverwrite(fields)
event.DeepUpdateNoOverwrite(fields)
}

return event, nil
Expand Down
Loading

0 comments on commit 522c862

Please sign in to comment.