Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change instrumentation to use otel-go for trace #1833

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ import (
"sync"
"time"

"go.opencensus.io/trace"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
)

var (
okStatus = trace.Status{Code: trace.StatusCodeOK}
)

// Settings for timeout. The timeout applies to individual attempts to send data to the backend.
type TimeoutSettings struct {
// Timeout is the timeout for every attempt to send data to the backend.
Expand Down Expand Up @@ -88,6 +85,7 @@ type internalOptions struct {
RetrySettings
Start
Shutdown
traceProvider trace.Provider
}

// fromConfiguredOptions returns the internal options starting from the default and applying all configured options.
Expand All @@ -101,6 +99,7 @@ func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
RetrySettings: RetrySettings{Enabled: false},
Start: func(ctx context.Context, host component.Host) error { return nil },
Shutdown: func(ctx context.Context) error { return nil },
traceProvider: global.TraceProvider(),
}

for _, op := range options {
Expand Down Expand Up @@ -153,15 +152,22 @@ func WithQueue(queueSettings QueueSettings) ExporterOption {
}
}

func withOtelProviders(traceProvider trace.Provider) ExporterOption {
return func(o *internalOptions) {
o.traceProvider = traceProvider
}
}

// baseExporter contains common fields between different exporter types.
type baseExporter struct {
cfg configmodels.Exporter
sender requestSender
qrSender *queuedRetrySender
start Start
shutdown Shutdown
startOnce sync.Once
shutdown Shutdown
shutdownOnce sync.Once
tracer trace.Tracer
}

func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter {
Expand All @@ -170,6 +176,7 @@ func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *base
cfg: cfg,
start: opts.Start,
shutdown: opts.Shutdown,
tracer: opts.traceProvider.Tracer("go.opentelemetry.io/collector/exporter"),
}

be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings})
Expand Down
13 changes: 0 additions & 13 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.opencensus.io/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
Expand All @@ -31,11 +30,6 @@ var defaultExporterCfg = &configmodels.ExporterSettings{
NameVal: "test",
}

func TestErrorToStatus(t *testing.T) {
require.Equal(t, okStatus, errToStatus(nil))
require.Equal(t, trace.Status{Code: trace.StatusCodeUnknown, Message: "my_error"}, errToStatus(errors.New("my_error")))
}

func TestBaseExporter(t *testing.T) {
be := newBaseExporter(defaultExporterCfg)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -50,10 +44,3 @@ func TestBaseExporterWithOptions(t *testing.T) {
require.Error(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, be.Shutdown(context.Background()))
}

func errToStatus(err error) trace.Status {
if err != nil {
return trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}
}
return okStatus
}
6 changes: 5 additions & 1 deletion exporter/exporterhelper/logshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package exporterhelper
import (
"context"

"go.opentelemetry.io/otel/api/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
Expand Down Expand Up @@ -81,6 +83,7 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio
return &logsExporterWithObservability{
exporterName: cfg.Name(),
nextSender: nextSender,
tracer: be.tracer,
}
})

Expand All @@ -93,10 +96,11 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio
type logsExporterWithObservability struct {
exporterName string
nextSender requestSender
tracer trace.Tracer
}

func (lewo *logsExporterWithObservability) send(req request) (int, error) {
req.setContext(obsreport.StartLogsExportOp(req.context(), lewo.exporterName))
req.setContext(obsreport.StartLogsExportOp(req.context(), lewo.exporterName, obsreport.WithExportTracer(lewo.tracer)))
numDroppedLogs, err := lewo.nextSender.send(req)
obsreport.EndLogsExportOp(req.context(), req.count(), numDroppedLogs, err)
return numDroppedLogs, err
Expand Down
115 changes: 31 additions & 84 deletions exporter/exporterhelper/logshelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data/testdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)

Expand Down Expand Up @@ -82,51 +79,16 @@ func TestLogsExporter_Default_ReturnError(t *testing.T) {
require.Equal(t, want, me.ConsumeLogs(context.Background(), ld))
}

func TestLogsExporter_WithRecordLogs(t *testing.T) {
me, err := NewLogsExporter(fakeLogsExporterConfig, newPushLogsData(0, nil))
require.Nil(t, err)
require.NotNil(t, me)

checkRecordedMetricsForLogsExporter(t, me, nil, 0)
}

func TestLogsExporter_WithRecordLogs_NonZeroDropped(t *testing.T) {
me, err := NewLogsExporter(fakeLogsExporterConfig, newPushLogsData(1, nil))
require.Nil(t, err)
require.NotNil(t, me)

checkRecordedMetricsForLogsExporter(t, me, nil, 1)
}

func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) {
want := errors.New("my_error")
me, err := NewLogsExporter(fakeLogsExporterConfig, newPushLogsData(0, want))
require.Nil(t, err)
require.NotNil(t, me)

checkRecordedMetricsForLogsExporter(t, me, want, 0)
func TestLogsExporter_Observability(t *testing.T) {
checkObservabilityForLogsExporter(t, nil, 1, 0)
}

func TestLogsExporter_WithSpan(t *testing.T) {
me, err := NewLogsExporter(fakeLogsExporterConfig, newPushLogsData(0, nil))
require.Nil(t, err)
require.NotNil(t, me)
checkWrapSpanForLogsExporter(t, me, nil, 1)
func TestLogsExporter_Observability_NonZeroDropped(t *testing.T) {
checkObservabilityForLogsExporter(t, nil, 1, 1)
}

func TestLogsExporter_WithSpan_NonZeroDropped(t *testing.T) {
me, err := NewLogsExporter(fakeLogsExporterConfig, newPushLogsData(1, nil))
require.Nil(t, err)
require.NotNil(t, me)
checkWrapSpanForLogsExporter(t, me, nil, 1)
}

func TestLogsExporter_WithSpan_ReturnError(t *testing.T) {
want := errors.New("my_error")
me, err := NewLogsExporter(fakeLogsExporterConfig, newPushLogsData(0, want))
require.Nil(t, err)
require.NotNil(t, me)
checkWrapSpanForLogsExporter(t, me, want, 1)
func TestLogsExporter_Observability_ReturnError(t *testing.T) {
checkObservabilityForLogsExporter(t, errors.New("my_error"), 1, 0)
}

func TestLogsExporter_WithShutdown(t *testing.T) {
Expand All @@ -152,70 +114,55 @@ func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) {
assert.Equal(t, me.Shutdown(context.Background()), want)
}

func newPushLogsData(droppedTimeSeries int, retError error) PushLogsData {
func newPushLogsData(droppedLogRecords int, retError error) PushLogsData {
return func(ctx context.Context, td pdata.Logs) (int, error) {
return droppedTimeSeries, retError
return droppedLogRecords, retError
}
}

func checkRecordedMetricsForLogsExporter(t *testing.T, me component.LogsExporter, wantError error, droppedLogRecords int) {
func checkObservabilityForLogsExporter(t *testing.T, wantError error, numLogRecords, droppedLogRecords int) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

ld := testdata.GenerateLogDataTwoLogsSameResource()
const numBatches = 7
for i := 0; i < numBatches; i++ {
require.Equal(t, wantError, me.ConsumeLogs(context.Background(), ld))
}
traceProvider, ime := obsreporttest.SetupSdkTraceProviderTest(t)
tracer := traceProvider.Tracer("go.opentelemetry.io/collector/exporter/logshelper")

// TODO: When the new metrics correctly count partial dropped fix this.
if wantError != nil {
obsreporttest.CheckExporterLogsViews(t, fakeLogsExporterName, 0, int64(numBatches*ld.LogRecordCount()))
} else {
obsreporttest.CheckExporterLogsViews(t, fakeLogsExporterName, int64(numBatches*ld.LogRecordCount()), 0)
}
}
me, err := NewLogsExporter(fakeLogsExporterConfig, newPushLogsData(droppedLogRecords, wantError), withOtelProviders(traceProvider))
require.Nil(t, err)
require.NotNil(t, me)

func generateLogsTraffic(t *testing.T, me component.LogsExporter, numRequests int, wantError error) {
const numRequests = 5
ld := testdata.GenerateLogDataOneLog()
ctx, span := trace.StartSpan(context.Background(), fakeLogsParentSpanName, trace.WithSampler(trace.AlwaysSample()))
defer span.End()
ctx, span := tracer.Start(context.Background(), fakeLogsParentSpanName)
for i := 0; i < numRequests; i++ {
require.Equal(t, wantError, me.ConsumeLogs(ctx, ld))
assert.Equal(t, wantError, me.ConsumeLogs(ctx, ld))
}
}

func checkWrapSpanForLogsExporter(t *testing.T, me component.LogsExporter, wantError error, numLogRecords int64) {
ocSpansSaver := new(testOCTraceExporter)
trace.RegisterExporter(ocSpansSaver)
defer trace.UnregisterExporter(ocSpansSaver)

const numRequests = 5
generateLogsTraffic(t, me, numRequests, wantError)
span.End()

// Inspection time!
ocSpansSaver.mu.Lock()
defer ocSpansSaver.mu.Unlock()

require.NotEqual(t, 0, len(ocSpansSaver.spanData), "No exported span data")

gotSpanData := ocSpansSaver.spanData
require.Equal(t, numRequests+1, len(gotSpanData))
gotSpanData := ime.GetSpans()
require.Len(t, gotSpanData, numRequests+1, "No exported span data")

parentSpan := gotSpanData[numRequests]
require.Equalf(t, fakeLogsParentSpanName, parentSpan.Name, "SpanData %v", parentSpan)
for _, sd := range gotSpanData[:numRequests] {
require.Equalf(t, parentSpan.SpanContext.SpanID, sd.ParentSpanID, "Exporter span not a child\nSpanData %v", sd)
require.Equalf(t, errToStatus(wantError), sd.Status, "SpanData %v", sd)
assert.Equalf(t, parentSpan.SpanContext.SpanID, sd.ParentSpanID, "Exporter span not a child\nSpanData %v", sd)
obsreporttest.CheckSpanStatus(t, wantError, sd)

sentLogRecords := numLogRecords
var failedToSendLogRecords int64
failedToSendLogRecords := 0
if wantError != nil {
sentLogRecords = 0
failedToSendLogRecords = numLogRecords
}
require.Equalf(t, sentLogRecords, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd)
require.Equalf(t, failedToSendLogRecords, sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd)
obsreporttest.CheckExporterLogsSpanAttributes(t, sd, int64(sentLogRecords), int64(failedToSendLogRecords))
}

// TODO: When the new metrics correctly count partial dropped fix this.
if wantError != nil {
obsreporttest.CheckExporterLogsViews(t, fakeLogsExporterName, 0, int64(numRequests*ld.LogRecordCount()))
} else {
obsreporttest.CheckExporterLogsViews(t, fakeLogsExporterName, int64(numRequests*ld.LogRecordCount()), 0)
}
}
6 changes: 5 additions & 1 deletion exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package exporterhelper
import (
"context"

"go.opentelemetry.io/otel/api/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumerdata"
Expand Down Expand Up @@ -93,6 +95,7 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa
return &metricsSenderWithObservability{
exporterName: cfg.Name(),
nextSender: nextSender,
tracer: be.tracer,
}
})

Expand All @@ -105,10 +108,11 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa
type metricsSenderWithObservability struct {
exporterName string
nextSender requestSender
tracer trace.Tracer
}

func (mewo *metricsSenderWithObservability) send(req request) (int, error) {
req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName))
req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName, obsreport.WithExportTracer(mewo.tracer)))
numDroppedMetrics, err := mewo.nextSender.send(req)

// TODO: this is not ideal: it should come from the next function itself.
Expand Down
Loading