Skip to content

Commit

Permalink
[exporter/loki] Add instrumentation scope to log object (open-telemet…
Browse files Browse the repository at this point in the history
…ry#17071)

add instrumentation scope to loki exporter
  • Loading branch information
mar4uk authored Dec 16, 2022
1 parent 13e096b commit 8e35f2c
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 54 deletions.
11 changes: 11 additions & 0 deletions .chloggen/loki-instrumentation-scope.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/loki

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Added `InstrumentationScope` to log object"

# One or more tracking issues related to the change
issues: [16485]
26 changes: 21 additions & 5 deletions exporter/lokiexporter/legacy_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type lokiExporter struct {
settings component.TelemetrySettings
client *http.Client
wg sync.WaitGroup
convert func(plog.LogRecord, pcommon.Resource) (*logproto.Entry, error)
convert func(plog.LogRecord, pcommon.Resource, pcommon.InstrumentationScope) (*logproto.Entry, error)
tenantSource tenant.Source
}

Expand Down Expand Up @@ -209,6 +209,7 @@ func (l *lokiExporter) logDataToLoki(ld plog.Logs) (pr *logproto.PushRequest, nu
resource := rls.At(i).Resource()
for j := 0; j < ills.Len(); j++ {
logs := ills.At(j).LogRecords()
scope := ills.At(j).Scope()
for k := 0; k < logs.Len(); k++ {
log := logs.At(k)

Expand All @@ -225,7 +226,7 @@ func (l *lokiExporter) logDataToLoki(ld plog.Logs) (pr *logproto.PushRequest, nu
labels := mergedLabels.String()
var entry *logproto.Entry
var err error
entry, err = l.convert(log, resource)
entry, err = l.convert(log, resource, scope)
if err != nil {
// Couldn't convert so dropping log.
numDroppedLogs++
Expand Down Expand Up @@ -326,7 +327,7 @@ func (l *lokiExporter) convertRecordAttributesToLabels(log plog.LogRecord) model
return ls
}

func (l *lokiExporter) convertLogBodyToEntry(lr plog.LogRecord, res pcommon.Resource) (*logproto.Entry, error) {
func (l *lokiExporter) convertLogBodyToEntry(lr plog.LogRecord, res pcommon.Resource, scope pcommon.InstrumentationScope) (*logproto.Entry, error) {
var b strings.Builder

if _, ok := l.config.Labels.RecordAttributes["severity"]; !ok && len(lr.SeverityText()) > 0 {
Expand Down Expand Up @@ -377,6 +378,21 @@ func (l *lokiExporter) convertLogBodyToEntry(lr plog.LogRecord, res pcommon.Reso
return true
})

scopeName := scope.Name()
scopeVersion := scope.Version()
if scopeName != "" {
b.WriteString("instrumentation_scope_name")
b.WriteString("=")
b.WriteString(scopeName)
b.WriteRune(' ')
if scopeVersion != "" {
b.WriteString("instrumentation_scope_version")
b.WriteString("=")
b.WriteString(scopeVersion)
b.WriteRune(' ')
}
}

b.WriteString(lr.Body().Str())

return &logproto.Entry{
Expand All @@ -385,8 +401,8 @@ func (l *lokiExporter) convertLogBodyToEntry(lr plog.LogRecord, res pcommon.Reso
}, nil
}

func (l *lokiExporter) convertLogToJSONEntry(lr plog.LogRecord, res pcommon.Resource) (*logproto.Entry, error) {
line, err := loki.Encode(lr, res)
func (l *lokiExporter) convertLogToJSONEntry(lr plog.LogRecord, res pcommon.Resource, scope pcommon.InstrumentationScope) (*logproto.Entry, error) {
line, err := loki.Encode(lr, res, scope)
if err != nil {
return nil, err
}
Expand Down
15 changes: 11 additions & 4 deletions exporter/lokiexporter/legacy_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,10 @@ func TestExporter_convertLogBodyToEntry(t *testing.T) {
res.Attributes().PutStr("host.name", "something")
res.Attributes().PutStr("pod.name", "something123")

scope := pcommon.NewInstrumentationScope()
scope.SetName("example-logger-name")
scope.SetVersion("v1")

lr := plog.NewLogRecord()
lr.Body().SetStr("Payment succeeded")
lr.SetTraceID([16]byte{1, 2, 3, 4})
Expand All @@ -584,11 +588,11 @@ func TestExporter_convertLogBodyToEntry(t *testing.T) {
ResourceAttributes: map[string]string{"pod.name": "pod.name"},
},
}, componenttest.NewNopTelemetrySettings())
entry, _ := exp.convertLogBodyToEntry(lr, res)
entry, _ := exp.convertLogBodyToEntry(lr, res, scope)

expEntry := &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: "severity=DEBUG severityN=5 traceID=01020304000000000000000000000000 spanID=0506070800000000 host.name=something Payment succeeded",
Line: "severity=DEBUG severityN=5 traceID=01020304000000000000000000000000 spanID=0506070800000000 host.name=something instrumentation_scope_name=example-logger-name instrumentation_scope_version=v1 Payment succeeded",
}
require.NotNil(t, entry)
require.Equal(t, expEntry, entry)
Expand Down Expand Up @@ -690,12 +694,15 @@ func TestExporter_convertLogtoJSONEntry(t *testing.T) {
lr.SetTimestamp(ts)
res := pcommon.NewResource()
res.Attributes().PutStr("host.name", "something")
scope := pcommon.NewInstrumentationScope()
scope.SetName("example-logger-name")
scope.SetVersion("v1")

exp := newLegacyExporter(&Config{}, componenttest.NewNopTelemetrySettings())
entry, err := exp.convertLogToJSONEntry(lr, res)
entry, err := exp.convertLogToJSONEntry(lr, res, scope)
expEntry := &logproto.Entry{
Timestamp: time.Unix(0, int64(lr.Timestamp())),
Line: `{"body":"log message","resources":{"host.name":"something"}}`,
Line: `{"body":"log message","resources":{"host.name":"something"},"instrumentation_scope":{"name":"example-logger-name","version":"v1"}}`,
}
require.Nil(t, err)
require.NotNil(t, entry)
Expand Down
14 changes: 7 additions & 7 deletions pkg/translator/loki/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ func removeAttributes(attrs pcommon.Map, labels model.LabelSet) {
})
}

func convertLogToJSONEntry(lr plog.LogRecord, res pcommon.Resource) (*logproto.Entry, error) {
line, err := Encode(lr, res)
func convertLogToJSONEntry(lr plog.LogRecord, res pcommon.Resource, scope pcommon.InstrumentationScope) (*logproto.Entry, error) {
line, err := Encode(lr, res, scope)
if err != nil {
return nil, err
}
Expand All @@ -153,8 +153,8 @@ func convertLogToJSONEntry(lr plog.LogRecord, res pcommon.Resource) (*logproto.E
}, nil
}

func convertLogToLogfmtEntry(lr plog.LogRecord, res pcommon.Resource) (*logproto.Entry, error) {
line, err := EncodeLogfmt(lr, res)
func convertLogToLogfmtEntry(lr plog.LogRecord, res pcommon.Resource, scope pcommon.InstrumentationScope) (*logproto.Entry, error) {
line, err := EncodeLogfmt(lr, res, scope)
if err != nil {
return nil, err
}
Expand All @@ -164,12 +164,12 @@ func convertLogToLogfmtEntry(lr plog.LogRecord, res pcommon.Resource) (*logproto
}, nil
}

func convertLogToLokiEntry(lr plog.LogRecord, res pcommon.Resource, format string) (*logproto.Entry, error) {
func convertLogToLokiEntry(lr plog.LogRecord, res pcommon.Resource, format string, scope pcommon.InstrumentationScope) (*logproto.Entry, error) {
switch format {
case formatJSON:
return convertLogToJSONEntry(lr, res)
return convertLogToJSONEntry(lr, res, scope)
case formatLogfmt:
return convertLogToLogfmtEntry(lr, res)
return convertLogToLogfmtEntry(lr, res, scope)
default:
return nil, fmt.Errorf("invalid format %s. Expected one of: %s, %s", format, formatJSON, formatLogfmt)
}
Expand Down
44 changes: 35 additions & 9 deletions pkg/translator/loki/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,25 @@ import (

// JSON representation of the LogRecord as described by https://developers.google.com/protocol-buffers/docs/proto3#json
type lokiEntry struct {
Name string `json:"name,omitempty"`
Body json.RawMessage `json:"body,omitempty"`
TraceID string `json:"traceid,omitempty"`
SpanID string `json:"spanid,omitempty"`
Severity string `json:"severity,omitempty"`
Attributes map[string]interface{} `json:"attributes,omitempty"`
Resources map[string]interface{} `json:"resources,omitempty"`
Name string `json:"name,omitempty"`
Body json.RawMessage `json:"body,omitempty"`
TraceID string `json:"traceid,omitempty"`
SpanID string `json:"spanid,omitempty"`
Severity string `json:"severity,omitempty"`
Attributes map[string]interface{} `json:"attributes,omitempty"`
Resources map[string]interface{} `json:"resources,omitempty"`
InstrumentationScope *instrumentationScope `json:"instrumentation_scope,omitempty"`
}

type instrumentationScope struct {
Name string `json:"name,omitempty"`
Version string `json:"version,omitempty"`
}

// Encode converts an OTLP log record and its resource attributes into a JSON
// string representing a Loki entry. An error is returned when the record can't
// be marshaled into JSON.
func Encode(lr plog.LogRecord, res pcommon.Resource) (string, error) {
func Encode(lr plog.LogRecord, res pcommon.Resource, scope pcommon.InstrumentationScope) (string, error) {
var logRecord lokiEntry
var jsonRecord []byte
var err error
Expand All @@ -60,6 +66,17 @@ func Encode(lr plog.LogRecord, res pcommon.Resource) (string, error) {
Resources: res.Attributes().AsRaw(),
}

scopeName := scope.Name()
scopeVersion := scope.Version()
if scopeName != "" {
logRecord.InstrumentationScope = &instrumentationScope{
Name: scopeName,
}
if scopeVersion != "" {
logRecord.InstrumentationScope.Version = scopeVersion
}
}

jsonRecord, err = json.Marshal(logRecord)
if err != nil {
return "", err
Expand All @@ -70,7 +87,7 @@ func Encode(lr plog.LogRecord, res pcommon.Resource) (string, error) {
// EncodeLogfmt converts an OTLP log record and its resource attributes into a logfmt
// string representing a Loki entry. An error is returned when the record can't
// be marshaled into logfmt.
func EncodeLogfmt(lr plog.LogRecord, res pcommon.Resource) (string, error) {
func EncodeLogfmt(lr plog.LogRecord, res pcommon.Resource, scope pcommon.InstrumentationScope) (string, error) {
keyvals := bodyToKeyvals(lr.Body())

if traceID := lr.TraceID(); !traceID.IsEmpty() {
Expand All @@ -97,6 +114,15 @@ func EncodeLogfmt(lr plog.LogRecord, res pcommon.Resource) (string, error) {
return true
})

scopeName := scope.Name()
scopeVersion := scope.Version()
if scopeName != "" {
keyvals = append(keyvals, "instrumentation_scope_name", scopeName)
if scopeVersion != "" {
keyvals = append(keyvals, "instrumentation_scope_version", scopeVersion)
}
}

logfmtLine, err := logfmt.MarshalKeyvals(keyvals...)
if err != nil {
return "", err
Expand Down
43 changes: 24 additions & 19 deletions pkg/translator/loki/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

func exampleLog() (plog.LogRecord, pcommon.Resource) {
func exampleLog() (plog.LogRecord, pcommon.Resource, pcommon.InstrumentationScope) {

buffer := plog.NewLogRecord()
buffer.Body().SetStr("Example log")
Expand All @@ -36,27 +36,32 @@ func exampleLog() (plog.LogRecord, pcommon.Resource) {
resource := pcommon.NewResource()
resource.Attributes().PutStr("host.name", "something")

return buffer, resource
scope := pcommon.NewInstrumentationScope()
scope.SetName("example-logger-name")
scope.SetVersion("v1")

return buffer, resource, scope
}

func TestEncodeJsonWithStringBody(t *testing.T) {
in := `{"body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"},"resources":{"host.name":"something"}}`
in := `{"body":"Example log","traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"},"resources":{"host.name":"something"},"instrumentation_scope":{"name":"example-logger-name","version":"v1"}}`
log, resource, scope := exampleLog()

out, err := Encode(exampleLog())
out, err := Encode(log, resource, scope)
assert.NoError(t, err)
assert.Equal(t, in, out)
}

func TestEncodeJsonWithMapBody(t *testing.T) {
in := `{"body":{"key1":"value","key2":"value"},"traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"},"resources":{"host.name":"something"}}`
in := `{"body":{"key1":"value","key2":"value"},"traceid":"01020304000000000000000000000000","spanid":"0506070800000000","severity":"error","attributes":{"attr1":"1","attr2":"2"},"resources":{"host.name":"something"},"instrumentation_scope":{"name":"example-logger-name","version":"v1"}}`

log, resource := exampleLog()
log, resource, scope := exampleLog()
mapVal := pcommon.NewValueMap()
mapVal.Map().PutStr("key1", "value")
mapVal.Map().PutStr("key2", "value")
mapVal.CopyTo(log.Body())

out, err := Encode(log, resource)
out, err := Encode(log, resource, scope)
assert.NoError(t, err)
assert.Equal(t, in, out)
}
Expand Down Expand Up @@ -148,42 +153,42 @@ func TestSerializeComplexBody(t *testing.T) {
}

func TestEncodeLogfmtWithStringBody(t *testing.T) {
in := `msg="hello world" traceID=01020304000000000000000000000000 spanID=0506070800000000 severity=error attribute_attr1=1 attribute_attr2=2 resource_host.name=something`
log, resource := exampleLog()
in := `msg="hello world" traceID=01020304000000000000000000000000 spanID=0506070800000000 severity=error attribute_attr1=1 attribute_attr2=2 resource_host.name=something instrumentation_scope_name=example-logger-name instrumentation_scope_version=v1`
log, resource, scope := exampleLog()
log.Body().SetStr("msg=\"hello world\"")
out, err := EncodeLogfmt(log, resource)
out, err := EncodeLogfmt(log, resource, scope)
assert.NoError(t, err)
assert.Equal(t, in, out)
}

func TestEncodeLogfmtWithMapBody(t *testing.T) {
in := `key1=value key2=value traceID=01020304000000000000000000000000 spanID=0506070800000000 severity=error attribute_attr1=1 attribute_attr2=2 resource_host.name=something`
log, resource := exampleLog()
in := `key1=value key2=value traceID=01020304000000000000000000000000 spanID=0506070800000000 severity=error attribute_attr1=1 attribute_attr2=2 resource_host.name=something instrumentation_scope_name=example-logger-name instrumentation_scope_version=v1`
log, resource, scope := exampleLog()
mapVal := pcommon.NewValueMap()
mapVal.Map().PutStr("key1", "value")
mapVal.Map().PutStr("key2", "value")
mapVal.CopyTo(log.Body())
out, err := EncodeLogfmt(log, resource)
out, err := EncodeLogfmt(log, resource, scope)
assert.NoError(t, err)
assert.Equal(t, in, out)
}

func TestEncodeLogfmtWithSliceBody(t *testing.T) {
in := `body_0=value body_1=true body_2=123 traceID=01020304000000000000000000000000 spanID=0506070800000000 severity=error attribute_attr1=1 attribute_attr2=2 resource_host.name=something`
log, resource := exampleLog()
in := `body_0=value body_1=true body_2=123 traceID=01020304000000000000000000000000 spanID=0506070800000000 severity=error attribute_attr1=1 attribute_attr2=2 resource_host.name=something instrumentation_scope_name=example-logger-name instrumentation_scope_version=v1`
log, resource, scope := exampleLog()
sliceVal := pcommon.NewValueSlice()
sliceVal.Slice().AppendEmpty().SetStr("value")
sliceVal.Slice().AppendEmpty().SetBool(true)
sliceVal.Slice().AppendEmpty().SetInt(123)
sliceVal.CopyTo(log.Body())
out, err := EncodeLogfmt(log, resource)
out, err := EncodeLogfmt(log, resource, scope)
assert.NoError(t, err)
assert.Equal(t, in, out)
}

func TestEncodeLogfmtWithComplexAttributes(t *testing.T) {
in := `Example= log= traceID=01020304000000000000000000000000 spanID=0506070800000000 severity=error attribute_attr1=1 attribute_attr2=2 attribute_aslice_0=fooo attribute_aslice_1_slice_0=true attribute_aslice_1_foo=bar attribute_aslice_2_nested="deeply nested" attribute_aslice_2_uint=123 resource_host.name=something resource_bslice_0=fooo resource_bslice_1_slice_0=true resource_bslice_1_foo=bar resource_bslice_2_nested="deeply nested" resource_bslice_2_uint=123`
log, resource := exampleLog()
in := `Example= log= traceID=01020304000000000000000000000000 spanID=0506070800000000 severity=error attribute_attr1=1 attribute_attr2=2 attribute_aslice_0=fooo attribute_aslice_1_slice_0=true attribute_aslice_1_foo=bar attribute_aslice_2_nested="deeply nested" attribute_aslice_2_uint=123 resource_host.name=something resource_bslice_0=fooo resource_bslice_1_slice_0=true resource_bslice_1_foo=bar resource_bslice_2_nested="deeply nested" resource_bslice_2_uint=123 instrumentation_scope_name=example-logger-name instrumentation_scope_version=v1`
log, resource, scope := exampleLog()
sliceVal := pcommon.NewValueSlice()
sliceVal.Slice().AppendEmpty().SetStr("fooo")
map1 := pcommon.NewValueMap()
Expand All @@ -197,7 +202,7 @@ func TestEncodeLogfmtWithComplexAttributes(t *testing.T) {
sliceVal.CopyTo(log.Attributes().PutEmpty("aslice"))
sliceVal.CopyTo(resource.Attributes().PutEmpty("bslice"))

out, err := EncodeLogfmt(log, resource)
out, err := EncodeLogfmt(log, resource, scope)
assert.NoError(t, err)
assert.Equal(t, in, out)
}
6 changes: 4 additions & 2 deletions pkg/translator/loki/logs_to_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func LogsToLokiRequests(ld plog.Logs) map[string]PushRequest {

for j := 0; j < ills.Len(); j++ {
logs := ills.At(j).LogRecords()
scope := ills.At(j).Scope()
for k := 0; k < logs.Len(); k++ {

// similarly, we may remove attributes, so change only our version
Expand Down Expand Up @@ -99,7 +100,7 @@ func LogsToLokiRequests(ld plog.Logs) map[string]PushRequest {

// create the stream name based on the labels
labels := mergedLabels.String()
entry, err := convertLogToLokiEntry(log, resource, format)
entry, err := convertLogToLokiEntry(log, resource, format, scope)
if err != nil {
// Couldn't convert so dropping log.
group.report.Errors = append(group.report.Errors, fmt.Errorf("failed to convert, dropping log: %w", err))
Expand Down Expand Up @@ -205,6 +206,7 @@ func LogsToLoki(ld plog.Logs) (*logproto.PushRequest, *PushReport) {
ills := rls.At(i).ScopeLogs()

for j := 0; j < ills.Len(); j++ {
scope := ills.At(j).Scope()
logs := ills.At(j).LogRecords()
for k := 0; k < logs.Len(); k++ {

Expand All @@ -229,7 +231,7 @@ func LogsToLoki(ld plog.Logs) (*logproto.PushRequest, *PushReport) {
// create the stream name based on the labels
labels := mergedLabels.String()

entry, err := convertLogToLokiEntry(log, resource, format)
entry, err := convertLogToLokiEntry(log, resource, format, scope)
if err != nil {
// Couldn't convert so dropping log.
report.Errors = append(report.Errors, fmt.Errorf("failed to convert, dropping log: %w", err))
Expand Down
Loading

0 comments on commit 8e35f2c

Please sign in to comment.