diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go index 28549001c56a..94aa16bb63f4 100644 --- a/libbeat/beat/event.go +++ b/libbeat/beat/event.go @@ -228,9 +228,18 @@ func metadataKey(key string) (string, bool) { return "", false } -// SetErrorWithOption sets jsonErr value in the event fields according to addErrKey value. -func (e *Event) SetErrorWithOption(jsonErr mapstr.M, addErrKey bool) { +// SetErrorWithOption sets the event error field with the message when the addErrKey is set to true. +// If you want to include the data and field you can pass them as parameters and will be appended into the +// error as fields with the corresponding name. +func (e *Event) SetErrorWithOption(message string, addErrKey bool, data string, field string) { if addErrKey { - e.Fields["error"] = jsonErr + errorField := mapstr.M{"message": message, "type": "json"} + if data != "" { + errorField["data"] = data + } + if field != "" { + errorField["field"] = field + } + e.Fields["error"] = errorField } } diff --git a/libbeat/common/jsontransform/expand.go b/libbeat/common/jsontransform/expand.go index c026343a3b04..be07c4200740 100644 --- a/libbeat/common/jsontransform/expand.go +++ b/libbeat/common/jsontransform/expand.go @@ -34,7 +34,7 @@ import ( func ExpandFields(logger *logp.Logger, event *beat.Event, m mapstr.M, addErrorKey bool) { if err := expandFields(m); err != nil { logger.Errorf("JSON: failed to expand fields: %s", err) - event.SetErrorWithOption(createJSONError(err.Error()), addErrorKey) + event.SetErrorWithOption(err.Error(), addErrorKey, "", "") } } diff --git a/libbeat/common/jsontransform/jsonhelper.go b/libbeat/common/jsontransform/jsonhelper.go index 87c3c1a1eb27..15d70ae929ac 100644 --- a/libbeat/common/jsontransform/jsonhelper.go +++ b/libbeat/common/jsontransform/jsonhelper.go @@ -23,7 +23,6 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -39,11 +38,9 @@ var ( // WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, overwriteKeys, addErrKey bool) { - logger := logp.NewLogger("jsonhelper") if expandKeys { if err := expandFields(keys); err != nil { - logger.Errorf("JSON: failed to expand fields: %s", err) - event.SetErrorWithOption(createJSONError(err.Error()), addErrKey) + event.SetErrorWithOption(err.Error(), addErrKey, "", "") return } } @@ -62,16 +59,14 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, o case "@timestamp": vstr, ok := v.(string) if !ok { - logger.Error("JSON: Won't overwrite @timestamp because value is not string") - event.SetErrorWithOption(createJSONError("@timestamp not overwritten (not string)"), addErrKey) + event.SetErrorWithOption("@timestamp not overwritten (not string)", addErrKey, "", "") continue } // @timestamp must be of format RFC3339 or ISO8601 ts, err := parseTimestamp(vstr) if err != nil { - logger.Errorf("JSON: Won't overwrite @timestamp because of parsing error: %v", err) - event.SetErrorWithOption(createJSONError(fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)), addErrKey) + event.SetErrorWithOption(fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr), addErrKey, "", "") continue } event.Timestamp = ts @@ -93,19 +88,17 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, o event.Meta.DeepUpdate(mapstr.M(m)) default: - event.SetErrorWithOption(createJSONError("failed to update @metadata"), addErrKey) + event.SetErrorWithOption("failed to update @metadata", addErrKey, "", "") } case "type": vstr, ok := v.(string) if !ok { - logger.Error("JSON: Won't overwrite type because value is not string") - event.SetErrorWithOption(createJSONError("type not overwritten (not string)"), addErrKey) + event.SetErrorWithOption("type not overwritten (not string)", addErrKey, "", "") continue } if len(vstr) == 0 || vstr[0] == '_' { - logger.Error("JSON: Won't overwrite type because value is empty or starts with an underscore") - event.SetErrorWithOption(createJSONError(fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)), addErrKey) + event.SetErrorWithOption(fmt.Sprintf("type not overwritten (invalid value [%s])", vstr), addErrKey, "", "") continue } event.Fields[k] = vstr @@ -118,10 +111,6 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, o event.Fields.DeepUpdate(keys) } -func createJSONError(message string) mapstr.M { - return mapstr.M{"message": message, "type": "json"} -} - func removeKeys(keys map[string]interface{}, names ...string) { for _, name := range names { delete(keys, name) diff --git a/libbeat/common/jsontransform/jsonhelper_test.go b/libbeat/common/jsontransform/jsonhelper_test.go index bed1de4c0fd9..c8791b3c6107 100644 --- a/libbeat/common/jsontransform/jsonhelper_test.go +++ b/libbeat/common/jsontransform/jsonhelper_test.go @@ -53,6 +53,7 @@ func TestWriteJSONKeys(t *testing.T) { expectedMetadata mapstr.M expectedTimestamp time.Time expectedFields mapstr.M + addErrorKeys bool }{ "overwrite_true": { overwriteKeys: true, @@ -192,6 +193,32 @@ func TestWriteJSONKeys(t *testing.T) { }, }, }, + // This benchmark makes sure that when an error is found in the event, the proper fields are defined and measured + "error_case": { + expandKeys: false, + overwriteKeys: true, + keys: map[string]interface{}{ + "top_b": map[string]interface{}{ + "inner_d.inner_e": "COMPLETELY_NEW_e", + }, + "@timestamp": map[string]interface{}{"when": "now", "another": "yesterday"}, + }, + expectedMetadata: eventMetadata.Clone(), + expectedTimestamp: eventTimestamp, + expectedFields: mapstr.M{ + "error": mapstr.M{ + "message": "@timestamp not overwritten (not string)", + "type": "json", + }, + "top_a": 23, + "top_b": mapstr.M{ + "inner_c": "see", + "inner_d": "dee", + "inner_d.inner_e": "COMPLETELY_NEW_e", + }, + }, + addErrorKeys: true, + }, } for name, test := range tests { @@ -202,10 +229,207 @@ func TestWriteJSONKeys(t *testing.T) { Fields: eventFields.Clone(), } - WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, false) + WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, test.addErrorKeys) require.Equal(t, test.expectedMetadata, event.Meta) require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano()) require.Equal(t, test.expectedFields, event.Fields) }) } } + +func BenchmarkWriteJSONKeys(b *testing.B) { + now := time.Now() + now = now.Round(time.Second) + + eventTimestamp := time.Date(2020, 01, 01, 01, 01, 00, 0, time.UTC) + eventMetadata := mapstr.M{ + "foo": "bar", + "baz": mapstr.M{ + "qux": 17, + }, + } + eventFields := mapstr.M{ + "top_a": 23, + "top_b": mapstr.M{ + "inner_c": "see", + "inner_d": "dee", + }, + } + + tests := map[string]struct { + keys map[string]interface{} + expandKeys bool + overwriteKeys bool + expectedFields mapstr.M + addErrorKeys bool + }{ + "overwrite_true": { + overwriteKeys: true, + keys: map[string]interface{}{ + "@metadata": map[string]interface{}{ + "foo": "NEW_bar", + "baz": map[string]interface{}{ + "qux": "NEW_qux", + "durrr": "COMPLETELY_NEW", + }, + }, + "@timestamp": now.Format(time.RFC3339), + "top_b": map[string]interface{}{ + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + expectedFields: mapstr.M{ + "top_a": 23, + "top_b": mapstr.M{ + "inner_c": "see", + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + }, + "overwrite_true_ISO8601": { + overwriteKeys: true, + keys: map[string]interface{}{ + "@metadata": map[string]interface{}{ + "foo": "NEW_bar", + "baz": map[string]interface{}{ + "qux": "NEW_qux", + "durrr": "COMPLETELY_NEW", + }, + }, + "@timestamp": now.Format(iso8601), + "top_b": map[string]interface{}{ + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + expectedFields: mapstr.M{ + "top_a": 23, + "top_b": mapstr.M{ + "inner_c": "see", + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + }, + "overwrite_false": { + overwriteKeys: false, + keys: map[string]interface{}{ + "@metadata": map[string]interface{}{ + "foo": "NEW_bar", + "baz": map[string]interface{}{ + "qux": "NEW_qux", + "durrr": "COMPLETELY_NEW", + }, + }, + "@timestamp": now.Format(time.RFC3339), + "top_b": map[string]interface{}{ + "inner_d": "NEW_dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + expectedFields: mapstr.M{ + "top_a": 23, + "top_b": mapstr.M{ + "inner_c": "see", + "inner_d": "dee", + "inner_e": "COMPLETELY_NEW_e", + }, + "top_c": "COMPLETELY_NEW_c", + }, + }, + "expand_true": { + expandKeys: true, + overwriteKeys: true, + keys: map[string]interface{}{ + "top_b": map[string]interface{}{ + "inner_d.inner_e": "COMPLETELY_NEW_e", + }, + }, + expectedFields: mapstr.M{ + "top_a": 23, + "top_b": mapstr.M{ + "inner_c": "see", + "inner_d": mapstr.M{ + "inner_e": "COMPLETELY_NEW_e", + }, + }, + }, + }, + "expand_false": { + expandKeys: false, + overwriteKeys: true, + keys: map[string]interface{}{ + "top_b": map[string]interface{}{ + "inner_d.inner_e": "COMPLETELY_NEW_e", + }, + }, + expectedFields: mapstr.M{ + "top_a": 23, + "top_b": mapstr.M{ + "inner_c": "see", + "inner_d": "dee", + "inner_d.inner_e": "COMPLETELY_NEW_e", + }, + }, + }, + // This benchmark makes sure that when an error is found in the event, the proper fields are defined and measured + "error_case": { + expandKeys: false, + overwriteKeys: true, + keys: map[string]interface{}{ + "top_b": map[string]interface{}{ + "inner_d.inner_e": "COMPLETELY_NEW_e", + }, + "@timestamp": "invalid string", + }, + expectedFields: mapstr.M{ + "error": mapstr.M{ + "message": "@timestamp not overwritten (parse error on invalid string)", + "type": "json", + }, + "top_a": 23, + "top_b": mapstr.M{ + "inner_c": "see", + "inner_d": "dee", + "inner_d.inner_e": "COMPLETELY_NEW_e", + }, + }, + addErrorKeys: true, + }, + } + + for name, test := range tests { + b.Run(name, func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + event := &beat.Event{ + Timestamp: eventTimestamp, + Meta: eventMetadata.Clone(), + Fields: eventFields.Clone(), + } + // The WriteJSONKeys will override the keys, so we need to clone it. + keys := clone(test.keys) + b.StartTimer() + WriteJSONKeys(event, keys, test.expandKeys, test.overwriteKeys, test.addErrorKeys) + require.Equal(b, test.expectedFields, event.Fields) + } + }) + } +} + +func clone(a map[string]interface{}) map[string]interface{} { + newMap := make(map[string]interface{}) + for k, v := range a { + newMap[k] = v + } + return newMap +} diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 8dcc39e0dbcc..b47ebd646d9c 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -122,11 +122,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { if err != nil { f.logger.Debugf("Error trying to unmarshal %s", text) errs = append(errs, err.Error()) - event.SetErrorWithOption(mapstr.M{ - "message": "parsing input as JSON: " + err.Error(), - "data": text, - "field": field, - }, f.addErrorKey) + event.SetErrorWithOption(fmt.Sprintf("parsing input as JSON: %s", err.Error()), f.addErrorKey, text, field) continue }