Skip to content

Commit

Permalink
[7.x] Deep merge event fields and metadata maps (#17958) (#18230)
Browse files Browse the repository at this point in the history
* Deep merge event fields and metadata maps (#17958)

* Deep merge event fields and metadata maps

* Add CHANGELOG entry

* Use loop to remove keys; extract into function

* Relocating CHANGELOG entry

* Rewording and moving to bugfix section per review feedback

* Adapting other CHANGELOG entry

* Fix comparison in test

* Fixing up CHANGELOG

* Fixing up CHANGELOG
  • Loading branch information
ycombinator authored May 5, 2020
1 parent 87fe454 commit 6524eaa
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix goroutine leak and Elasticsearch output file descriptor leak when output reloading is in use. {issue}10491[10491] {pull}17381[17381]
- Fix Elasticsearch license endpoint URL referenced in error message. {issue}17880[17880] {pull}18030[18030]
- Fix panic when assigning a key to a `nil` value in an event. {pull}18143[18143]
- Change `decode_json_fields` processor, to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958]

*Auditbeat*

Expand Down Expand Up @@ -274,6 +275,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Set `agent.name` to the hostname by default. {issue}16377[16377] {pull}18000[18000]
- Add keystore support for autodiscover static configurations. {pull]16306[16306]
- Add config example of how to skip the `add_host_metadata` processor when forwarding logs. {issue}13920[13920] {pull}18153[18153]
- When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958]

*Auditbeat*

Expand Down Expand Up @@ -367,6 +369,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added an input option `publisher_pipeline.disable_host` to disable `host.name`
from being added to events by default. {pull}18159[18159]
- Improve ECS categorization field mappings in system module. {issue}16031[16031] {pull}18065[18065]
- Change the `json.*` input settings implementation to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958]

*Heartbeat*

Expand Down
27 changes: 18 additions & 9 deletions libbeat/common/jsontransform/jsonhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (
// WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys bool, addErrKey bool) {
if !overwriteKeys {
for k, v := range keys {
if _, exists := event.Fields[k]; !exists && k != "@timestamp" && k != "@metadata" {
event.Fields[k] = v
}
}
// @timestamp and @metadata fields are root-level fields. We remove them so they
// don't become part of event.Fields.
removeKeys(keys, "@timestamp", "@metadata")

// Then, perform deep update without overwriting
event.Fields.DeepUpdateNoOverwrite(keys)
return
}

Expand Down Expand Up @@ -64,7 +65,7 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys
}

case map[string]interface{}:
event.Meta.Update(common.MapStr(m))
event.Meta.DeepUpdate(common.MapStr(m))

default:
event.SetErrorWithOption(createJSONError("failed to update @metadata"), addErrKey)
Expand All @@ -83,13 +84,21 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, overwriteKeys
continue
}
event.Fields[k] = vstr

default:
event.Fields[k] = v
}
}

// We have accounted for @timestamp, @metadata, type above. So let's remove these keys and
// deep update the event with the rest of the keys.
removeKeys(keys, "@timestamp", "@metadata", "type")
event.Fields.DeepUpdate(keys)
}

func createJSONError(message string) common.MapStr {
return common.MapStr{"message": message, "type": "json"}
}

func removeKeys(keys map[string]interface{}, names ...string) {
for _, name := range names {
delete(keys, name)
}
}
136 changes: 136 additions & 0 deletions libbeat/common/jsontransform/jsonhelper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jsontransform

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

func TestWriteJSONKeys(t *testing.T) {
now := time.Now()
now = now.Round(time.Second)

eventTimestamp := time.Date(2020, 01, 01, 01, 01, 00, 0, time.UTC)
eventMetadata := common.MapStr{
"foo": "bar",
"baz": common.MapStr{
"qux": 17,
},
}
eventFields := common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": "dee",
},
}

tests := map[string]struct {
keys map[string]interface{}
overwriteKeys bool
expectedMetadata common.MapStr
expectedTimestamp time.Time
expectedFields common.MapStr
}{
"overwrite_true": {
overwriteKeys: true,
keys: map[string]interface{}{
"@metadata": map[string]interface{}{
"foo": "NEW_bar",
"baz": map[string]interface{}{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
"@timestamp": now.Format(time.RFC3339),
"top_b": map[string]interface{}{
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
expectedMetadata: common.MapStr{
"foo": "NEW_bar",
"baz": common.MapStr{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
expectedTimestamp: now,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
},
"overwrite_false": {
overwriteKeys: false,
keys: map[string]interface{}{
"@metadata": map[string]interface{}{
"foo": "NEW_bar",
"baz": map[string]interface{}{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
"@timestamp": now.Format(time.RFC3339),
"top_b": map[string]interface{}{
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: common.MapStr{
"top_a": 23,
"top_b": common.MapStr{
"inner_c": "see",
"inner_d": "dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
event := &beat.Event{
Timestamp: eventTimestamp,
Meta: eventMetadata.Clone(),
Fields: eventFields.Clone(),
}

WriteJSONKeys(event, test.keys, test.overwriteKeys, false)
require.Equal(t, test.expectedMetadata, event.Meta)
require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano())
require.Equal(t, test.expectedFields, event.Fields)
})
}
}

0 comments on commit 6524eaa

Please sign in to comment.