diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a4f34cb48ba..51e6623bb60 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -145,6 +145,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `fingerprint` processor to give it access to the `@timestamp` field. {issue}28683[28683] - Fix the wrong beat name on monitoring and state endpoint {issue}27755[27755] - Skip configuration checks in autodiscover for configurations that are already running {pull}29048[29048] +- Fix `decode_json_processor` to always respect `add_error_key` {pull}29107[29107] *Auditbeat* diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 611b5b8d8e3..be852a47133 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -122,6 +122,11 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { if err != nil { f.logger.Debugf("Error trying to unmarshal %s", text) errs = append(errs, err.Error()) + event.SetErrorWithOption(common.MapStr{ + "message": "parsing input as JSON: " + err.Error(), + "data": text, + "field": field, + }, f.addErrorKey) continue } diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index 73d4f3a5fcc..f58efd29b46 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -491,6 +491,27 @@ func TestOverwriteMetadata(t *testing.T) { assert.Equal(t, expected, actual) } +func TestAddErrorToEventOnUnmarshalError(t *testing.T) { + testConfig := common.MustNewConfigFrom(map[string]interface{}{ + "fields": "message", + "add_error_key": true, + }) + + input := common.MapStr{ + "message": "Broken JSON [[", + } + + actual := getActualValue(t, testConfig, input) + + errObj, ok := actual["error"].(common.MapStr) + require.True(t, ok, "'error' field not present or of invalid type") + require.NotNil(t, actual["error"]) + + assert.Equal(t, "message", errObj["field"]) + assert.NotNil(t, errObj["data"]) + assert.NotNil(t, errObj["message"]) +} + func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr { log := logp.NewLogger("decode_json_fields_test")