Skip to content

Commit

Permalink
[exporter/elasticsearch] Add span event to traces OTel mapping mode (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#34831)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
  Span events are now supported in OTel mapping mode.
They will be routed to
`logs-${data_stream.dataset}-${data_stream.namespace}` if
`traces_dynamic_index::enabled` is `true`.

**Link to tracking Issue:** <Issue number if applicable>

**Testing:** <Describe what testing was performed and which tests were
added.>

Updated exporter tests

**Documentation:** <Describe the documentation added.>
  • Loading branch information
carsonip authored and f7o committed Sep 12, 2024
1 parent 3d3674b commit 7247c42
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 14 deletions.
29 changes: 29 additions & 0 deletions .chloggen/elasticsearchexporter_otel-mode-traces-span-events.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add span event support to traces OTel mapping mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34831]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
Span events are now supported in OTel mapping mode.
They will be routed to `logs-${data_stream.dataset}-${data_stream.namespace}` if `traces_dynamic_index::enabled` is `true`.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 5 additions & 3 deletions exporter/elasticsearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ This can be customised through the following settings:

- `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name.
- `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields.
`elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. There is an exception for span events under OTel mapping mode (`mapping::mode: otel`), where span event attributes instead of span attributes are considered, and `data_stream.type` is always `logs` instead of `traces` such that documents are routed to `logs-${data_stream.dataset}-${data_stream.namespace}`.

- `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format.
- `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix,
Expand All @@ -155,8 +155,10 @@ behaviours, which may be configured through the following settings:
- `none`: Use original fields and event structure from the OTLP event.
- `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS]
- `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event.
:warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes.
There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.
- :warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes.
- There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.
- `data_stream.dataset` will always be appended with `.otel`. It is recommended to use with `*_dynamic_index.enabled: true` to route documents to data stream `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`.
- Span events are stored in separate documents. They will be routed with `data_stream.type` set to `logs` if `traces_dynamic_index::enabled` is `true`.

- `raw`: Omit the `Attributes.` string prefixed to field names for log and
span attributes as well as omit the `Events.` string prefixed to
Expand Down
26 changes: 20 additions & 6 deletions exporter/elasticsearchexporter/data_stream_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace string) func(
func routeWithDefaults(defaultDSType string) func(
pcommon.Map,
pcommon.Map,
pcommon.Map,
Expand All @@ -29,8 +29,8 @@ func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace strin
// 1. read data_stream.* from attributes
// 2. read elasticsearch.index.* from attributes
// 3. use default hardcoded data_stream.*
dataset, datasetExists := getFromAttributes(dataStreamDataset, defaultDSDataset, recordAttr, scopeAttr, resourceAttr)
namespace, namespaceExists := getFromAttributes(dataStreamNamespace, defaultDSNamespace, recordAttr, scopeAttr, resourceAttr)
dataset, datasetExists := getFromAttributes(dataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr)
namespace, namespaceExists := getFromAttributes(dataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr)
dataStreamMode := datasetExists || namespaceExists
if !dataStreamMode {
prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr)
Expand Down Expand Up @@ -62,7 +62,7 @@ func routeLogRecord(
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeLogs, defaultDataStreamDataset, defaultDataStreamNamespace)
route := routeWithDefaults(defaultDataStreamTypeLogs)
return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}

Expand All @@ -75,7 +75,7 @@ func routeDataPoint(
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeMetrics, defaultDataStreamDataset, defaultDataStreamNamespace)
route := routeWithDefaults(defaultDataStreamTypeMetrics)
return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}

Expand All @@ -88,6 +88,20 @@ func routeSpan(
fIndex string,
otel bool,
) string {
route := routeWithDefaults(defaultDataStreamTypeTraces, defaultDataStreamDataset, defaultDataStreamNamespace)
route := routeWithDefaults(defaultDataStreamTypeTraces)
return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}

// routeSpanEvent returns the name of the index to send the span event to according to data stream routing attributes.
// This function may mutate record attributes.
func routeSpanEvent(
spanEvent ptrace.SpanEvent,
scope pcommon.InstrumentationScope,
resource pcommon.Resource,
fIndex string,
otel bool,
) string {
// span events are sent to logs-*, not traces-*
route := routeWithDefaults(defaultDataStreamTypeLogs)
return route(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel)
}
40 changes: 40 additions & 0 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ func (e *elasticsearchExporter) pushTraceData(
}
errs = append(errs, err)
}
for ii := 0; ii < span.Events().Len(); ii++ {
spanEvent := span.Events().At(ii)
if err := e.pushSpanEvent(ctx, resource, il.SchemaUrl(), span, spanEvent, scope, scopeSpan.SchemaUrl(), session); err != nil {
errs = append(errs, err)
}
}
}
}
}
Expand Down Expand Up @@ -411,3 +417,37 @@ func (e *elasticsearchExporter) pushTraceRecord(
}
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil)
}

func (e *elasticsearchExporter) pushSpanEvent(
ctx context.Context,
resource pcommon.Resource,
resourceSchemaURL string,
span ptrace.Span,
spanEvent ptrace.SpanEvent,
scope pcommon.InstrumentationScope,
scopeSchemaURL string,
bulkIndexerSession bulkIndexerSession,
) error {
fIndex := e.index
if e.dynamicIndex {
fIndex = routeSpanEvent(spanEvent, scope, resource, fIndex, e.otel)
}

if e.logstashFormat.Enabled {
formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now())
if err != nil {
return err
}
fIndex = formattedIndex
}

document := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL)
if document == nil {
return nil
}
docBytes, err := e.model.encodeDocument(*document)
if err != nil {
return err
}
return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil)
}
11 changes: 10 additions & 1 deletion exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,11 @@ func TestExporterTraces(t *testing.T) {
span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0)))
span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(7200, 0)))

event := span.Events().AppendEmpty()
event.SetName("exception")
event.Attributes().PutStr("event.attr.foo", "event.attr.bar")
event.SetDroppedAttributesCount(1)

scopeAttr := span.Attributes()
fillResourceAttributeMap(scopeAttr, map[string]string{
"attr.foo": "attr.bar",
Expand All @@ -1122,13 +1127,17 @@ func TestExporterTraces(t *testing.T) {

mustSendTraces(t, exporter, traces)

rec.WaitItems(1)
rec.WaitItems(2)

expected := []itemRequest{
{
Action: []byte(`{"create":{"_index":"traces-generic.otel-default"}}`),
Document: []byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","attributes":{"attr.foo":"attr.bar"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"traces"},"dropped_attributes_count":2,"dropped_events_count":3,"dropped_links_count":4,"duration":3600000000000,"kind":"Unspecified","links":[{"attributes":{"link.attr.foo":"link.attr.bar"},"dropped_attributes_count":11,"span_id":"","trace_id":"","trace_state":"bar"}],"name":"name","resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"status":{"code":"Unset"},"trace_state":"foo"}`),
},
{
Action: []byte(`{"create":{"_index":"logs-generic.otel-default"}}`),
Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"event.attr.foo":"event.attr.bar","event.name":"exception"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"logs"},"dropped_attributes_count":1,"resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`),
},
}

assertItemsEqual(t, expected, rec.Items(), false)
Expand Down
29 changes: 25 additions & 4 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var resourceAttrsToPreserve = map[string]bool{
type mappingModel interface {
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error)
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error)
encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) *objmodel.Document
upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, string, pcommon.InstrumentationScope, string, pmetric.Metric, dataPoint, pcommon.Value) error
encodeDocument(objmodel.Document) ([]byte, error)
}
Expand Down Expand Up @@ -483,7 +484,9 @@ func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pco
}

func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attributeMap pcommon.Map) {
attributeMap.RemoveIf(func(key string, val pcommon.Value) bool {
attrsCopy := pcommon.NewMap() // Copy to avoid mutating original map
attributeMap.CopyTo(attrsCopy)
attrsCopy.RemoveIf(func(key string, val pcommon.Value) bool {
switch key {
case dataStreamType, dataStreamDataset, dataStreamNamespace:
// At this point the data_stream attributes are expected to be in the record attributes,
Expand All @@ -494,7 +497,7 @@ func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attr
}
return false
})
document.AddAttributes("attributes", attributeMap)
document.AddAttributes("attributes", attrsCopy)
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) {
Expand Down Expand Up @@ -549,8 +552,6 @@ func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSche
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)

// TODO: add span events to log data streams

return document
}

Expand All @@ -574,6 +575,26 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra
return document
}

func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) *objmodel.Document {
if m.mode != MappingOTel {
// Currently span events are stored separately only in OTel mapping mode.
// In other modes, they are stored within the span document.
return nil
}
var document objmodel.Document
document.AddTimestamp("@timestamp", spanEvent.Timestamp())
document.AddString("attributes.event.name", spanEvent.Name())
document.AddSpanID("span_id", span.SpanID())
document.AddTraceID("trace_id", span.TraceID())
document.AddInt("dropped_attributes_count", int64(spanEvent.DroppedAttributesCount()))

m.encodeAttributesOTelMode(&document, spanEvent.Attributes())
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)

return &document
}

func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) {
key := "Attributes"
if m.mode == MappingRaw {
Expand Down

0 comments on commit 7247c42

Please sign in to comment.