Skip to content

Commit

Permalink
[fluentforwardreceiver]: Remove usage of insert, avoid unnecessary co…
Browse files Browse the repository at this point in the history
…pies (#13913)

Signed-off-by: Bogdan Drutu <[email protected]>

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Sep 6, 2022
1 parent cca3c03 commit 234d85e
Showing 1 changed file with 20 additions and 28 deletions.
48 changes: 20 additions & 28 deletions receiver/fluentforwardreceiver/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,55 +84,50 @@ func (em EventMode) String() string {

// parseInterfaceToMap takes map of interface objects and returns
// AttributeValueMap
func parseInterfaceToMap(msi map[string]interface{}) pcommon.Value {
rv := pcommon.NewValueMap()
am := rv.MapVal()
func parseInterfaceToMap(msi map[string]interface{}, dest pcommon.Value) {
am := dest.SetEmptyMapVal()
am.EnsureCapacity(len(msi))
for k, value := range msi {
am.Insert(k, parseToAttributeValue(value))
parseToAttributeValue(value, am.UpsertEmpty(k))
}
return rv
}

// parseInterfaceToArray takes array of interface objects and returns
// AttributeValueArray
func parseInterfaceToArray(ai []interface{}) pcommon.Value {
iv := pcommon.NewValueSlice()
av := iv.SliceVal()
func parseInterfaceToArray(ai []interface{}, dest pcommon.Value) {
av := dest.SetEmptySliceVal()
av.EnsureCapacity(len(ai))
for _, value := range ai {
parseToAttributeValue(value).CopyTo(av.AppendEmpty())
parseToAttributeValue(value, av.AppendEmpty())
}
return iv
}

// parseToAttributeValue converts interface object to AttributeValue
func parseToAttributeValue(val interface{}) pcommon.Value {
func parseToAttributeValue(val interface{}, dest pcommon.Value) {
// See https://github.com/tinylib/msgp/wiki/Type-Mapping-Rules
switch r := val.(type) {
case bool:
return pcommon.NewValueBool(r)
dest.SetBoolVal(r)
case string:
return pcommon.NewValueString(r)
dest.SetStringVal(r)
case uint64:
return pcommon.NewValueInt(int64(r))
dest.SetIntVal(int64(r))
case int64:
return pcommon.NewValueInt(r)
dest.SetIntVal(r)
// Sometimes strings come in as bytes array
case []byte:
return pcommon.NewValueString(string(r))
dest.SetStringVal(string(r))
case map[string]interface{}:
return parseInterfaceToMap(r)
parseInterfaceToMap(r, dest)
case []interface{}:
return parseInterfaceToArray(r)
parseInterfaceToArray(r, dest)
case float32:
return pcommon.NewValueDouble(float64(r))
dest.SetDoubleVal(float64(r))
case float64:
return pcommon.NewValueDouble(r)
dest.SetDoubleVal(r)
case nil:
return pcommon.NewValueEmpty()
default:
return pcommon.NewValueString(fmt.Sprintf("%v", val))
dest.SetStringVal(fmt.Sprintf("%v", val))
}
}

Expand Down Expand Up @@ -187,13 +182,11 @@ func parseRecordToLogRecord(dc *msgp.Reader, lr plog.LogRecord) error {
return msgp.WrapError(err, "Record", key)
}

av := parseToAttributeValue(val)

// fluentd uses message, fluentbit log.
if key == "message" || key == "log" {
av.CopyTo(lr.Body())
parseToAttributeValue(val, lr.Body())
} else {
attrs.Insert(key, av)
parseToAttributeValue(val, attrs.UpsertEmpty(key))
}
}

Expand Down Expand Up @@ -443,7 +436,6 @@ func (pfe *PackedForwardEventLogRecords) parseEntries(entriesRaw []byte, isGzipp

lr.Attributes().UpsertString(tagAttributeKey, tag)

tgt := pfe.LogRecordSlice.AppendEmpty()
lr.CopyTo(tgt)
lr.MoveTo(pfe.LogRecordSlice.AppendEmpty())
}
}

0 comments on commit 234d85e

Please sign in to comment.