From a29e6726f9b8af593e29e5341f4ac5551f6b7f79 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Wed, 16 Aug 2023 16:39:00 -0700 Subject: [PATCH] [exporterhelper] Add collector observability to the new exporter helper This change adds collector's internal metrics and tracing to the new request-based exporter helpers. Only those metrics and traces are added that already adopted by the existing exporter helpers for backward compatibility. The new exporter helpers can and should expose more metrics, e.g. for tracking converter errors. --- exporter/exporterhelper/logs.go | 17 ++++++-- exporter/exporterhelper/logs_test.go | 53 ++++++++++++++++++++++++ exporter/exporterhelper/metrics.go | 17 ++++++-- exporter/exporterhelper/metrics_test.go | 52 ++++++++++++++++++++++++ exporter/exporterhelper/traces.go | 17 ++++++-- exporter/exporterhelper/traces_test.go | 54 +++++++++++++++++++++++++ 6 files changed, 198 insertions(+), 12 deletions(-) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 0fdf1fc38587..932485353feb 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -144,8 +144,12 @@ func NewLogsRequestExporter( if err != nil { return nil, err } - - // TODO: Add new observability tracing/metrics to the new exporterhelper. + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &logsExporterWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req, cErr := converter.RequestFromLogs(ctx, ld) @@ -155,10 +159,15 @@ func NewLogsRequestExporter( zap.Error(err)) return consumererror.NewPermanent(cErr) } - return be.sender.send(&request{ + r := &request{ baseRequest: baseRequest{ctx: ctx}, Request: req, - }) + } + sErr := be.sender.send(r) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count())) + } + return sErr }, bs.consumerOptions...) return &logsExporter{ diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 14e35679ff4b..339ba5bb4345 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -160,6 +160,18 @@ func TestLogsExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForLogsExporter(t, tt, le, nil) } +func TestLogsRequestExporter_WithRecordMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}) + require.NoError(t, err) + require.NotNil(t, le) + + checkRecordedMetricsForLogsExporter(t, tt, le, nil) +} + func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) @@ -173,6 +185,20 @@ func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForLogsExporter(t, tt, le, want) } +func TestLogsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) { + want := errors.New("export_error") + tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), + &fakeRequestConverter{requestError: want}) + require.Nil(t, err) + require.NotNil(t, le) + + checkRecordedMetricsForLogsExporter(t, tt, le, want) +} + func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -211,6 +237,19 @@ func TestLogsExporter_WithSpan(t *testing.T) { checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1) } +func TestLogsRequestExporter_WithSpan(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{}) + require.Nil(t, err) + require.NotNil(t, le) + checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1) +} + func TestLogsExporter_WithSpan_ReturnError(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -225,6 +264,20 @@ func TestLogsExporter_WithSpan_ReturnError(t *testing.T) { checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) } +func TestLogsRequestExporter_WithSpan_ReturnError(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + want := errors.New("my_error") + le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want}) + require.Nil(t, err) + require.NotNil(t, le) + checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1) +} + func TestLogsExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index e07c4c83bc4a..11bc4d4333ca 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -144,8 +144,12 @@ func NewMetricsRequestExporter( if err != nil { return nil, err } - - // TODO: Add new observability tracing/metrics to the new exporterhelper. + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &metricsSenderWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { req, cErr := converter.RequestFromMetrics(ctx, md) @@ -155,10 +159,15 @@ func NewMetricsRequestExporter( zap.Error(err)) return consumererror.NewPermanent(cErr) } - return be.sender.send(&request{ + r := &request{ Request: req, baseRequest: baseRequest{ctx: ctx}, - }) + } + sErr := be.sender.send(r) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count())) + } + return sErr }, bs.consumerOptions...) return &metricsExporter{ diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index c8ea75872198..af1ca5a07adc 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -161,6 +161,18 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForMetricsExporter(t, tt, me, nil) } +func TestMetricsRequestExporter_WithRecordMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), fakeRequestConverter{}) + require.NoError(t, err) + require.NotNil(t, me) + + checkRecordedMetricsForMetricsExporter(t, tt, me, nil) +} + func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) @@ -174,6 +186,19 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForMetricsExporter(t, tt, me, want) } +func TestMetricsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) { + want := errors.New("my_error") + tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), fakeRequestConverter{requestError: want}) + require.NoError(t, err) + require.NotNil(t, me) + + checkRecordedMetricsForMetricsExporter(t, tt, me, want) +} + func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -212,6 +237,19 @@ func TestMetricsExporter_WithSpan(t *testing.T) { checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2) } +func TestMetricsRequestExporter_WithSpan(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + me, err := NewMetricsRequestExporter(context.Background(), set, fakeRequestConverter{}) + require.NoError(t, err) + require.NotNil(t, me) + checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2) +} + func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -226,6 +264,20 @@ func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) { checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2) } +func TestMetricsRequestExporter_WithSpan_ExportError(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + want := errors.New("my_error") + me, err := NewMetricsRequestExporter(context.Background(), set, fakeRequestConverter{requestError: want}) + require.NoError(t, err) + require.NotNil(t, me) + checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2) +} + func TestMetricsExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 886c0a2f197a..576e14c3787e 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -144,8 +144,12 @@ func NewTracesRequestExporter( if err != nil { return nil, err } - - // TODO: Add new observability tracing/metrics to the new exporterhelper. + be.wrapConsumerSender(func(nextSender requestSender) requestSender { + return &tracesExporterWithObservability{ + obsrep: be.obsrep, + nextSender: nextSender, + } + }) tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { req, cErr := converter.RequestFromTraces(ctx, td) @@ -155,10 +159,15 @@ func NewTracesRequestExporter( zap.Error(err)) return consumererror.NewPermanent(cErr) } - return be.sender.send(&request{ + r := &request{ baseRequest: baseRequest{ctx: ctx}, Request: req, - }) + } + sErr := be.sender.send(r) + if errors.Is(sErr, errSendingQueueIsFull) { + be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count())) + } + return sErr }, bs.consumerOptions...) return &traceExporter{ diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 0b314426f102..b348cea7d4b2 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -158,6 +158,18 @@ func TestTracesExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForTracesExporter(t, tt, te, nil) } +func TestTracesRequestExporter_WithRecordMetrics(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{}) + require.NoError(t, err) + require.NotNil(t, te) + + checkRecordedMetricsForTracesExporter(t, tt, te, nil) +} + func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) @@ -171,6 +183,19 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) { checkRecordedMetricsForTracesExporter(t, tt, te, want) } +func TestTracesRequestExporter_WithRecordMetrics_RequestSenderError(t *testing.T) { + want := errors.New("export_error") + tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{requestError: want}) + require.NoError(t, err) + require.NotNil(t, te) + + checkRecordedMetricsForTracesExporter(t, tt, te, want) +} + func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) { tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -210,6 +235,20 @@ func TestTracesExporter_WithSpan(t *testing.T) { checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, nil, 1) } +func TestTracesRequestExporter_WithSpan(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{}) + require.NoError(t, err) + require.NotNil(t, te) + + checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, nil, 1) +} + func TestTracesExporter_WithSpan_ReturnError(t *testing.T) { set := exportertest.NewNopCreateSettings() sr := new(tracetest.SpanRecorder) @@ -225,6 +264,21 @@ func TestTracesExporter_WithSpan_ReturnError(t *testing.T) { checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1) } +func TestTracesRequestExporter_WithSpan_ExportError(t *testing.T) { + set := exportertest.NewNopCreateSettings() + sr := new(tracetest.SpanRecorder) + set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(set.TracerProvider) + defer otel.SetTracerProvider(trace.NewNoopTracerProvider()) + + want := errors.New("export_error") + te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want}) + require.NoError(t, err) + require.NotNil(t, te) + + checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1) +} + func TestTracesExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil }