Skip to content

Commit

Permalink
processor/otel: adjust timestamps for mobile (#5433)
Browse files Browse the repository at this point in the history
Add support for a custom resource attribute,
`telemetry.sdk.elastic_export_timestamp`,
representing the time at which the agent sent
the event payload to the server. The server
uses this to adjust the timestamps of events,
to cater for end-user devices having incorrect
system clock settings.
  • Loading branch information
axw authored Jun 16, 2021
1 parent 6344c98 commit 1a93a25
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 21 deletions.
2 changes: 1 addition & 1 deletion changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
* Add support for histograms to metrics intake {pull}5360[5360]
* Upgrade Go to 1.16.4 {pull}5381[5381]
* Add units to metric fields {pull}5395[5395]

* Add support for adjusting OTel event timestamps using `telemetry.sdk.elastic_export_timestamp` {pull}5433[5433]

[float]
==== Deprecated
42 changes: 29 additions & 13 deletions processor/otel/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"strconv"
"strings"
"sync/atomic"
"time"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
Expand Down Expand Up @@ -102,45 +103,56 @@ func (c *Consumer) Capabilities() consumer.Capabilities {
// ConsumeTraces consumes OpenTelemetry trace data,
// converting into Elastic APM events and reporting to the Elastic APM schema.
func (c *Consumer) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {
batch := c.convert(traces)
receiveTimestamp := time.Now()
batch := c.convert(traces, receiveTimestamp)
return c.Processor.ProcessBatch(ctx, batch)
}

func (c *Consumer) convert(td pdata.Traces) *model.Batch {
func (c *Consumer) convert(td pdata.Traces, receiveTimestamp time.Time) *model.Batch {
batch := model.Batch{}
resourceSpans := td.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
c.convertResourceSpans(resourceSpans.At(i), &batch)
c.convertResourceSpans(resourceSpans.At(i), receiveTimestamp, &batch)
}
return &batch
}

func (c *Consumer) convertResourceSpans(resourceSpans pdata.ResourceSpans, out *model.Batch) {
func (c *Consumer) convertResourceSpans(resourceSpans pdata.ResourceSpans, receiveTimestamp time.Time, out *model.Batch) {
var metadata model.Metadata
translateResourceMetadata(resourceSpans.Resource(), &metadata)
var timeDelta time.Duration
resource := resourceSpans.Resource()
translateResourceMetadata(resource, &metadata)
if exportTimestamp, ok := exportTimestamp(resource); ok {
timeDelta = receiveTimestamp.Sub(exportTimestamp)
}
instrumentationLibrarySpans := resourceSpans.InstrumentationLibrarySpans()
for i := 0; i < instrumentationLibrarySpans.Len(); i++ {
c.convertInstrumentationLibrarySpans(instrumentationLibrarySpans.At(i), metadata, out)
c.convertInstrumentationLibrarySpans(instrumentationLibrarySpans.At(i), metadata, timeDelta, out)
}
}

func (c *Consumer) convertInstrumentationLibrarySpans(in pdata.InstrumentationLibrarySpans, metadata model.Metadata, out *model.Batch) {
func (c *Consumer) convertInstrumentationLibrarySpans(
in pdata.InstrumentationLibrarySpans,
metadata model.Metadata,
timeDelta time.Duration,
out *model.Batch,
) {
otelSpans := in.Spans()
for i := 0; i < otelSpans.Len(); i++ {
c.convertSpan(otelSpans.At(i), in.InstrumentationLibrary(), metadata, out)
c.convertSpan(otelSpans.At(i), in.InstrumentationLibrary(), metadata, timeDelta, out)
}
}

func (c *Consumer) convertSpan(
otelSpan pdata.Span,
otelLibrary pdata.InstrumentationLibrary,
metadata model.Metadata,
timeDelta time.Duration,
out *model.Batch,
) {
logger := logp.NewLogger(logs.Otel)

root := otelSpan.ParentSpanID().IsEmpty()

var parentID string
if !root {
parentID = otelSpan.ParentSpanID().HexString()
Expand All @@ -155,6 +167,7 @@ func (c *Consumer) convertSpan(
if endTime.After(startTime) {
durationMillis = endTime.Sub(startTime).Seconds() * 1000
}
timestamp := startTime.Add(timeDelta)

var transaction *model.Transaction
var span *model.Span
Expand All @@ -171,7 +184,7 @@ func (c *Consumer) convertSpan(
ID: spanID,
ParentID: parentID,
TraceID: traceID,
Timestamp: startTime,
Timestamp: timestamp,
Duration: durationMillis,
Name: name,
Outcome: spanStatusOutcome(otelSpan.Status()),
Expand All @@ -184,7 +197,7 @@ func (c *Consumer) convertSpan(
ID: spanID,
ParentID: parentID,
TraceID: traceID,
Timestamp: startTime,
Timestamp: timestamp,
Duration: durationMillis,
Name: name,
Outcome: spanStatusOutcome(otelSpan.Status()),
Expand All @@ -195,7 +208,7 @@ func (c *Consumer) convertSpan(

events := otelSpan.Events()
for i := 0; i < events.Len(); i++ {
convertSpanEvent(logger, events.At(i), metadata, transaction, span, out)
convertSpanEvent(logger, events.At(i), metadata, transaction, span, timeDelta, out)
}
}

Expand Down Expand Up @@ -735,6 +748,7 @@ func convertSpanEvent(
event pdata.SpanEvent,
metadata model.Metadata,
transaction *model.Transaction, span *model.Span, // only one is non-nil
timeDelta time.Duration,
out *model.Batch,
) {
var e *model.Error
Expand Down Expand Up @@ -775,8 +789,10 @@ func convertSpanEvent(
// - exception.message`
return
}
timestamp := event.Timestamp().AsTime()
timestamp = timestamp.Add(timeDelta)
e = convertOpenTelemetryExceptionSpanEvent(
event.Timestamp().AsTime(),
timestamp,
exceptionType, exceptionMessage, exceptionStacktrace,
exceptionEscaped, metadata.Service.Language.Name,
)
Expand Down
68 changes: 68 additions & 0 deletions processor/otel/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,74 @@ func TestArrayLabels(t *testing.T) {
}, tx.Labels)
}

func TestConsumeTracesExportTimestamp(t *testing.T) {
traces, otelSpans := newTracesSpans()

// The actual timestamps will be non-deterministic, as they are adjusted
// based on the server's clock.
//
// Use a large delta so that we can allow for a significant amount of
// delay in the test environment affecting the timestamp adjustment.
const timeDelta = time.Hour
const allowedError = 5 // seconds

now := time.Now()
exportTimestamp := now.Add(-timeDelta)
traces.ResourceSpans().At(0).Resource().Attributes().InitFromMap(map[string]pdata.AttributeValue{
"telemetry.sdk.elastic_export_timestamp": pdata.NewAttributeValueInt(exportTimestamp.UnixNano()),
})

// Offsets are start times relative to the export timestamp.
transactionOffset := -2 * time.Second
spanOffset := transactionOffset + time.Second
exceptionOffset := spanOffset + 25*time.Millisecond
transactionDuration := time.Second + 100*time.Millisecond
spanDuration := 50 * time.Millisecond

exportedTransactionTimestamp := exportTimestamp.Add(transactionOffset)
exportedSpanTimestamp := exportTimestamp.Add(spanOffset)
exportedExceptionTimestamp := exportTimestamp.Add(exceptionOffset)

otelSpan1 := pdata.NewSpan()
otelSpan1.SetTraceID(pdata.NewTraceID([16]byte{1}))
otelSpan1.SetSpanID(pdata.NewSpanID([8]byte{2}))
otelSpan1.SetStartTimestamp(pdata.TimestampFromTime(exportedTransactionTimestamp))
otelSpan1.SetEndTimestamp(pdata.TimestampFromTime(exportedTransactionTimestamp.Add(transactionDuration)))
otelSpans.Spans().Append(otelSpan1)

otelSpan2 := pdata.NewSpan()
otelSpan2.SetTraceID(pdata.NewTraceID([16]byte{1}))
otelSpan2.SetSpanID(pdata.NewSpanID([8]byte{2}))
otelSpan2.SetParentSpanID(pdata.NewSpanID([8]byte{3}))
otelSpan2.SetStartTimestamp(pdata.TimestampFromTime(exportedSpanTimestamp))
otelSpan2.SetEndTimestamp(pdata.TimestampFromTime(exportedSpanTimestamp.Add(spanDuration)))
otelSpans.Spans().Append(otelSpan2)

otelSpanEvent := pdata.NewSpanEvent()
otelSpanEvent.SetTimestamp(pdata.TimestampFromTime(exportedExceptionTimestamp))
otelSpanEvent.SetName("exception")
otelSpanEvent.Attributes().InitFromMap(map[string]pdata.AttributeValue{
"exception.type": pdata.NewAttributeValueString("the_type"),
"exception.message": pdata.NewAttributeValueString("the_message"),
"exception.stacktrace": pdata.NewAttributeValueString("the_stacktrace"),
})
otelSpan2.Events().Append(otelSpanEvent)

batch := transformTraces(t, traces)
require.Len(t, batch.Transactions, 1)
require.Len(t, batch.Spans, 1)
require.Len(t, batch.Errors, 1)

// Give some leeway for one event, and check other events' timestamps relative to that one.
assert.InDelta(t, now.Add(transactionOffset).Unix(), batch.Transactions[0].Timestamp.Unix(), allowedError)
assert.Equal(t, spanOffset-transactionOffset, batch.Spans[0].Timestamp.Sub(batch.Transactions[0].Timestamp))
assert.Equal(t, exceptionOffset-transactionOffset, batch.Errors[0].Timestamp.Sub(batch.Transactions[0].Timestamp))

// Durations should be unaffected.
assert.Equal(t, float64(transactionDuration.Milliseconds()), batch.Transactions[0].Duration)
assert.Equal(t, float64(spanDuration.Milliseconds()), batch.Spans[0].Duration)
}

func TestConsumer_JaegerMetadata(t *testing.T) {
jaegerBatch := jaegermodel.Batch{
Spans: []*jaegermodel.Span{{
Expand Down
26 changes: 19 additions & 7 deletions processor/otel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,40 @@ import (
// ConsumeMetrics consumes OpenTelemetry metrics data, converting into
// the Elastic APM metrics model and sending to the reporter.
func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pdata.Metrics) error {
batch := c.convertMetrics(metrics)
receiveTimestamp := time.Now()
batch := c.convertMetrics(metrics, receiveTimestamp)
return c.Processor.ProcessBatch(ctx, batch)
}

func (c *Consumer) convertMetrics(metrics pdata.Metrics) *model.Batch {
func (c *Consumer) convertMetrics(metrics pdata.Metrics, receiveTimestamp time.Time) *model.Batch {
batch := model.Batch{}
resourceMetrics := metrics.ResourceMetrics()
for i := 0; i < resourceMetrics.Len(); i++ {
c.convertResourceMetrics(resourceMetrics.At(i), &batch)
c.convertResourceMetrics(resourceMetrics.At(i), receiveTimestamp, &batch)
}
return &batch
}

func (c *Consumer) convertResourceMetrics(resourceMetrics pdata.ResourceMetrics, out *model.Batch) {
func (c *Consumer) convertResourceMetrics(resourceMetrics pdata.ResourceMetrics, receiveTimestamp time.Time, out *model.Batch) {
var metadata model.Metadata
translateResourceMetadata(resourceMetrics.Resource(), &metadata)
var timeDelta time.Duration
resource := resourceMetrics.Resource()
translateResourceMetadata(resource, &metadata)
if exportTimestamp, ok := exportTimestamp(resource); ok {
timeDelta = receiveTimestamp.Sub(exportTimestamp)
}
instrumentationLibraryMetrics := resourceMetrics.InstrumentationLibraryMetrics()
for i := 0; i < instrumentationLibraryMetrics.Len(); i++ {
c.convertInstrumentationLibraryMetrics(instrumentationLibraryMetrics.At(i), metadata, out)
c.convertInstrumentationLibraryMetrics(instrumentationLibraryMetrics.At(i), metadata, timeDelta, out)
}
}

func (c *Consumer) convertInstrumentationLibraryMetrics(in pdata.InstrumentationLibraryMetrics, metadata model.Metadata, out *model.Batch) {
func (c *Consumer) convertInstrumentationLibraryMetrics(
in pdata.InstrumentationLibraryMetrics,
metadata model.Metadata,
timeDelta time.Duration,
out *model.Batch,
) {
var ms metricsets
otelMetrics := in.Metrics()
var unsupported int64
Expand All @@ -84,6 +95,7 @@ func (c *Consumer) convertInstrumentationLibraryMetrics(in pdata.Instrumentation
}
for _, m := range ms {
m.Metadata = metadata
m.Timestamp = m.Timestamp.Add(timeDelta)
out.Metricsets = append(out.Metricsets, m.Metricset)
}
if unsupported > 0 {
Expand Down
40 changes: 40 additions & 0 deletions processor/otel/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,46 @@ func TestConsumeMetrics_JVM(t *testing.T) {
}}, metricsets)
}

func TestConsumeMetricsExportTimestamp(t *testing.T) {
resourceMetrics := pdata.NewResourceMetrics()
metrics := pdata.NewMetrics()
metrics.ResourceMetrics().Append(resourceMetrics)

// The actual timestamps will be non-deterministic, as they are adjusted
// based on the server's clock.
//
// Use a large delta so that we can allow for a significant amount of
// delay in the test environment affecting the timestamp adjustment.
const timeDelta = time.Hour
const allowedError = 5 // seconds

now := time.Now()
exportTimestamp := now.Add(-timeDelta)
resourceMetrics.Resource().Attributes().InitFromMap(map[string]pdata.AttributeValue{
"telemetry.sdk.elastic_export_timestamp": pdata.NewAttributeValueInt(exportTimestamp.UnixNano()),
})

// Timestamp relative to the export timestamp.
dataPointOffset := -time.Second
exportedDataPointTimestamp := exportTimestamp.Add(dataPointOffset)

instrumentationLibraryMetrics := pdata.NewInstrumentationLibraryMetrics()
resourceMetrics.InstrumentationLibraryMetrics().Append(instrumentationLibraryMetrics)

metric := pdata.NewMetric()
metric.SetName("int_gauge")
metric.SetDataType(pdata.MetricDataTypeIntGauge)
intGauge := metric.IntGauge()
intGauge.DataPoints().Resize(1)
intGauge.DataPoints().At(0).SetTimestamp(pdata.TimestampFromTime(exportedDataPointTimestamp))
intGauge.DataPoints().At(0).SetValue(1)
instrumentationLibraryMetrics.Metrics().Append(metric)

metricsets, _ := transformMetrics(t, metrics)
require.Len(t, metricsets, 1)
assert.InDelta(t, now.Add(dataPointOffset).Unix(), metricsets[0].Timestamp.Unix(), allowedError)
}

func transformMetrics(t *testing.T, metrics pdata.Metrics) ([]*model.Metricset, otel.ConsumerStats) {
var batches []*model.Batch
recorder := batchRecorderBatchProcessor(&batches)
Expand Down
36 changes: 36 additions & 0 deletions processor/otel/timestamps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package otel

import (
"time"

"go.opentelemetry.io/collector/consumer/pdata"
)

// exportTimestamp extracts the `telemetry.sdk.elastic_export_timestamp`
// resource attribute as a timestamp, and returns a boolean indicating
// whether the attribute was found.
func exportTimestamp(resource pdata.Resource) (time.Time, bool) {
attr, ok := resource.Attributes().Get("telemetry.sdk.elastic_export_timestamp")
if !ok {
return time.Time{}, false
}
nsec := attr.IntVal()
return time.Unix(0, nsec), nsec > 0
}

0 comments on commit 1a93a25

Please sign in to comment.