Skip to content

Commit

Permalink
Remove logger from WriteJSONKeys (elastic#35920)
Browse files Browse the repository at this point in the history
This commit removes the logger from the WriteJSONKeys which is executed as part of the hot path of the Event and was allocating a new logger for each event.
This has an impact on the memory allocation of the function and because it's executed for each event, we see a direct benefit in the overall memory allocations.
  • Loading branch information
alexsapran authored and Scholar-Li committed Feb 5, 2024
1 parent aa76408 commit 952889f
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 27 deletions.
15 changes: 12 additions & 3 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,18 @@ func metadataKey(key string) (string, bool) {
return "", false
}

// SetErrorWithOption sets jsonErr value in the event fields according to addErrKey value.
func (e *Event) SetErrorWithOption(jsonErr mapstr.M, addErrKey bool) {
// SetErrorWithOption sets the event error field with the message when the addErrKey is set to true.
// If you want to include the data and field you can pass them as parameters and will be appended into the
// error as fields with the corresponding name.
func (e *Event) SetErrorWithOption(message string, addErrKey bool, data string, field string) {
if addErrKey {
e.Fields["error"] = jsonErr
errorField := mapstr.M{"message": message, "type": "json"}
if data != "" {
errorField["data"] = data
}
if field != "" {
errorField["field"] = field
}
e.Fields["error"] = errorField
}
}
2 changes: 1 addition & 1 deletion libbeat/common/jsontransform/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
func ExpandFields(logger *logp.Logger, event *beat.Event, m mapstr.M, addErrorKey bool) {
if err := expandFields(m); err != nil {
logger.Errorf("JSON: failed to expand fields: %s", err)
event.SetErrorWithOption(createJSONError(err.Error()), addErrorKey)
event.SetErrorWithOption(err.Error(), addErrorKey, "", "")
}
}

Expand Down
23 changes: 6 additions & 17 deletions libbeat/common/jsontransform/jsonhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

Expand All @@ -39,11 +38,9 @@ var (

// WriteJSONKeys writes the json keys to the given event based on the overwriteKeys option and the addErrKey
func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, overwriteKeys, addErrKey bool) {
logger := logp.NewLogger("jsonhelper")
if expandKeys {
if err := expandFields(keys); err != nil {
logger.Errorf("JSON: failed to expand fields: %s", err)
event.SetErrorWithOption(createJSONError(err.Error()), addErrKey)
event.SetErrorWithOption(err.Error(), addErrKey, "", "")
return
}
}
Expand All @@ -62,16 +59,14 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, o
case "@timestamp":
vstr, ok := v.(string)
if !ok {
logger.Error("JSON: Won't overwrite @timestamp because value is not string")
event.SetErrorWithOption(createJSONError("@timestamp not overwritten (not string)"), addErrKey)
event.SetErrorWithOption("@timestamp not overwritten (not string)", addErrKey, "", "")
continue
}

// @timestamp must be of format RFC3339 or ISO8601
ts, err := parseTimestamp(vstr)
if err != nil {
logger.Errorf("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event.SetErrorWithOption(createJSONError(fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)), addErrKey)
event.SetErrorWithOption(fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr), addErrKey, "", "")
continue
}
event.Timestamp = ts
Expand All @@ -93,19 +88,17 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, o
event.Meta.DeepUpdate(mapstr.M(m))

default:
event.SetErrorWithOption(createJSONError("failed to update @metadata"), addErrKey)
event.SetErrorWithOption("failed to update @metadata", addErrKey, "", "")
}

case "type":
vstr, ok := v.(string)
if !ok {
logger.Error("JSON: Won't overwrite type because value is not string")
event.SetErrorWithOption(createJSONError("type not overwritten (not string)"), addErrKey)
event.SetErrorWithOption("type not overwritten (not string)", addErrKey, "", "")
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logger.Error("JSON: Won't overwrite type because value is empty or starts with an underscore")
event.SetErrorWithOption(createJSONError(fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)), addErrKey)
event.SetErrorWithOption(fmt.Sprintf("type not overwritten (invalid value [%s])", vstr), addErrKey, "", "")
continue
}
event.Fields[k] = vstr
Expand All @@ -118,10 +111,6 @@ func WriteJSONKeys(event *beat.Event, keys map[string]interface{}, expandKeys, o
event.Fields.DeepUpdate(keys)
}

func createJSONError(message string) mapstr.M {
return mapstr.M{"message": message, "type": "json"}
}

func removeKeys(keys map[string]interface{}, names ...string) {
for _, name := range names {
delete(keys, name)
Expand Down
226 changes: 225 additions & 1 deletion libbeat/common/jsontransform/jsonhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestWriteJSONKeys(t *testing.T) {
expectedMetadata mapstr.M
expectedTimestamp time.Time
expectedFields mapstr.M
addErrorKeys bool
}{
"overwrite_true": {
overwriteKeys: true,
Expand Down Expand Up @@ -192,6 +193,32 @@ func TestWriteJSONKeys(t *testing.T) {
},
},
},
// This benchmark makes sure that when an error is found in the event, the proper fields are defined and measured
"error_case": {
expandKeys: false,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
"@timestamp": map[string]interface{}{"when": "now", "another": "yesterday"},
},
expectedMetadata: eventMetadata.Clone(),
expectedTimestamp: eventTimestamp,
expectedFields: mapstr.M{
"error": mapstr.M{
"message": "@timestamp not overwritten (not string)",
"type": "json",
},
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "dee",
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
addErrorKeys: true,
},
}

for name, test := range tests {
Expand All @@ -202,10 +229,207 @@ func TestWriteJSONKeys(t *testing.T) {
Fields: eventFields.Clone(),
}

WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, false)
WriteJSONKeys(event, test.keys, test.expandKeys, test.overwriteKeys, test.addErrorKeys)
require.Equal(t, test.expectedMetadata, event.Meta)
require.Equal(t, test.expectedTimestamp.UnixNano(), event.Timestamp.UnixNano())
require.Equal(t, test.expectedFields, event.Fields)
})
}
}

func BenchmarkWriteJSONKeys(b *testing.B) {
now := time.Now()
now = now.Round(time.Second)

eventTimestamp := time.Date(2020, 01, 01, 01, 01, 00, 0, time.UTC)
eventMetadata := mapstr.M{
"foo": "bar",
"baz": mapstr.M{
"qux": 17,
},
}
eventFields := mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "dee",
},
}

tests := map[string]struct {
keys map[string]interface{}
expandKeys bool
overwriteKeys bool
expectedFields mapstr.M
addErrorKeys bool
}{
"overwrite_true": {
overwriteKeys: true,
keys: map[string]interface{}{
"@metadata": map[string]interface{}{
"foo": "NEW_bar",
"baz": map[string]interface{}{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
"@timestamp": now.Format(time.RFC3339),
"top_b": map[string]interface{}{
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
expectedFields: mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
},
"overwrite_true_ISO8601": {
overwriteKeys: true,
keys: map[string]interface{}{
"@metadata": map[string]interface{}{
"foo": "NEW_bar",
"baz": map[string]interface{}{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
"@timestamp": now.Format(iso8601),
"top_b": map[string]interface{}{
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
expectedFields: mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
},
"overwrite_false": {
overwriteKeys: false,
keys: map[string]interface{}{
"@metadata": map[string]interface{}{
"foo": "NEW_bar",
"baz": map[string]interface{}{
"qux": "NEW_qux",
"durrr": "COMPLETELY_NEW",
},
},
"@timestamp": now.Format(time.RFC3339),
"top_b": map[string]interface{}{
"inner_d": "NEW_dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
expectedFields: mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "dee",
"inner_e": "COMPLETELY_NEW_e",
},
"top_c": "COMPLETELY_NEW_c",
},
},
"expand_true": {
expandKeys: true,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedFields: mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": mapstr.M{
"inner_e": "COMPLETELY_NEW_e",
},
},
},
},
"expand_false": {
expandKeys: false,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
expectedFields: mapstr.M{
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "dee",
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
},
// This benchmark makes sure that when an error is found in the event, the proper fields are defined and measured
"error_case": {
expandKeys: false,
overwriteKeys: true,
keys: map[string]interface{}{
"top_b": map[string]interface{}{
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
"@timestamp": "invalid string",
},
expectedFields: mapstr.M{
"error": mapstr.M{
"message": "@timestamp not overwritten (parse error on invalid string)",
"type": "json",
},
"top_a": 23,
"top_b": mapstr.M{
"inner_c": "see",
"inner_d": "dee",
"inner_d.inner_e": "COMPLETELY_NEW_e",
},
},
addErrorKeys: true,
},
}

for name, test := range tests {
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
event := &beat.Event{
Timestamp: eventTimestamp,
Meta: eventMetadata.Clone(),
Fields: eventFields.Clone(),
}
// The WriteJSONKeys will override the keys, so we need to clone it.
keys := clone(test.keys)
b.StartTimer()
WriteJSONKeys(event, keys, test.expandKeys, test.overwriteKeys, test.addErrorKeys)
require.Equal(b, test.expectedFields, event.Fields)
}
})
}
}

func clone(a map[string]interface{}) map[string]interface{} {
newMap := make(map[string]interface{})
for k, v := range a {
newMap[k] = v
}
return newMap
}
6 changes: 1 addition & 5 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
if err != nil {
f.logger.Debugf("Error trying to unmarshal %s", text)
errs = append(errs, err.Error())
event.SetErrorWithOption(mapstr.M{
"message": "parsing input as JSON: " + err.Error(),
"data": text,
"field": field,
}, f.addErrorKey)
event.SetErrorWithOption(fmt.Sprintf("parsing input as JSON: %s", err.Error()), f.addErrorKey, text, field)
continue
}

Expand Down

0 comments on commit 952889f

Please sign in to comment.