Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

decode_json_field: process objects and arrays only #11312

Merged
merged 12 commits into from
Mar 26, 2019
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Include ip and boolean type when generating index pattern. {pull}10995[10995]
- Cancelling enrollment of a beat will not enroll the beat. {issue}10150[10150]
- Add missing fields and test cases for libbeat add_kubernetes_metadata processor. {issue}11133[11133], {pull}11134[11134]
- decode_json_field: process objects and arrays only {pull}11312[11312]
- decode_json_field: do not process arrays when flag not set. {pull}11318[11318]
- Report faulting file when config reload fails. {pull}[11304]11304
- Remove IP fields from default_field in Elasticsearch template.
Expand Down
14 changes: 14 additions & 0 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package actions
import (
"encoding/json"
"fmt"
"io"
"strings"

"github.com/pkg/errors"
Expand Down Expand Up @@ -146,6 +147,8 @@ func unmarshal(maxDepth int, text string, fields *interface{}, processArray bool
str, isString := v.(string)
if !isString {
return v, false
} else if !isStructured(str) {
return str, false
}

var tmp interface{}
Expand Down Expand Up @@ -193,6 +196,10 @@ func decodeJSON(text string, to *interface{}) error {
return errors.New("multiple json elements found")
}

if _, err := dec.Token(); err != nil && err != io.EOF {
return err
}

switch O := interface{}(*to).(type) {
case map[string]interface{}:
jsontransform.TransformNumbers(O)
Expand All @@ -203,3 +210,10 @@ func decodeJSON(text string, to *interface{}) error {
func (f decodeJSONFields) String() string {
return "decode_json_fields=" + strings.Join(f.fields, ", ")
}

func isStructured(s string) bool {
s = strings.TrimSpace(s)
end := len(s) - 1
return end > 0 && ((s[0] == '[' && s[end] == ']') ||
(s[0] == '{' && s[end] == '}'))
}
87 changes: 87 additions & 0 deletions libbeat/processors/actions/decode_json_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package actions

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -199,6 +200,67 @@ func TestTargetRootOption(t *testing.T) {
assert.Equal(t, expected.String(), actual.String())
}

func TestNotJsonObjectOrArray(t *testing.T) {
var cases = []struct {
MaxDepth int
Expected common.MapStr
}{
{
MaxDepth: 1,
Expected: common.MapStr{
"msg": common.MapStr{
"someDate": "2016-09-28T01:40:26.760+0000",
"someNumber": 1475026826760,
"someNumberAsString": "1475026826760",
"someString": "foobar",
"someString2": "2017 is awesome",
"someMap": "{\"a\":\"b\"}",
"someArray": "[1,2,3]",
},
},
},
{
MaxDepth: 10,
Expected: common.MapStr{
"msg": common.MapStr{
"someDate": "2016-09-28T01:40:26.760+0000",
"someNumber": 1475026826760,
"someNumberAsString": "1475026826760",
"someString": "foobar",
"someString2": "2017 is awesome",
"someMap": common.MapStr{"a": "b"},
"someArray": []int{1, 2, 3},
},
},
},
}

for _, testCase := range cases {
t.Run(fmt.Sprintf("TestNotJsonObjectOrArrayDepth-%v", testCase.MaxDepth), func(t *testing.T) {
input := common.MapStr{
"msg": `{
"someDate": "2016-09-28T01:40:26.760+0000",
"someNumberAsString": "1475026826760",
"someNumber": 1475026826760,
"someString": "foobar",
"someString2": "2017 is awesome",
"someMap": "{\"a\":\"b\"}",
"someArray": "[1,2,3]"
}`,
}

testConfig, _ = common.NewConfigFrom(map[string]interface{}{
"fields": fields,
"process_array": true,
"max_depth": testCase.MaxDepth,
})

actual := getActualValue(t, testConfig, input)
assert.Equal(t, testCase.Expected.String(), actual.String())
})
}
}

func TestArrayWithArraysDisabled(t *testing.T) {
input := common.MapStr{
"msg": `{
Expand All @@ -222,6 +284,7 @@ func TestArrayWithArraysDisabled(t *testing.T) {

assert.Equal(t, expected.String(), actual.String())
}

func TestArrayWithArraysEnabled(t *testing.T) {
input := common.MapStr{
"msg": `{
Expand All @@ -246,6 +309,30 @@ func TestArrayWithArraysEnabled(t *testing.T) {
assert.Equal(t, expected.String(), actual.String())
}

func TestArrayWithInvalidArray(t *testing.T) {
input := common.MapStr{
"msg": `{
"arrayOfMap": "[]]"
}`,
}

testConfig, _ = common.NewConfigFrom(map[string]interface{}{
"fields": fields,
"max_depth": 10,
"process_array": true,
})

actual := getActualValue(t, testConfig, input)

expected := common.MapStr{
"msg": common.MapStr{
"arrayOfMap": "[]]",
},
}

assert.Equal(t, expected.String(), actual.String())
}

func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr {
logp.TestingSetup()

Expand Down