-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
Copy pathmodel.go
121 lines (105 loc) · 4.61 KB
/
model.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
import (
"bytes"
"encoding/json"
"time"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
)
type mappingModel interface {
encodeLog(pcommon.Resource, plog.LogRecord, pcommon.InstrumentationScope) ([]byte, error)
encodeSpan(pcommon.Resource, ptrace.Span, pcommon.InstrumentationScope) ([]byte, error)
}
// encodeModel tries to keep the event as close to the original open telemetry semantics as is.
// No fields will be mapped by default.
//
// Field deduplication and dedotting of attributes is supported by the encodeModel.
//
// See: https://github.com/open-telemetry/oteps/blob/master/text/logs/0097-log-data-model.md
type encodeModel struct {
dedup bool
dedot bool
}
const (
traceIDField = "traceID"
spanIDField = "spanID"
attributeField = "attribute"
)
func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) ([]byte, error) {
var document objmodel.Document
document.AddTimestamp("@timestamp", record.Timestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTraceID("TraceId", record.TraceID())
document.AddSpanID("SpanId", record.SpanID())
document.AddInt("TraceFlags", int64(record.Flags()))
document.AddString("SeverityText", record.SeverityText())
document.AddInt("SeverityNumber", int64(record.SeverityNumber()))
document.AddAttribute("Body", record.Body())
document.AddAttributes("Attributes", record.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddAttributes("Scope", scopeToAttributes(scope))
if m.dedup {
document.Dedup()
} else if m.dedot {
document.Sort()
}
var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot)
return buf.Bytes(), err
}
func (m *encodeModel) encodeSpan(resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) ([]byte, error) {
var document objmodel.Document
document.AddTimestamp("@timestamp", span.StartTimestamp()) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used.
document.AddTimestamp("EndTimestamp", span.EndTimestamp())
document.AddTraceID("TraceId", span.TraceID())
document.AddSpanID("SpanId", span.SpanID())
document.AddSpanID("ParentSpanId", span.ParentSpanID())
document.AddString("Name", span.Name())
document.AddString("Kind", traceutil.SpanKindStr(span.Kind()))
document.AddInt("TraceStatus", int64(span.Status().Code()))
document.AddString("Link", spanLinksToString(span.Links()))
document.AddAttributes("Attributes", span.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddEvents("Events", span.Events())
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
document.AddAttributes("Scope", scopeToAttributes(scope))
if m.dedup {
document.Dedup()
} else if m.dedot {
document.Sort()
}
var buf bytes.Buffer
err := document.Serialize(&buf, m.dedot)
return buf.Bytes(), err
}
func spanLinksToString(spanLinkSlice ptrace.SpanLinkSlice) string {
linkArray := make([]map[string]any, 0, spanLinkSlice.Len())
for i := 0; i < spanLinkSlice.Len(); i++ {
spanLink := spanLinkSlice.At(i)
link := map[string]any{}
link[spanIDField] = traceutil.SpanIDToHexOrEmptyString(spanLink.SpanID())
link[traceIDField] = traceutil.TraceIDToHexOrEmptyString(spanLink.TraceID())
link[attributeField] = spanLink.Attributes().AsRaw()
linkArray = append(linkArray, link)
}
linkArrayBytes, _ := json.Marshal(&linkArray)
return string(linkArrayBytes)
}
// durationAsMicroseconds calculate span duration through end - start nanoseconds and converts time.Time to microseconds,
// which is the format the Duration field is stored in the Span.
func durationAsMicroseconds(start, end time.Time) int64 {
return (end.UnixNano() - start.UnixNano()) / 1000
}
func scopeToAttributes(scope pcommon.InstrumentationScope) pcommon.Map {
attrs := pcommon.NewMap()
attrs.PutStr("name", scope.Name())
attrs.PutStr("version", scope.Version())
for k, v := range scope.Attributes().AsRaw() {
attrs.PutStr(k, v.(string))
}
return attrs
}