diff --git a/.chloggen/elasticsearchexporter_otel-mode-traces-span-events.yaml b/.chloggen/elasticsearchexporter_otel-mode-traces-span-events.yaml new file mode 100644 index 000000000000..73e7e06cd4e5 --- /dev/null +++ b/.chloggen/elasticsearchexporter_otel-mode-traces-span-events.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add span event support to traces OTel mapping mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34831] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + Span events are now supported in OTel mapping mode. + They will be routed to `logs-${data_stream.dataset}-${data_stream.namespace}` if `traces_dynamic_index::enabled` is `true`. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 5212e0790a7f..73b72bec82a2 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -135,7 +135,7 @@ This can be customised through the following settings: - `traces_dynamic_index` (optional): uses resource, scope, or span attributes to dynamically construct index name. - `enabled`(default=false): Enable/Disable dynamic index for trace spans. If `data_stream.dataset` or `data_stream.namespace` exist in attributes (precedence: span attribute > scope attribute > resource attribute), they will be used to dynamically construct index name in the form `traces-${data_stream.dataset}-${data_stream.namespace}`. Otherwise, if - `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. + `elasticsearch.index.prefix` or `elasticsearch.index.suffix` exist in attributes (precedence: resource attribute > scope attribute > span attribute), they will be used to dynamically construct index name in the form `${elasticsearch.index.prefix}${traces_index}${elasticsearch.index.suffix}`. Otherwise, the index name falls back to `traces-generic-default`, and `traces_index` config will be ignored. 
Except for prefix/suffix attribute presence, the resulting docs will contain the corresponding `data_stream.*` fields. There is an exception for span events under OTel mapping mode (`mapping::mode: otel`): span event attributes are considered instead of span attributes, and `data_stream.type` is always `logs` instead of `traces`, so span event documents are routed to `logs-${data_stream.dataset}-${data_stream.namespace}`. - `logstash_format` (optional): Logstash format compatibility. Logs, metrics and traces can be written into an index in Logstash format. - `enabled`(default=false): Enable/disable Logstash format compatibility. When `logstash_format.enabled` is `true`, the index name is composed using `(logs|metrics|traces)_index` or `(logs|metrics|traces)_dynamic_index` as prefix and the date as suffix, @@ -155,8 +155,10 @@ behaviours, which may be configured through the following settings: - `none`: Use original fields and event structure from the OTLP event. - `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS] - `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event. - :warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes. - There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields. + - :warning: This mode's behavior is unstable; it is currently experimental and undergoing changes. + - There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields. + - `data_stream.dataset` will always have `.otel` appended to it. It is recommended to use this mode with `*_dynamic_index.enabled: true` to route documents to the data stream `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`. + - Span events are stored in separate documents. They will be routed with `data_stream.type` set to `logs` if `traces_dynamic_index::enabled` is `true`. - `raw`: Omit the `Attributes.` string prefixed to field names for log and span attributes as well as omit the `Events.` string prefixed to diff --git a/exporter/elasticsearchexporter/data_stream_router.go b/exporter/elasticsearchexporter/data_stream_router.go index 028fd183aa2d..851bb92d9756 100644 --- a/exporter/elasticsearchexporter/data_stream_router.go +++ b/exporter/elasticsearchexporter/data_stream_router.go @@ -11,7 +11,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace string) func( +func routeWithDefaults(defaultDSType string) func( pcommon.Map, pcommon.Map, pcommon.Map, @@ -29,8 +29,8 @@ func routeWithDefaults(defaultDSType, defaultDSDataset, defaultDSNamespace strin // 1. read data_stream.* from attributes // 2. read elasticsearch.index.* from attributes // 3.
use default hardcoded data_stream.* - dataset, datasetExists := getFromAttributes(dataStreamDataset, defaultDSDataset, recordAttr, scopeAttr, resourceAttr) - namespace, namespaceExists := getFromAttributes(dataStreamNamespace, defaultDSNamespace, recordAttr, scopeAttr, resourceAttr) + dataset, datasetExists := getFromAttributes(dataStreamDataset, defaultDataStreamDataset, recordAttr, scopeAttr, resourceAttr) + namespace, namespaceExists := getFromAttributes(dataStreamNamespace, defaultDataStreamNamespace, recordAttr, scopeAttr, resourceAttr) dataStreamMode := datasetExists || namespaceExists if !dataStreamMode { prefix, prefixExists := getFromAttributes(indexPrefix, "", resourceAttr, scopeAttr, recordAttr) @@ -62,7 +62,7 @@ func routeLogRecord( fIndex string, otel bool, ) string { - route := routeWithDefaults(defaultDataStreamTypeLogs, defaultDataStreamDataset, defaultDataStreamNamespace) + route := routeWithDefaults(defaultDataStreamTypeLogs) return route(record.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel) } @@ -75,7 +75,7 @@ func routeDataPoint( fIndex string, otel bool, ) string { - route := routeWithDefaults(defaultDataStreamTypeMetrics, defaultDataStreamDataset, defaultDataStreamNamespace) + route := routeWithDefaults(defaultDataStreamTypeMetrics) return route(dataPoint.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel) } @@ -88,6 +88,20 @@ func routeSpan( fIndex string, otel bool, ) string { - route := routeWithDefaults(defaultDataStreamTypeTraces, defaultDataStreamDataset, defaultDataStreamNamespace) + route := routeWithDefaults(defaultDataStreamTypeTraces) return route(span.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel) } + +// routeSpanEvent returns the name of the index to send the span event to according to data stream routing attributes. +// This function may mutate record attributes. 
+func routeSpanEvent( + spanEvent ptrace.SpanEvent, + scope pcommon.InstrumentationScope, + resource pcommon.Resource, + fIndex string, + otel bool, +) string { + // span events are sent to logs-*, not traces-* + route := routeWithDefaults(defaultDataStreamTypeLogs) + return route(spanEvent.Attributes(), scope.Attributes(), resource.Attributes(), fIndex, otel) +} diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index ee7d697e8d5c..2bf4c0250fa4 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -370,6 +370,12 @@ func (e *elasticsearchExporter) pushTraceData( } errs = append(errs, err) } + for ii := 0; ii < span.Events().Len(); ii++ { + spanEvent := span.Events().At(ii) + if err := e.pushSpanEvent(ctx, resource, il.SchemaUrl(), span, spanEvent, scope, scopeSpan.SchemaUrl(), session); err != nil { + errs = append(errs, err) + } + } } } } @@ -411,3 +417,37 @@ func (e *elasticsearchExporter) pushTraceRecord( } return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) } + +func (e *elasticsearchExporter) pushSpanEvent( + ctx context.Context, + resource pcommon.Resource, + resourceSchemaURL string, + span ptrace.Span, + spanEvent ptrace.SpanEvent, + scope pcommon.InstrumentationScope, + scopeSchemaURL string, + bulkIndexerSession bulkIndexerSession, +) error { + fIndex := e.index + if e.dynamicIndex { + fIndex = routeSpanEvent(spanEvent, scope, resource, fIndex, e.otel) + } + + if e.logstashFormat.Enabled { + formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now()) + if err != nil { + return err + } + fIndex = formattedIndex + } + + document := e.model.encodeSpanEvent(resource, resourceSchemaURL, span, spanEvent, scope, scopeSchemaURL) + if document == nil { + return nil + } + docBytes, err := e.model.encodeDocument(*document) + if err != nil { + return err + } + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil) +} diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 060b972430c2..c53f97a872b4 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -1100,6 +1100,11 @@ func TestExporterTraces(t *testing.T) { span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(3600, 0))) span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Unix(7200, 0))) + event := span.Events().AppendEmpty() + event.SetName("exception") + event.Attributes().PutStr("event.attr.foo", "event.attr.bar") + event.SetDroppedAttributesCount(1) + scopeAttr := span.Attributes() fillResourceAttributeMap(scopeAttr, map[string]string{ "attr.foo": "attr.bar", @@ -1122,13 +1127,17 @@ func TestExporterTraces(t *testing.T) { mustSendTraces(t, exporter, traces) - rec.WaitItems(1) + rec.WaitItems(2) expected := []itemRequest{ { Action: []byte(`{"create":{"_index":"traces-generic.otel-default"}}`), Document: 
[]byte(`{"@timestamp":"1970-01-01T01:00:00.000000000Z","attributes":{"attr.foo":"attr.bar"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"traces"},"dropped_attributes_count":2,"dropped_events_count":3,"dropped_links_count":4,"duration":3600000000000,"kind":"Unspecified","links":[{"attributes":{"link.attr.foo":"link.attr.bar"},"dropped_attributes_count":11,"span_id":"","trace_id":"","trace_state":"bar"}],"name":"name","resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"status":{"code":"Unset"},"trace_state":"foo"}`), }, + { + Action: []byte(`{"create":{"_index":"logs-generic.otel-default"}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"event.attr.foo":"event.attr.bar","event.name":"exception"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"logs"},"dropped_attributes_count":1,"resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + }, } assertItemsEqual(t, expected, rec.Items(), false) diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 07e51c30fe75..55af4eb45db9 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -67,6 +67,7 @@ var resourceAttrsToPreserve = map[string]bool{ type mappingModel interface { encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error) encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error) + encodeSpanEvent(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) *objmodel.Document upsertMetricDataPointValue(map[uint32]objmodel.Document, pcommon.Resource, string, pcommon.InstrumentationScope, string, pmetric.Metric, dataPoint, pcommon.Value) error encodeDocument(objmodel.Document) ([]byte, error) } @@ -483,7 +484,9 @@ func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pco } func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attributeMap pcommon.Map) { - attributeMap.RemoveIf(func(key string, val pcommon.Value) bool { + attrsCopy := pcommon.NewMap() // Copy to avoid mutating original map + attributeMap.CopyTo(attrsCopy) + attrsCopy.RemoveIf(func(key string, val pcommon.Value) bool { switch key { case dataStreamType, dataStreamDataset, dataStreamNamespace: // At this point the data_stream attributes are expected to be in the record attributes, @@ -494,7 +497,7 @@ func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attr } return false }) - document.AddAttributes("attributes", attributeMap) + document.AddAttributes("attributes", attrsCopy) } func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) { @@ -549,8 +552,6 @@ func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSche m.encodeResourceOTelMode(&document, resource, resourceSchemaURL) m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) - // TODO: add span events to log data streams - return document } @@ -574,6 +575,26 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra return document } +func (m *encodeModel) encodeSpanEvent(resource 
pcommon.Resource, resourceSchemaURL string, span ptrace.Span, spanEvent ptrace.SpanEvent, scope pcommon.InstrumentationScope, scopeSchemaURL string) *objmodel.Document { + if m.mode != MappingOTel { + // Currently span events are stored separately only in OTel mapping mode. + // In other modes, they are stored within the span document. + return nil + } + var document objmodel.Document + document.AddTimestamp("@timestamp", spanEvent.Timestamp()) + document.AddString("attributes.event.name", spanEvent.Name()) + document.AddSpanID("span_id", span.SpanID()) + document.AddTraceID("trace_id", span.TraceID()) + document.AddInt("dropped_attributes_count", int64(spanEvent.DroppedAttributesCount())) + + m.encodeAttributesOTelMode(&document, spanEvent.Attributes()) + m.encodeResourceOTelMode(&document, resource, resourceSchemaURL) + m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) + + return &document +} + func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes pcommon.Map) { key := "Attributes" if m.mode == MappingRaw {
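For reviewers, here is a minimal collector configuration sketch that exercises the new routing path. It is not part of this change; the `endpoints` value, receiver, and pipeline wiring are placeholders for illustration. With `mapping::mode: otel` and `traces_dynamic_index::enabled: true`, spans continue to go to `traces-${data_stream.dataset}-${data_stream.namespace}` while their span events are written as separate documents to `logs-${data_stream.dataset}-${data_stream.namespace}`, matching the `traces-generic.otel-default` / `logs-generic.otel-default` expectations in `TestExporterTraces`.

```yaml
receivers:
  otlp:
    protocols:
      grpc:

exporters:
  elasticsearch:
    # Placeholder endpoint for illustration only.
    endpoints: ["https://elasticsearch:9200"]
    mapping:
      # OTel-native mapping mode; span events become separate documents.
      mode: otel
    traces_dynamic_index:
      # Required for span events to be routed to logs-*-* data streams.
      enabled: true
    logs_dynamic_index:
      enabled: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [elasticsearch]
```

As the `pushSpanEvent` code above shows, when dynamic indexing is disabled the span event documents instead fall back to the static index configured via `traces_index`.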