Skip to content

Commit

Permalink
Expand fields in decode_json_fields if target is set (#32010)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR applies the settings in `expand_keys` to the event even if the target is not empty. 

## Why is it important?

Previously, if target was set to anything besides `""`, `expand_keys` did not work. So you could not expand the keys of the parsed JSON if you wanted to put it under a custom field.
  • Loading branch information
kvch authored Jun 21, 2022
1 parent d1498f4 commit 7b99a8e
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

- Allow loading secrets that contain commas from the keystore {pull}31694{pull}.
- Fix Windows service timeouts when the "TCP/IP NetBIOS Helper" service is disabled. {issue}31810[31810] {pull}31835[31835]
- Expand fields in `decode_json_fields` if target is set. {issue}31712[31712] {pull}32010[32010]

*Auditbeat*

Expand Down
24 changes: 18 additions & 6 deletions libbeat/common/jsontransform/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,23 @@ import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

// ExpandFields de-dots the keys in m by expanding them in-place into a
// nested object structure, merging objects as necessary. If there are any
// conflicts (i.e. a common prefix where one field is an object and another
// is a non-object), an error key is added to the event if add_error_key
// is enabled.
func ExpandFields(logger *logp.Logger, event *beat.Event, m mapstr.M, addErrorKey bool) {
if err := expandFields(m); err != nil {
logger.Errorf("JSON: failed to expand fields: %s", err)
event.SetErrorWithOption(createJSONError(err.Error()), addErrorKey)
}
}

// expandFields de-dots the keys in m by expanding them in-place into a
// nested object structure, merging objects as necessary. If there are any
// conflicts (i.e. a common prefix where one field is an object and another
Expand All @@ -38,7 +50,7 @@ func expandFields(m mapstr.M) error {
newMap, newIsMap := getMap(v)
if newIsMap {
if err := expandFields(newMap); err != nil {
return errors.Wrapf(err, "error expanding %q", k)
return fmt.Errorf("error expanding %q: %w", k, err)
}
}
if dot := strings.IndexRune(k, '.'); dot < 0 {
Expand All @@ -55,7 +67,7 @@ func expandFields(m mapstr.M) error {
old, err := m.Put(k, v)
if err != nil {
// Put will return an error if we attempt to insert into a non-object value.
return fmt.Errorf("cannot expand %q: found conflicting key", k)
return fmt.Errorf("cannot expand %q: found conflicting key: %w", k, err)
}
if old == nil {
continue
Expand All @@ -68,7 +80,7 @@ func expandFields(m mapstr.M) error {
return fmt.Errorf("cannot expand %q: found conflicting key", k)
}
if err := mergeObjects(newMap, oldMap); err != nil {
return errors.Wrapf(err, "cannot expand %q", k)
return fmt.Errorf("cannot expand %q: %w", k, err)
}
}
}
Expand Down Expand Up @@ -97,7 +109,7 @@ func mergeObjects(lhs, rhs mapstr.M) error {
return fmt.Errorf("cannot merge %q: found (%T) value", k, rhsValue)
}
if err := mergeObjects(lhsMap, rhsMap); err != nil {
return errors.Wrapf(err, "cannot merge %q", k)
return fmt.Errorf("cannot merge %q: %w", k, err)
}
}
return nil
Expand Down
23 changes: 15 additions & 8 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package actions

import (
"encoding/json"
"errors"
"fmt"
"io"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common/jsontransform"
Expand Down Expand Up @@ -84,7 +83,7 @@ func NewDecodeJSONFields(c *cfg.C) (processors.Processor, error) {
err := c.Unpack(&config)
if err != nil {
logger.Warn("Error unpacking config for decode_json_fields")
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %w", err)
}

f := &decodeJSONFields{
Expand Down Expand Up @@ -142,13 +141,21 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
if tmp, err := mapstr.M(dict).GetValue(key); err == nil {
if v, ok := tmp.(string); ok {
id = v
mapstr.M(dict).Delete(key)
_ = mapstr.M(dict).Delete(key)
}
}
}
}

if target != "" {
if f.expandKeys {
switch t := output.(type) {
case map[string]interface{}:
jsontransform.ExpandFields(f.logger, event, t, f.addErrorKey)
default:
errs = append(errs, "failed to expand keys")
}
}
_, err = event.PutValue(target, output)
} else {
switch t := output.(type) {
Expand Down Expand Up @@ -200,14 +207,14 @@ func unmarshal(maxDepth int, text string, fields *interface{}, processArray bool
var tmp interface{}
err := unmarshal(maxDepth, str, &tmp, processArray)
if err != nil {
return v, err == errProcessingSkipped
return v, errors.Is(err, errProcessingSkipped)
}

return tmp, true
}

// try to deep unmarshal fields
switch O := interface{}(*fields).(type) {
switch O := (*fields).(type) {
case map[string]interface{}:
for k, v := range O {
if decoded, ok := tryUnmarshal(v); ok {
Expand Down Expand Up @@ -242,11 +249,11 @@ func decodeJSON(text string, to *interface{}) error {
return errors.New("multiple json elements found")
}

if _, err := dec.Token(); err != nil && err != io.EOF {
if _, err := dec.Token(); err != nil && !errors.Is(err, io.EOF) {
return err
}

switch O := interface{}(*to).(type) {
switch O := (*to).(type) {
case map[string]interface{}:
jsontransform.TransformNumbers(O)
}
Expand Down
64 changes: 45 additions & 19 deletions libbeat/processors/actions/decode_json_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,34 +485,60 @@ func TestExpandKeys(t *testing.T) {
assert.Equal(t, expected, actual)
}

func TestExpandKeysError(t *testing.T) {
func TestExpandKeysWithTarget(t *testing.T) {
testConfig := conf.MustNewConfigFrom(map[string]interface{}{
"fields": fields,
"expand_keys": true,
"add_error_key": true,
"target": "",
"fields": fields,
"expand_keys": true,
"target": "my_target",
})
input := mapstr.M{"msg": `{"a.b": "c", "a.b.c": "d"}`}
input := mapstr.M{"msg": `{"a.b": {"c": "c"}, "a.b.d": "d"}`}
expected := mapstr.M{
"msg": `{"a.b": "c", "a.b.c": "d"}`,
"error": mapstr.M{
"message": "cannot expand ...",
"type": "json",
"msg": `{"a.b": {"c": "c"}, "a.b.d": "d"}`,
"my_target": map[string]interface{}{
"a": mapstr.M{
"b": map[string]interface{}{
"c": "c",
"d": "d",
},
},
},
}

actual := getActualValue(t, testConfig, input)
assert.Contains(t, actual, "error")
errorField := actual["error"].(mapstr.M)
assert.Contains(t, errorField, "message")

// The order in which keys are processed is not defined, so the error
// message is not defined. Apart from that, the outcome is the same.
assert.Regexp(t, `cannot expand ".*": .*`, errorField["message"])
errorField["message"] = "cannot expand ..."
assert.Equal(t, expected, actual)
}

func TestExpandKeysError(t *testing.T) {
for _, target := range []string{"", "my_target"} {
t.Run(fmt.Sprintf("target set to '%s'", target), func(t *testing.T) {
testConfig := conf.MustNewConfigFrom(map[string]interface{}{
"fields": fields,
"expand_keys": true,
"add_error_key": true,
"target": "",
})
input := mapstr.M{"msg": `{"a.b": "c", "a.b.c": "d"}`}
expected := mapstr.M{
"msg": `{"a.b": "c", "a.b.c": "d"}`,
"error": mapstr.M{
"message": "cannot expand ...",
"type": "json",
},
}

actual := getActualValue(t, testConfig, input)
assert.Contains(t, actual, "error")
errorField := actual["error"].(mapstr.M)
assert.Contains(t, errorField, "message")

// The order in which keys are processed is not defined, so the error
// message is not defined. Apart from that, the outcome is the same.
assert.Regexp(t, `cannot expand ".*": .*`, errorField["message"])
errorField["message"] = "cannot expand ..."
assert.Equal(t, expected, actual)
})
}
}

func TestOverwriteMetadata(t *testing.T) {
testConfig := conf.MustNewConfigFrom(map[string]interface{}{
"fields": fields,
Expand Down

0 comments on commit 7b99a8e

Please sign in to comment.