diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 56659b612d9..849730376ff 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -27,7 +27,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits] * Add support for histograms to metrics intake {pull}5360[5360] * Upgrade Go to 1.16.4 {pull}5381[5381] * Add units to metric fields {pull}5395[5395] - +* Add support for adjusting OTel event timestamps using `telemetry.sdk.elastic_export_timestamp` {pull}5433[5433] [float] ==== Deprecated diff --git a/processor/otel/consumer.go b/processor/otel/consumer.go index d4fcde590c9..8371f5cef1c 100644 --- a/processor/otel/consumer.go +++ b/processor/otel/consumer.go @@ -42,6 +42,7 @@ import ( "strconv" "strings" "sync/atomic" + "time" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" @@ -102,32 +103,43 @@ func (c *Consumer) Capabilities() consumer.Capabilities { // ConsumeTraces consumes OpenTelemetry trace data, // converting into Elastic APM events and reporting to the Elastic APM schema. func (c *Consumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error { - batch := c.convert(traces) + receiveTimestamp := time.Now() + batch := c.convert(traces, receiveTimestamp) return c.Processor.ProcessBatch(ctx, batch) } -func (c *Consumer) convert(td pdata.Traces) *model.Batch { +func (c *Consumer) convert(td pdata.Traces, receiveTimestamp time.Time) *model.Batch { batch := model.Batch{} resourceSpans := td.ResourceSpans() for i := 0; i < resourceSpans.Len(); i++ { - c.convertResourceSpans(resourceSpans.At(i), &batch) + c.convertResourceSpans(resourceSpans.At(i), receiveTimestamp, &batch) } return &batch } -func (c *Consumer) convertResourceSpans(resourceSpans pdata.ResourceSpans, out *model.Batch) { +func (c *Consumer) convertResourceSpans(resourceSpans pdata.ResourceSpans, receiveTimestamp time.Time, out *model.Batch) { var metadata model.Metadata - translateResourceMetadata(resourceSpans.Resource(), &metadata) + var timeDelta time.Duration + resource := resourceSpans.Resource() + translateResourceMetadata(resource, &metadata) + if exportTimestamp, ok := exportTimestamp(resource); ok { + timeDelta = receiveTimestamp.Sub(exportTimestamp) + } instrumentationLibrarySpans := resourceSpans.InstrumentationLibrarySpans() for i := 0; i < instrumentationLibrarySpans.Len(); i++ { - c.convertInstrumentationLibrarySpans(instrumentationLibrarySpans.At(i), metadata, out) + c.convertInstrumentationLibrarySpans(instrumentationLibrarySpans.At(i), metadata, timeDelta, out) } } -func (c *Consumer) convertInstrumentationLibrarySpans(in pdata.InstrumentationLibrarySpans, metadata model.Metadata, out *model.Batch) { +func (c *Consumer) convertInstrumentationLibrarySpans( + in pdata.InstrumentationLibrarySpans, + metadata model.Metadata, + timeDelta time.Duration, + out *model.Batch, +) { otelSpans := in.Spans() for i := 0; i < otelSpans.Len(); i++ { - c.convertSpan(otelSpans.At(i), in.InstrumentationLibrary(), metadata, out) + c.convertSpan(otelSpans.At(i), in.InstrumentationLibrary(), metadata, timeDelta, out) } } @@ -135,12 +147,12 @@ func (c *Consumer) convertSpan( otelSpan pdata.Span, otelLibrary pdata.InstrumentationLibrary, metadata model.Metadata, + timeDelta time.Duration, out *model.Batch, ) { logger := logp.NewLogger(logs.Otel) root := otelSpan.ParentSpanID().IsEmpty() - var parentID string if !root { parentID = otelSpan.ParentSpanID().HexString() @@ -155,6 +167,7 @@ func (c *Consumer) convertSpan( if endTime.After(startTime) { durationMillis = endTime.Sub(startTime).Seconds() * 1000 } + timestamp := startTime.Add(timeDelta) var transaction *model.Transaction var span *model.Span @@ -171,7 +184,7 @@ func (c *Consumer) convertSpan( ID: spanID, ParentID: parentID, TraceID: traceID, - Timestamp: startTime, + Timestamp: timestamp, Duration: durationMillis, Name: name, Outcome: spanStatusOutcome(otelSpan.Status()), @@ -184,7 +197,7 @@ func (c *Consumer) convertSpan( ID: spanID, ParentID: parentID, TraceID: traceID, - Timestamp: startTime, + Timestamp: timestamp, Duration: durationMillis, Name: name, Outcome: spanStatusOutcome(otelSpan.Status()), @@ -195,7 +208,7 @@ func (c *Consumer) convertSpan( events := otelSpan.Events() for i := 0; i < events.Len(); i++ { - convertSpanEvent(logger, events.At(i), metadata, transaction, span, out) + convertSpanEvent(logger, events.At(i), metadata, transaction, span, timeDelta, out) } } @@ -735,6 +748,7 @@ func convertSpanEvent( event pdata.SpanEvent, metadata model.Metadata, transaction *model.Transaction, span *model.Span, // only one is non-nil + timeDelta time.Duration, out *model.Batch, ) { var e *model.Error @@ -775,8 +789,10 @@ func convertSpanEvent( // - exception.message` return } + timestamp := event.Timestamp().AsTime() + timestamp = timestamp.Add(timeDelta) e = convertOpenTelemetryExceptionSpanEvent( - event.Timestamp().AsTime(), + timestamp, exceptionType, exceptionMessage, exceptionStacktrace, exceptionEscaped, metadata.Service.Language.Name, ) diff --git a/processor/otel/consumer_test.go b/processor/otel/consumer_test.go index b8962c2cc6e..1c1c0a2ece8 100644 --- a/processor/otel/consumer_test.go +++ b/processor/otel/consumer_test.go @@ -578,6 +578,74 @@ func TestArrayLabels(t *testing.T) { }, tx.Labels) } +func TestConsumeTracesExportTimestamp(t *testing.T) { + traces, otelSpans := newTracesSpans() + + // The actual timestamps will be non-deterministic, as they are adjusted + // based on the server's clock. + // + // Use a large delta so that we can allow for a significant amount of + // delay in the test environment affecting the timestamp adjustment. + const timeDelta = time.Hour + const allowedError = 5 // seconds + + now := time.Now() + exportTimestamp := now.Add(-timeDelta) + traces.ResourceSpans().At(0).Resource().Attributes().InitFromMap(map[string]pdata.AttributeValue{ + "telemetry.sdk.elastic_export_timestamp": pdata.NewAttributeValueInt(exportTimestamp.UnixNano()), + }) + + // Offsets are start times relative to the export timestamp. + transactionOffset := -2 * time.Second + spanOffset := transactionOffset + time.Second + exceptionOffset := spanOffset + 25*time.Millisecond + transactionDuration := time.Second + 100*time.Millisecond + spanDuration := 50 * time.Millisecond + + exportedTransactionTimestamp := exportTimestamp.Add(transactionOffset) + exportedSpanTimestamp := exportTimestamp.Add(spanOffset) + exportedExceptionTimestamp := exportTimestamp.Add(exceptionOffset) + + otelSpan1 := pdata.NewSpan() + otelSpan1.SetTraceID(pdata.NewTraceID([16]byte{1})) + otelSpan1.SetSpanID(pdata.NewSpanID([8]byte{2})) + otelSpan1.SetStartTimestamp(pdata.TimestampFromTime(exportedTransactionTimestamp)) + otelSpan1.SetEndTimestamp(pdata.TimestampFromTime(exportedTransactionTimestamp.Add(transactionDuration))) + otelSpans.Spans().Append(otelSpan1) + + otelSpan2 := pdata.NewSpan() + otelSpan2.SetTraceID(pdata.NewTraceID([16]byte{1})) + otelSpan2.SetSpanID(pdata.NewSpanID([8]byte{2})) + otelSpan2.SetParentSpanID(pdata.NewSpanID([8]byte{3})) + otelSpan2.SetStartTimestamp(pdata.TimestampFromTime(exportedSpanTimestamp)) + otelSpan2.SetEndTimestamp(pdata.TimestampFromTime(exportedSpanTimestamp.Add(spanDuration))) + otelSpans.Spans().Append(otelSpan2) + + otelSpanEvent := pdata.NewSpanEvent() + otelSpanEvent.SetTimestamp(pdata.TimestampFromTime(exportedExceptionTimestamp)) + otelSpanEvent.SetName("exception") + otelSpanEvent.Attributes().InitFromMap(map[string]pdata.AttributeValue{ + "exception.type": pdata.NewAttributeValueString("the_type"), + "exception.message": pdata.NewAttributeValueString("the_message"), + "exception.stacktrace": pdata.NewAttributeValueString("the_stacktrace"), + }) + otelSpan2.Events().Append(otelSpanEvent) + + batch := transformTraces(t, traces) + require.Len(t, batch.Transactions, 1) + require.Len(t, batch.Spans, 1) + require.Len(t, batch.Errors, 1) + + // Give some leeway for one event, and check other events' timestamps relative to that one. + assert.InDelta(t, now.Add(transactionOffset).Unix(), batch.Transactions[0].Timestamp.Unix(), allowedError) + assert.Equal(t, spanOffset-transactionOffset, batch.Spans[0].Timestamp.Sub(batch.Transactions[0].Timestamp)) + assert.Equal(t, exceptionOffset-transactionOffset, batch.Errors[0].Timestamp.Sub(batch.Transactions[0].Timestamp)) + + // Durations should be unaffected. + assert.Equal(t, float64(transactionDuration.Milliseconds()), batch.Transactions[0].Duration) + assert.Equal(t, float64(spanDuration.Milliseconds()), batch.Spans[0].Duration) +} + func TestConsumer_JaegerMetadata(t *testing.T) { jaegerBatch := jaegermodel.Batch{ Spans: []*jaegermodel.Span{{ diff --git a/processor/otel/metrics.go b/processor/otel/metrics.go index 30e818d25e0..cbfbf8eeab8 100644 --- a/processor/otel/metrics.go +++ b/processor/otel/metrics.go @@ -51,29 +51,40 @@ import ( // ConsumeMetrics consumes OpenTelemetry metrics data, converting into // the Elastic APM metrics model and sending to the reporter. func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pdata.Metrics) error { - batch := c.convertMetrics(metrics) + receiveTimestamp := time.Now() + batch := c.convertMetrics(metrics, receiveTimestamp) return c.Processor.ProcessBatch(ctx, batch) } -func (c *Consumer) convertMetrics(metrics pdata.Metrics) *model.Batch { +func (c *Consumer) convertMetrics(metrics pdata.Metrics, receiveTimestamp time.Time) *model.Batch { batch := model.Batch{} resourceMetrics := metrics.ResourceMetrics() for i := 0; i < resourceMetrics.Len(); i++ { - c.convertResourceMetrics(resourceMetrics.At(i), &batch) + c.convertResourceMetrics(resourceMetrics.At(i), receiveTimestamp, &batch) } return &batch } -func (c *Consumer) convertResourceMetrics(resourceMetrics pdata.ResourceMetrics, out *model.Batch) { +func (c *Consumer) convertResourceMetrics(resourceMetrics pdata.ResourceMetrics, receiveTimestamp time.Time, out *model.Batch) { var metadata model.Metadata - translateResourceMetadata(resourceMetrics.Resource(), &metadata) + var timeDelta time.Duration + resource := resourceMetrics.Resource() + translateResourceMetadata(resource, &metadata) + if exportTimestamp, ok := exportTimestamp(resource); ok { + timeDelta = receiveTimestamp.Sub(exportTimestamp) + } instrumentationLibraryMetrics := resourceMetrics.InstrumentationLibraryMetrics() for i := 0; i < instrumentationLibraryMetrics.Len(); i++ { - c.convertInstrumentationLibraryMetrics(instrumentationLibraryMetrics.At(i), metadata, out) + c.convertInstrumentationLibraryMetrics(instrumentationLibraryMetrics.At(i), metadata, timeDelta, out) } } -func (c *Consumer) convertInstrumentationLibraryMetrics(in pdata.InstrumentationLibraryMetrics, metadata model.Metadata, out *model.Batch) { +func (c *Consumer) convertInstrumentationLibraryMetrics( + in pdata.InstrumentationLibraryMetrics, + metadata model.Metadata, + timeDelta time.Duration, + out *model.Batch, +) { var ms metricsets otelMetrics := in.Metrics() var unsupported int64 @@ -84,6 +95,7 @@ func (c *Consumer) convertInstrumentationLibraryMetrics(in pdata.Instrumentation } for _, m := range ms { m.Metadata = metadata + m.Timestamp = m.Timestamp.Add(timeDelta) out.Metricsets = append(out.Metricsets, m.Metricset) } if unsupported > 0 { diff --git a/processor/otel/metrics_test.go b/processor/otel/metrics_test.go index a4815b0765b..b1e7ac88658 100644 --- a/processor/otel/metrics_test.go +++ b/processor/otel/metrics_test.go @@ -317,6 +317,46 @@ func TestConsumeMetrics_JVM(t *testing.T) { }}, metricsets) } +func TestConsumeMetricsExportTimestamp(t *testing.T) { + resourceMetrics := pdata.NewResourceMetrics() + metrics := pdata.NewMetrics() + metrics.ResourceMetrics().Append(resourceMetrics) + + // The actual timestamps will be non-deterministic, as they are adjusted + // based on the server's clock. + // + // Use a large delta so that we can allow for a significant amount of + // delay in the test environment affecting the timestamp adjustment. + const timeDelta = time.Hour + const allowedError = 5 // seconds + + now := time.Now() + exportTimestamp := now.Add(-timeDelta) + resourceMetrics.Resource().Attributes().InitFromMap(map[string]pdata.AttributeValue{ + "telemetry.sdk.elastic_export_timestamp": pdata.NewAttributeValueInt(exportTimestamp.UnixNano()), + }) + + // Timestamp relative to the export timestamp. + dataPointOffset := -time.Second + exportedDataPointTimestamp := exportTimestamp.Add(dataPointOffset) + + instrumentationLibraryMetrics := pdata.NewInstrumentationLibraryMetrics() + resourceMetrics.InstrumentationLibraryMetrics().Append(instrumentationLibraryMetrics) + + metric := pdata.NewMetric() + metric.SetName("int_gauge") + metric.SetDataType(pdata.MetricDataTypeIntGauge) + intGauge := metric.IntGauge() + intGauge.DataPoints().Resize(1) + intGauge.DataPoints().At(0).SetTimestamp(pdata.TimestampFromTime(exportedDataPointTimestamp)) + intGauge.DataPoints().At(0).SetValue(1) + instrumentationLibraryMetrics.Metrics().Append(metric) + + metricsets, _ := transformMetrics(t, metrics) + require.Len(t, metricsets, 1) + assert.InDelta(t, now.Add(dataPointOffset).Unix(), metricsets[0].Timestamp.Unix(), allowedError) +} + func transformMetrics(t *testing.T, metrics pdata.Metrics) ([]*model.Metricset, otel.ConsumerStats) { var batches []*model.Batch recorder := batchRecorderBatchProcessor(&batches) diff --git a/processor/otel/timestamps.go b/processor/otel/timestamps.go new file mode 100644 index 00000000000..81f52fb0981 --- /dev/null +++ b/processor/otel/timestamps.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package otel + +import ( + "time" + + "go.opentelemetry.io/collector/consumer/pdata" +) + +// exportTimestamp extracts the `telemetry.sdk.elastic_export_timestamp` +// resource attribute as a timestamp, and returns a boolean indicating +// whether the attribute was found. +func exportTimestamp(resource pdata.Resource) (time.Time, bool) { + attr, ok := resource.Attributes().Get("telemetry.sdk.elastic_export_timestamp") + if !ok { + return time.Time{}, false + } + nsec := attr.IntVal() + return time.Unix(0, nsec), nsec > 0 +}