From 37dfca7d330fe0cab2cc82f9a3ae9a7d39b39f9a Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 2 Nov 2022 02:39:06 -0400 Subject: [PATCH 1/6] initial processor instrumentation --- obsreport/obsreport_processor.go | 237 ++++++++++++++++++++++++++++--- obsreport/obsreport_test.go | 88 +++++------- 2 files changed, 258 insertions(+), 67 deletions(-) diff --git a/obsreport/obsreport_processor.go b/obsreport/obsreport_processor.go index 27b315f00d8..3e7d1313858 100644 --- a/obsreport/obsreport_processor.go +++ b/obsreport/obsreport_processor.go @@ -20,12 +20,24 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/unit" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) +var ( + processorName = "processor" + processorScope = scopeName + nameSep + processorName +) + // BuildProcessorCustomMetricName is used to be build a metric name following // the standards used in the Collector. The configType should be the same // value used to identify the type on the config. @@ -44,6 +56,22 @@ func BuildProcessorCustomMetricName(configType, metric string) string { type Processor struct { level configtelemetry.Level mutators []tag.Mutator + + logger *zap.Logger + + useOtelForMetrics bool + otelAttrs []attribute.KeyValue + + acceptedSpansCounter syncint64.Counter + refusedSpansCounter syncint64.Counter + droppedSpansCounter syncint64.Counter + acceptedMetricPointsCounter syncint64.Counter + refusedMetricPointsCounter syncint64.Counter + droppedMetricPointsCounter syncint64.Counter + acceptedLogRecordsCounter syncint64.Counter + refusedLogRecordsCounter syncint64.Counter + droppedLogRecordsCounter syncint64.Counter + } // ProcessorSettings are settings for creating a Processor. @@ -53,26 +81,149 @@ type ProcessorSettings struct { } // NewProcessor creates a new Processor. -func NewProcessor(cfg ProcessorSettings) (*Processor, error) { - return &Processor{ +func NewProcessor(cfg ProcessorSettings) *Processor { + return newProcessor(cfg, featuregate.GetRegistry()) +} + +func newProcessor(cfg ProcessorSettings, registry *featuregate.Registry) *Processor { + proc := &Processor{ level: cfg.ProcessorCreateSettings.MetricsLevel, mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyProcessor, cfg.ProcessorID.String(), tag.WithTTL(tag.TTLNoPropagation))}, - }, nil -} + logger: cfg.ProcessorCreateSettings.Logger, + useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID), + otelAttrs: []attribute.KeyValue{ + attribute.String(obsmetrics.ProcessorKey, cfg.ProcessorID.String()), + }, -// Deprecated: [v0.65.0] use NewProcessor. -func MustNewProcessor(cfg ProcessorSettings) *Processor { - proc, err := NewProcessor(cfg) - if err != nil { - panic(err) } + proc.createOtelMetrics(cfg) + return proc + +} + +func (proc *Processor) createOtelMetrics(cfg ProcessorSettings) { + if !proc.useOtelForMetrics { + return + } + + meter := cfg.ProcessorCreateSettings.MeterProvider.Meter(processorScope) + + var err error + handleError := func(metricName string, err error) { + if err != nil { + proc.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName)) + } + } + + proc.acceptedSpansCounter, err = meter.SyncInt64().Counter( + obsmetrics.ProcessorPrefix+obsmetrics.AcceptedSpansKey, + instrument.WithDescription("Number of spans successfully pushed into the next component in the pipeline."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ProcessorPrefix+obsmetrics.AcceptedSpansKey, err) + + proc.refusedSpansCounter, err = meter.SyncInt64().Counter( + obsmetrics.ProcessorPrefix+obsmetrics.RefusedSpansKey, + instrument.WithDescription("Number of spans that were rejected by the next component in the pipeline."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ProcessorPrefix+obsmetrics.RefusedSpansKey, err) + + proc.droppedSpansCounter, err = meter.SyncInt64().Counter( + obsmetrics.ProcessorPrefix+obsmetrics.DroppedSpansKey, + instrument.WithDescription("Number of spans that were dropped."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedSpansKey, err) + + proc.acceptedMetricPointsCounter, err = meter.SyncInt64().Counter( + obsmetrics.ReceiverPrefix+obsmetrics.AcceptedMetricPointsKey, + instrument.WithDescription("Number of metric points successfully pushed into the next component in the pipeline."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ReceiverPrefix+obsmetrics.AcceptedMetricPointsKey, err) + + proc.refusedMetricPointsCounter, err = meter.SyncInt64().Counter( + obsmetrics.ProcessorPrefix+obsmetrics.RefusedMetricPointsKey, + instrument.WithDescription("Number of metric points that were rejected by the next component in the pipeline."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ProcessorPrefix+obsmetrics.RefusedMetricPointsKey, err) + + proc.droppedMetricPointsCounter, err = meter.SyncInt64().Counter( + obsmetrics.ProcessorPrefix+obsmetrics.DroppedMetricPointsKey, + instrument.WithDescription("Number of metric points that were dropped."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedMetricPointsKey, err) + + proc.acceptedLogRecordsCounter, err = meter.SyncInt64().Counter( + obsmetrics.ProcessorPrefix+obsmetrics.AcceptedLogRecordsKey, + instrument.WithDescription("Number of log records successfully pushed into the next component in the pipeline."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ProcessorPrefix+obsmetrics.AcceptedLogRecordsKey, err) + + proc.refusedLogRecordsCounter, err = meter.SyncInt64().Counter( + obsmetrics.ProcessorPrefix+obsmetrics.RefusedLogRecordsKey, + instrument.WithDescription("Number of log records that were rejected by the next component in the pipeline."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ProcessorPrefix+obsmetrics.RefusedLogRecordsKey, err) + + proc.droppedLogRecordsCounter, err = meter.SyncInt64().Counter( + obsmetrics.ProcessorPrefix+obsmetrics.DroppedLogRecordsKey, + instrument.WithDescription("Number of log records that were dropped."), + instrument.WithUnit(unit.Dimensionless), + ) + handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedLogRecordsKey, err) } +// func (por *Processor) recordMetrics(ctx context.Context, dataType config.DataType, accepted, refused, dropped int64) { +// if por.useOtelForMetrics { +// por.recordWithOtel(ctx, dataType, accepted, refused, dropped) +// } else { +// por.recordWithOC(ctx, dataType, accepted, refused, dropped) +// } +// } + +// func (por *Processor) recordWithOtel(ctx context.Context, dataType config.DataType, accepted, refused, dropped int64) { +// var acceptedCounter, refusedCounter, droppedCounter syncint64.Counter + +// switch dataType { +// case config.TracesDataType: +// acceptedCounter = por.acceptedSpansCounter +// refusedCounter = por.refusedSpansCounter +// droppedCounter = por.droppedSpansCounter +// case config.MetricsDataType: +// acceptedCounter = por.acceptedMetricPointsCounter +// refusedCounter = por.refusedMetricPointsCounter +// droppedCounter = por.droppedMetricPointsCounter +// case config.LogsDataType: +// acceptedCounter = por.acceptedLogRecordsCounter +// refusedCounter = por.refusedLogRecordsCounter +// droppedCounter = por.droppedLogRecordsCounter +// } + +// acceptedCounter.Add(ctx, int64(accepted), por.otelAttrs...) +// refusedCounter.Add(ctx, int64(refused), por.otelAttrs...) +// droppedCounter.Add(ctx, int64(dropped), por.otelAttrs...) +// } + +// func (por *Processor) recordWithOC(ctx context.Context, dataType, config.DataType) + + // TracesAccepted reports that the trace data was accepted. func (por *Processor) TracesAccepted(ctx context.Context, numSpans int) { - if por.level != configtelemetry.LevelNone { + if por.level == configtelemetry.LevelNone { + return + } + + if por.useOtelForMetrics { + por.acceptedSpansCounter.Add(ctx, int64(numSpans), por.otelAttrs...) + } else { // ignore the error for now; should not happen _ = stats.RecordWithTags( ctx, @@ -86,7 +237,13 @@ func (por *Processor) TracesAccepted(ctx context.Context, numSpans int) { // TracesRefused reports that the trace data was refused. func (por *Processor) TracesRefused(ctx context.Context, numSpans int) { - if por.level != configtelemetry.LevelNone { + if por.level == configtelemetry.LevelNone { + return + } + + if por.useOtelForMetrics { + por.refusedSpansCounter.Add(ctx, int64(numSpans), por.otelAttrs...) + } else { // ignore the error for now; should not happen _ = stats.RecordWithTags( ctx, @@ -95,12 +252,19 @@ func (por *Processor) TracesRefused(ctx context.Context, numSpans int) { obsmetrics.ProcessorRefusedSpans.M(int64(numSpans)), obsmetrics.ProcessorDroppedSpans.M(0), ) + } } // TracesDropped reports that the trace data was dropped. func (por *Processor) TracesDropped(ctx context.Context, numSpans int) { - if por.level != configtelemetry.LevelNone { + if por.level == configtelemetry.LevelNone { + return + } + + if por.useOtelForMetrics { + por.droppedSpansCounter.Add(ctx, int64(numSpans), por.otelAttrs...) + } else { // ignore the error for now; should not happen _ = stats.RecordWithTags( ctx, @@ -114,7 +278,13 @@ func (por *Processor) TracesDropped(ctx context.Context, numSpans int) { // MetricsAccepted reports that the metrics were accepted. func (por *Processor) MetricsAccepted(ctx context.Context, numPoints int) { - if por.level != configtelemetry.LevelNone { + if por.level == configtelemetry.LevelNone { + return + } + + if por.useOtelForMetrics { + por.acceptedMetricPointsCounter.Add(ctx, int64(numPoints), ) + } else { // ignore the error for now; should not happen _ = stats.RecordWithTags( ctx, @@ -123,12 +293,19 @@ func (por *Processor) MetricsAccepted(ctx context.Context, numPoints int) { obsmetrics.ProcessorRefusedMetricPoints.M(0), obsmetrics.ProcessorDroppedMetricPoints.M(0), ) + } } // MetricsRefused reports that the metrics were refused. func (por *Processor) MetricsRefused(ctx context.Context, numPoints int) { - if por.level != configtelemetry.LevelNone { + if por.level == configtelemetry.LevelNone { + return + } + + if por.useOtelForMetrics { + por.refusedMetricPointsCounter.Add(ctx, int64(numPoints), por.otelAttrs...) + } else { // ignore the error for now; should not happen _ = stats.RecordWithTags( ctx, @@ -142,7 +319,13 @@ func (por *Processor) MetricsRefused(ctx context.Context, numPoints int) { // MetricsDropped reports that the metrics were dropped. func (por *Processor) MetricsDropped(ctx context.Context, numPoints int) { - if por.level != configtelemetry.LevelNone { + if por.level == configtelemetry.LevelNone { + return + } + + if por.useOtelForMetrics { + por.droppedMetricPointsCounter.Add(ctx, int64(numPoints), por.otelAttrs...) + } else { // ignore the error for now; should not happen _ = stats.RecordWithTags( ctx, @@ -156,7 +339,13 @@ func (por *Processor) MetricsDropped(ctx context.Context, numPoints int) { // LogsAccepted reports that the logs were accepted. func (por *Processor) LogsAccepted(ctx context.Context, numRecords int) { - if por.level != configtelemetry.LevelNone { + if por.level == configtelemetry.LevelNone { + return + } + + if por.useOtelForMetrics { + por.acceptedLogRecordsCounter.Add(ctx, int64(numRecords), por.otelAttrs...) + } else { // ignore the error for now; should not happen _ = stats.RecordWithTags( ctx, @@ -170,7 +359,13 @@ func (por *Processor) LogsAccepted(ctx context.Context, numRecords int) { // LogsRefused reports that the logs were refused. func (por *Processor) LogsRefused(ctx context.Context, numRecords int) { - if por.level != configtelemetry.LevelNone { + if por.level == configtelemetry.LevelNone { + return + } + + if por.useOtelForMetrics { + por.refusedLogRecordsCounter.Add(ctx, int64(numRecords), por.otelAttrs...) + } else { // ignore the error for now; should not happen _ = stats.RecordWithTags( ctx, @@ -184,7 +379,13 @@ func (por *Processor) LogsRefused(ctx context.Context, numRecords int) { // LogsDropped reports that the logs were dropped. func (por *Processor) LogsDropped(ctx context.Context, numRecords int) { - if por.level != configtelemetry.LevelNone { + if por.level == configtelemetry.LevelNone { + return + } + + if por.useOtelForMetrics { + por.droppedLogRecordsCounter.Add(ctx, int64(numRecords), por.otelAttrs...) + } else { // ignore the error for now; should not happen _ = stats.RecordWithTags( ctx, diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index 26a2a2fb6c7..fe4797ea8b4 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -479,45 +479,38 @@ func TestReceiveWithLongLivedCtx(t *testing.T) { } func TestProcessorTraceData(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry() - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - const acceptedSpans = 27 - const refusedSpans = 19 - const droppedSpans = 13 + testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + const acceptedSpans = 27 + const refusedSpans = 19 + const droppedSpans = 13 + obsrep := newProcessor(ProcessorSettings{ + ProcessorID: processor, + ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + }, registry) + obsrep.TracesAccepted(context.Background(), acceptedSpans) + obsrep.TracesRefused(context.Background(), refusedSpans) + obsrep.TracesDropped(context.Background(), droppedSpans) - obsrep, err := NewProcessor(ProcessorSettings{ - ProcessorID: processor, - ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + require.NoError(t, obsreporttest.CheckProcessorTraces(tt, processor, acceptedSpans, refusedSpans, droppedSpans)) }) - require.NoError(t, err) - obsrep.TracesAccepted(context.Background(), acceptedSpans) - obsrep.TracesRefused(context.Background(), refusedSpans) - obsrep.TracesDropped(context.Background(), droppedSpans) - - require.NoError(t, obsreporttest.CheckProcessorTraces(tt, processor, acceptedSpans, refusedSpans, droppedSpans)) } func TestProcessorMetricsData(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry() - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - const acceptedPoints = 29 - const refusedPoints = 11 - const droppedPoints = 17 + testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + const acceptedPoints = 29 + const refusedPoints = 11 + const droppedPoints = 17 + + obsrep := newProcessor(ProcessorSettings{ + ProcessorID: processor, + ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + }, registry) + obsrep.MetricsAccepted(context.Background(), acceptedPoints) + obsrep.MetricsRefused(context.Background(), refusedPoints) + obsrep.MetricsDropped(context.Background(), droppedPoints) - obsrep, err := NewProcessor(ProcessorSettings{ - ProcessorID: processor, - ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + require.NoError(t, obsreporttest.CheckProcessorMetrics(tt, processor, acceptedPoints, refusedPoints, droppedPoints)) }) - require.NoError(t, err) - obsrep.MetricsAccepted(context.Background(), acceptedPoints) - obsrep.MetricsRefused(context.Background(), refusedPoints) - obsrep.MetricsDropped(context.Background(), droppedPoints) - - require.NoError(t, obsreporttest.CheckProcessorMetrics(tt, processor, acceptedPoints, refusedPoints, droppedPoints)) } func TestBuildProcessorCustomMetricName(t *testing.T) { @@ -543,22 +536,19 @@ func TestBuildProcessorCustomMetricName(t *testing.T) { } func TestProcessorLogRecords(t *testing.T) { - tt, err := obsreporttest.SetupTelemetry() - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - const acceptedRecords = 29 - const refusedRecords = 11 - const droppedRecords = 17 + testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + const acceptedRecords = 29 + const refusedRecords = 11 + const droppedRecords = 17 + + obsrep := newProcessor(ProcessorSettings{ + ProcessorID: processor, + ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + }, registry) + obsrep.LogsAccepted(context.Background(), acceptedRecords) + obsrep.LogsRefused(context.Background(), refusedRecords) + obsrep.LogsDropped(context.Background(), droppedRecords) - obsrep, err := NewProcessor(ProcessorSettings{ - ProcessorID: processor, - ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + require.NoError(t, obsreporttest.CheckProcessorLogs(tt, processor, acceptedRecords, refusedRecords, droppedRecords)) }) - require.NoError(t, err) - obsrep.LogsAccepted(context.Background(), acceptedRecords) - obsrep.LogsRefused(context.Background(), refusedRecords) - obsrep.LogsDropped(context.Background(), droppedRecords) - - require.NoError(t, obsreporttest.CheckProcessorLogs(tt, processor, acceptedRecords, refusedRecords, droppedRecords)) -} \ No newline at end of file +} From 5cec1fe97e7ca1bb3931eaf22b29312fdb963359 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 2 Nov 2022 13:34:10 -0400 Subject: [PATCH 2/6] adding some tests --- obsreport/obsreport_processor.go | 40 ++----------------- obsreport/obsreporttest/obsreporttest.go | 24 +++-------- .../obsreporttest/otelprometheuschecker.go | 32 ++++++++++++++- 3 files changed, 40 insertions(+), 56 deletions(-) diff --git a/obsreport/obsreport_processor.go b/obsreport/obsreport_processor.go index 3e7d1313858..e828d005183 100644 --- a/obsreport/obsreport_processor.go +++ b/obsreport/obsreport_processor.go @@ -139,11 +139,11 @@ func (proc *Processor) createOtelMetrics(cfg ProcessorSettings) { handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedSpansKey, err) proc.acceptedMetricPointsCounter, err = meter.SyncInt64().Counter( - obsmetrics.ReceiverPrefix+obsmetrics.AcceptedMetricPointsKey, + obsmetrics.ProcessorPrefix+obsmetrics.AcceptedMetricPointsKey, instrument.WithDescription("Number of metric points successfully pushed into the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) - handleError(obsmetrics.ReceiverPrefix+obsmetrics.AcceptedMetricPointsKey, err) + handleError(obsmetrics.ProcessorPrefix+obsmetrics.AcceptedMetricPointsKey, err) proc.refusedMetricPointsCounter, err = meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.RefusedMetricPointsKey, @@ -181,40 +181,6 @@ func (proc *Processor) createOtelMetrics(cfg ProcessorSettings) { handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedLogRecordsKey, err) } -// func (por *Processor) recordMetrics(ctx context.Context, dataType config.DataType, accepted, refused, dropped int64) { -// if por.useOtelForMetrics { -// por.recordWithOtel(ctx, dataType, accepted, refused, dropped) -// } else { -// por.recordWithOC(ctx, dataType, accepted, refused, dropped) -// } -// } - -// func (por *Processor) recordWithOtel(ctx context.Context, dataType config.DataType, accepted, refused, dropped int64) { -// var acceptedCounter, refusedCounter, droppedCounter syncint64.Counter - -// switch dataType { -// case config.TracesDataType: -// acceptedCounter = por.acceptedSpansCounter -// refusedCounter = por.refusedSpansCounter -// droppedCounter = por.droppedSpansCounter -// case config.MetricsDataType: -// acceptedCounter = por.acceptedMetricPointsCounter -// refusedCounter = por.refusedMetricPointsCounter -// droppedCounter = por.droppedMetricPointsCounter -// case config.LogsDataType: -// acceptedCounter = por.acceptedLogRecordsCounter -// refusedCounter = por.refusedLogRecordsCounter -// droppedCounter = por.droppedLogRecordsCounter -// } - -// acceptedCounter.Add(ctx, int64(accepted), por.otelAttrs...) -// refusedCounter.Add(ctx, int64(refused), por.otelAttrs...) -// droppedCounter.Add(ctx, int64(dropped), por.otelAttrs...) -// } - -// func (por *Processor) recordWithOC(ctx context.Context, dataType, config.DataType) - - // TracesAccepted reports that the trace data was accepted. func (por *Processor) TracesAccepted(ctx context.Context, numSpans int) { if por.level == configtelemetry.LevelNone { @@ -283,7 +249,7 @@ func (por *Processor) MetricsAccepted(ctx context.Context, numPoints int) { } if por.useOtelForMetrics { - por.acceptedMetricPointsCounter.Add(ctx, int64(numPoints), ) + por.acceptedMetricPointsCounter.Add(ctx, int64(numPoints), por.otelAttrs...) } else { // ignore the error for now; should not happen _ = stats.RecordWithTags( diff --git a/obsreport/obsreporttest/obsreporttest.go b/obsreport/obsreporttest/obsreporttest.go index d17f9ddad3b..e2356a1d7bb 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -157,32 +157,20 @@ func CheckExporterLogs(tts TestTelemetry, exporter component.ID, sentLogRecords, // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func CheckProcessorTraces(_ TestTelemetry, processor component.ID, acceptedSpans, refusedSpans, droppedSpans int64) error { - processorTags := tagsForProcessorView(processor) - return multierr.Combine( - checkValueForView(processorTags, acceptedSpans, "processor/accepted_spans"), - checkValueForView(processorTags, refusedSpans, "processor/refused_spans"), - checkValueForView(processorTags, droppedSpans, "processor/dropped_spans")) +func CheckProcessorTraces(tts TestTelemetry, processor config.ComponentID, acceptedSpans, refusedSpans, droppedSpans int64) error { + return tts.otelPrometheusChecker.checkProcessorTraces(processor, acceptedSpans, refusedSpans, droppedSpans) } // CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func CheckProcessorMetrics(_ TestTelemetry, processor component.ID, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { - processorTags := tagsForProcessorView(processor) - return multierr.Combine( - checkValueForView(processorTags, acceptedMetricPoints, "processor/accepted_metric_points"), - checkValueForView(processorTags, refusedMetricPoints, "processor/refused_metric_points"), - checkValueForView(processorTags, droppedMetricPoints, "processor/dropped_metric_points")) +func CheckProcessorMetrics(tts TestTelemetry, processor config.ComponentID, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { + return tts.otelPrometheusChecker.checkProcessorMetrics(processor, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints) } // CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func CheckProcessorLogs(_ TestTelemetry, processor component.ID, acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { - processorTags := tagsForProcessorView(processor) - return multierr.Combine( - checkValueForView(processorTags, acceptedLogRecords, "processor/accepted_log_records"), - checkValueForView(processorTags, refusedLogRecords, "processor/refused_log_records"), - checkValueForView(processorTags, droppedLogRecords, "processor/dropped_log_records")) +func CheckProcessorLogs(tts TestTelemetry, processor config.ComponentID, acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { + return tts.otelPrometheusChecker.checkProcessorLogs(processor, acceptedLogRecords, refusedLogRecords, droppedLogRecords) } // CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values. diff --git a/obsreport/obsreporttest/otelprometheuschecker.go b/obsreport/obsreporttest/otelprometheuschecker.go index 5349beb6aa9..d9e872d6ac3 100644 --- a/obsreport/obsreporttest/otelprometheuschecker.go +++ b/obsreport/obsreporttest/otelprometheuschecker.go @@ -62,7 +62,31 @@ func (pc *prometheusChecker) checkReceiverMetrics(receiver component.ID, protoco pc.checkCounter("receiver_refused_metric_points", droppedMetricPoints, receiverAttrs)) } -func (pc *prometheusChecker) checkExporterTraces(exporter component.ID, sentSpans, sendFailedSpans int64) error { +func (pc *prometheusChecker) checkProcessorTraces(processor config.ComponentID, acceptedSpans, refusedSpans, droppedSpans int64) error { + processorAttrs := attributesForProcessorMetrics(processor) + return multierr.Combine( + pc.checkCounter("processor_accepted_spans", acceptedSpans, processorAttrs), + pc.checkCounter("processor_refused_spans", refusedSpans, processorAttrs), + pc.checkCounter("processor_dropped_spans", droppedSpans, processorAttrs)) +} + +func (pc *prometheusChecker) checkProcessorMetrics(processor config.ComponentID, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { + processorAttrs := attributesForProcessorMetrics(processor) + return multierr.Combine( + pc.checkCounter("processor_accepted_metric_points", acceptedMetricPoints, processorAttrs), + pc.checkCounter("processor_refused_metric_points", refusedMetricPoints, processorAttrs), + pc.checkCounter("processor_dropped_metric_points", droppedMetricPoints, processorAttrs)) +} + +func (pc *prometheusChecker) checkProcessorLogs(processor config.ComponentID, acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { + processorAttrs := attributesForProcessorMetrics(processor) + return multierr.Combine( + pc.checkCounter("processor_accepted_log_records", acceptedLogRecords, processorAttrs), + pc.checkCounter("processor_refused_log_records", refusedLogRecords, processorAttrs), + pc.checkCounter("processor_dropped_log_records", droppedLogRecords, processorAttrs)) +} + +func (pc *prometheusChecker) checkExporterTraces(exporter config.ComponentID, sentSpans, sendFailedSpans int64) error { exporterAttrs := attributesForExporterMetrics(exporter) return multierr.Combine( pc.checkCounter("exporter_sent_spans", sentSpans, exporterAttrs), @@ -167,6 +191,12 @@ func attributesForReceiverMetrics(receiver component.ID, transport string) []att } } +func attributesForProcessorMetrics(processor config.ComponentID) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String(processorTag.Name(), processor.String()), + } +} + // attributesForReceiverMetrics returns the attributes that are needed for the receiver metrics. func attributesForExporterMetrics(exporter component.ID) []attribute.KeyValue { return []attribute.KeyValue{ From 1983ee017ac65d0bb05aa3a43f8d381fbb9f7970 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 2 Nov 2022 14:58:40 -0400 Subject: [PATCH 3/6] add obsreporttest unit tests for processor --- obsreport/obsreporttest/obsreporttest_test.go | 77 ++++++++++++++++++- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/obsreport/obsreporttest/obsreporttest_test.go b/obsreport/obsreporttest/obsreporttest_test.go index edb77d1fea4..7021219bceb 100644 --- a/obsreport/obsreporttest/obsreporttest_test.go +++ b/obsreport/obsreporttest/obsreporttest_test.go @@ -33,8 +33,9 @@ const ( var ( scraper = component.NewID("fakeScraper") - receiver = component.NewID("fakeReicever") - exporter = component.NewID("fakeExporter") + receiver = config.NewComponentID("fakeReicever") + processor = config.NewComponentID("fakeProcessor") + exporter = config.NewComponentID("fakeExporter") ) func TestCheckScraperMetricsViews(t *testing.T) { @@ -121,6 +122,78 @@ func TestCheckReceiverLogsViews(t *testing.T) { assert.Error(t, obsreporttest.CheckReceiverLogs(tt, receiver, transport, 0, 7)) } +func TestCheckProcessorTracesViews(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry() + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + por := obsreport.NewProcessor(obsreport.ProcessorSettings{ + ProcessorID: processor, + ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + }) + + por.TracesAccepted(context.Background(), 7) + por.TracesRefused(context.Background(), 8) + por.TracesDropped(context.Background(), 9) + + assert.NoError(t, obsreporttest.CheckProcessorTraces(tt, processor, 7, 8, 9)) + assert.Error(t, obsreporttest.CheckProcessorTraces(tt, processor, 0, 0, 0)) + assert.Error(t, obsreporttest.CheckProcessorTraces(tt, processor, 7, 0, 0)) + assert.Error(t, obsreporttest.CheckProcessorTraces(tt, processor, 7, 8, 0)) + assert.Error(t, obsreporttest.CheckProcessorTraces(tt, processor, 7, 0, 9)) + assert.Error(t, obsreporttest.CheckProcessorTraces(tt, processor, 0, 8, 0)) + assert.Error(t, obsreporttest.CheckProcessorTraces(tt, processor, 0, 8, 9)) + assert.Error(t, obsreporttest.CheckProcessorTraces(tt, processor, 0, 0, 9)) +} + +func TestCheckProcessorMetricsViews(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry() + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + por := obsreport.NewProcessor(obsreport.ProcessorSettings{ + ProcessorID: processor, + ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + }) + + por.MetricsAccepted(context.Background(), 7) + por.MetricsRefused(context.Background(), 8) + por.MetricsDropped(context.Background(), 9) + + assert.NoError(t, obsreporttest.CheckProcessorMetrics(tt, processor, 7, 8, 9)) + assert.Error(t, obsreporttest.CheckProcessorMetrics(tt, processor, 0, 0, 0)) + assert.Error(t, obsreporttest.CheckProcessorMetrics(tt, processor, 7, 0, 0)) + assert.Error(t, obsreporttest.CheckProcessorMetrics(tt, processor, 7, 8, 0)) + assert.Error(t, obsreporttest.CheckProcessorMetrics(tt, processor, 7, 0, 9)) + assert.Error(t, obsreporttest.CheckProcessorMetrics(tt, processor, 0, 8, 0)) + assert.Error(t, obsreporttest.CheckProcessorMetrics(tt, processor, 0, 8, 9)) + assert.Error(t, obsreporttest.CheckProcessorMetrics(tt, processor, 0, 0, 9)) +} + +func TestCheckProcessorLogViews(t *testing.T) { + tt, err := obsreporttest.SetupTelemetry() + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + por := obsreport.NewProcessor(obsreport.ProcessorSettings{ + ProcessorID: processor, + ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + }) + + por.LogsAccepted(context.Background(), 7) + por.LogsRefused(context.Background(), 8) + por.LogsDropped(context.Background(), 9) + + assert.NoError(t, obsreporttest.CheckProcessorLogs(tt, processor, 7, 8, 9)) + assert.Error(t, obsreporttest.CheckProcessorLogs(tt, processor, 0, 0, 0)) + assert.Error(t, obsreporttest.CheckProcessorLogs(tt, processor, 7, 0, 0)) + assert.Error(t, obsreporttest.CheckProcessorLogs(tt, processor, 7, 8, 0)) + assert.Error(t, obsreporttest.CheckProcessorLogs(tt, processor, 7, 0, 9)) + assert.Error(t, obsreporttest.CheckProcessorLogs(tt, processor, 0, 8, 0)) + assert.Error(t, obsreporttest.CheckProcessorLogs(tt, processor, 0, 8, 9)) + assert.Error(t, obsreporttest.CheckProcessorLogs(tt, processor, 0, 0, 9)) +} + func TestCheckExporterTracesViews(t *testing.T) { tt, err := obsreporttest.SetupTelemetry() require.NoError(t, err) From 85fdbe8158be8884aab665d0b1573e2051299254 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 2 Nov 2022 15:51:58 -0400 Subject: [PATCH 4/6] refactor common code --- obsreport/obsreport_processor.go | 233 ++++++++++++------------------- 1 file changed, 86 insertions(+), 147 deletions(-) diff --git a/obsreport/obsreport_processor.go b/obsreport/obsreport_processor.go index e828d005183..420a49ab5e1 100644 --- a/obsreport/obsreport_processor.go +++ b/obsreport/obsreport_processor.go @@ -103,8 +103,8 @@ func newProcessor(cfg ProcessorSettings, registry *featuregate.Registry) *Proces } -func (proc *Processor) createOtelMetrics(cfg ProcessorSettings) { - if !proc.useOtelForMetrics { +func (por *Processor) createOtelMetrics(cfg ProcessorSettings) { + if !por.useOtelForMetrics { return } @@ -113,67 +113,67 @@ func (proc *Processor) createOtelMetrics(cfg ProcessorSettings) { var err error handleError := func(metricName string, err error) { if err != nil { - proc.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName)) + por.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName)) } } - proc.acceptedSpansCounter, err = meter.SyncInt64().Counter( + por.acceptedSpansCounter, err = meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.AcceptedSpansKey, instrument.WithDescription("Number of spans successfully pushed into the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) handleError(obsmetrics.ProcessorPrefix+obsmetrics.AcceptedSpansKey, err) - proc.refusedSpansCounter, err = meter.SyncInt64().Counter( + por.refusedSpansCounter, err = meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.RefusedSpansKey, instrument.WithDescription("Number of spans that were rejected by the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) handleError(obsmetrics.ProcessorPrefix+obsmetrics.RefusedSpansKey, err) - proc.droppedSpansCounter, err = meter.SyncInt64().Counter( + por.droppedSpansCounter, err = meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.DroppedSpansKey, instrument.WithDescription("Number of spans that were dropped."), instrument.WithUnit(unit.Dimensionless), ) handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedSpansKey, err) - proc.acceptedMetricPointsCounter, err = meter.SyncInt64().Counter( + por.acceptedMetricPointsCounter, err = meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.AcceptedMetricPointsKey, instrument.WithDescription("Number of metric points successfully pushed into the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) handleError(obsmetrics.ProcessorPrefix+obsmetrics.AcceptedMetricPointsKey, err) - proc.refusedMetricPointsCounter, err = meter.SyncInt64().Counter( + por.refusedMetricPointsCounter, err = meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.RefusedMetricPointsKey, instrument.WithDescription("Number of metric points that were rejected by the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) handleError(obsmetrics.ProcessorPrefix+obsmetrics.RefusedMetricPointsKey, err) - proc.droppedMetricPointsCounter, err = meter.SyncInt64().Counter( + por.droppedMetricPointsCounter, err = meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.DroppedMetricPointsKey, instrument.WithDescription("Number of metric points that were dropped."), instrument.WithUnit(unit.Dimensionless), ) handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedMetricPointsKey, err) - proc.acceptedLogRecordsCounter, err = meter.SyncInt64().Counter( + por.acceptedLogRecordsCounter, err = meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.AcceptedLogRecordsKey, instrument.WithDescription("Number of log records successfully pushed into the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) handleError(obsmetrics.ProcessorPrefix+obsmetrics.AcceptedLogRecordsKey, err) - proc.refusedLogRecordsCounter, err = meter.SyncInt64().Counter( + por.refusedLogRecordsCounter, err = meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.RefusedLogRecordsKey, instrument.WithDescription("Number of log records that were rejected by the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) handleError(obsmetrics.ProcessorPrefix+obsmetrics.RefusedLogRecordsKey, err) - proc.droppedLogRecordsCounter, err = meter.SyncInt64().Counter( + por.droppedLogRecordsCounter, err = meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.DroppedLogRecordsKey, instrument.WithDescription("Number of log records that were dropped."), instrument.WithUnit(unit.Dimensionless), @@ -181,184 +181,123 @@ func (proc *Processor) createOtelMetrics(cfg ProcessorSettings) { handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedLogRecordsKey, err) } -// TracesAccepted reports that the trace data was accepted. -func (por *Processor) TracesAccepted(ctx context.Context, numSpans int) { - if por.level == configtelemetry.LevelNone { - return +func (por *Processor) recordWithOtel(ctx context.Context, dataType config.DataType, accepted, refused, dropped int64) { + var acceptedCount, refusedCount, droppedCount syncint64.Counter + switch dataType { + case config.TracesDataType: + acceptedCount = por.acceptedSpansCounter + refusedCount = por.refusedSpansCounter + droppedCount = por.droppedSpansCounter + case config.MetricsDataType: + acceptedCount = por.acceptedMetricPointsCounter + refusedCount = por.refusedMetricPointsCounter + droppedCount = por.droppedMetricPointsCounter + case config.LogsDataType: + acceptedCount = por.acceptedLogRecordsCounter + refusedCount = por.refusedLogRecordsCounter + droppedCount = por.droppedLogRecordsCounter } - if por.useOtelForMetrics { - por.acceptedSpansCounter.Add(ctx, int64(numSpans), por.otelAttrs...) - } else { - // ignore the error for now; should not happen - _ = stats.RecordWithTags( - ctx, - por.mutators, - obsmetrics.ProcessorAcceptedSpans.M(int64(numSpans)), - obsmetrics.ProcessorRefusedSpans.M(0), - obsmetrics.ProcessorDroppedSpans.M(0), - ) - } + acceptedCount.Add(ctx, accepted, por.otelAttrs...) + refusedCount.Add(ctx, refused, por.otelAttrs...) + droppedCount.Add(ctx, dropped, por.otelAttrs...) } -// TracesRefused reports that the trace data was refused. -func (por *Processor) TracesRefused(ctx context.Context, numSpans int) { - if por.level == configtelemetry.LevelNone { - return +func (por *Processor) recordWithOC(ctx context.Context, dataType config.DataType, accepted, refused, dropped int64) { + var acceptedMeasure, refusedMeasure, droppedMeasure *stats.Int64Measure + + switch dataType { + case config.TracesDataType: + acceptedMeasure = obsmetrics.ProcessorAcceptedSpans + refusedMeasure = obsmetrics.ProcessorRefusedSpans + droppedMeasure = obsmetrics.ProcessorDroppedSpans + case config.MetricsDataType: + acceptedMeasure = obsmetrics.ProcessorAcceptedMetricPoints + refusedMeasure = obsmetrics.ProcessorRefusedMetricPoints + droppedMeasure = obsmetrics.ProcessorDroppedMetricPoints + case config.LogsDataType: + acceptedMeasure = obsmetrics.ProcessorAcceptedLogRecords + refusedMeasure = obsmetrics.ProcessorRefusedLogRecords + droppedMeasure = obsmetrics.ProcessorDroppedLogRecords } + // ignore the error for now; should not happen + _ = stats.RecordWithTags( + ctx, + por.mutators, + acceptedMeasure.M(accepted), + refusedMeasure.M(refused), + droppedMeasure.M(dropped), + ) +} + +func (por *Processor) recordData(ctx context.Context, dataType config.DataType, accepted, refused, dropped int64) { if por.useOtelForMetrics { - por.refusedSpansCounter.Add(ctx, int64(numSpans), por.otelAttrs...) + por.recordWithOtel(ctx, dataType, accepted, refused, dropped) } else { - // ignore the error for now; should not happen - _ = stats.RecordWithTags( - ctx, - por.mutators, - obsmetrics.ProcessorAcceptedSpans.M(0), - obsmetrics.ProcessorRefusedSpans.M(int64(numSpans)), - obsmetrics.ProcessorDroppedSpans.M(0), - ) + por.recordWithOC(ctx, dataType, accepted, refused, dropped) + } +} +// TracesAccepted reports that the trace data was accepted. +func (por *Processor) TracesAccepted(ctx context.Context, numSpans int) { + if por.level != configtelemetry.LevelNone { + por.recordData(ctx, config.TracesDataType, int64(numSpans), int64(0), int64(0)) } } -// TracesDropped reports that the trace data was dropped. -func (por *Processor) TracesDropped(ctx context.Context, numSpans int) { - if por.level == configtelemetry.LevelNone { - return +// TracesRefused reports that the trace data was refused. +func (por *Processor) TracesRefused(ctx context.Context, numSpans int) { + if por.level != configtelemetry.LevelNone { + por.recordData(ctx, config.TracesDataType, int64(0), int64(numSpans), int64(0)) } +} - if por.useOtelForMetrics { - por.droppedSpansCounter.Add(ctx, int64(numSpans), por.otelAttrs...) - } else { - // ignore the error for now; should not happen - _ = stats.RecordWithTags( - ctx, - por.mutators, - obsmetrics.ProcessorAcceptedSpans.M(0), - obsmetrics.ProcessorRefusedSpans.M(0), - obsmetrics.ProcessorDroppedSpans.M(int64(numSpans)), - ) +// TracesDropped reports that the trace data was dropped. +func (por *Processor) TracesDropped(ctx context.Context, numSpans int) { + if por.level != configtelemetry.LevelNone { + por.recordData(ctx, config.TracesDataType, int64(0), int64(0), int64(numSpans)) } } // MetricsAccepted reports that the metrics were accepted. func (por *Processor) MetricsAccepted(ctx context.Context, numPoints int) { - if por.level == configtelemetry.LevelNone { - return - } - - if por.useOtelForMetrics { - por.acceptedMetricPointsCounter.Add(ctx, int64(numPoints), por.otelAttrs...) - } else { - // ignore the error for now; should not happen - _ = stats.RecordWithTags( - ctx, - por.mutators, - obsmetrics.ProcessorAcceptedMetricPoints.M(int64(numPoints)), - obsmetrics.ProcessorRefusedMetricPoints.M(0), - obsmetrics.ProcessorDroppedMetricPoints.M(0), - ) - + if por.level != configtelemetry.LevelNone { + por.recordData(ctx, config.MetricsDataType, int64(numPoints), int64(0), int64(0)) } } // MetricsRefused reports that the metrics were refused. func (por *Processor) MetricsRefused(ctx context.Context, numPoints int) { - if por.level == configtelemetry.LevelNone { - return - } - - if por.useOtelForMetrics { - por.refusedMetricPointsCounter.Add(ctx, int64(numPoints), por.otelAttrs...) - } else { - // ignore the error for now; should not happen - _ = stats.RecordWithTags( - ctx, - por.mutators, - obsmetrics.ProcessorAcceptedMetricPoints.M(0), - obsmetrics.ProcessorRefusedMetricPoints.M(int64(numPoints)), - obsmetrics.ProcessorDroppedMetricPoints.M(0), - ) + if por.level != configtelemetry.LevelNone { + por.recordData(ctx, config.MetricsDataType, int64(0), int64(numPoints), int64(0)) } } // MetricsDropped reports that the metrics were dropped. func (por *Processor) MetricsDropped(ctx context.Context, numPoints int) { - if por.level == configtelemetry.LevelNone { - return - } - - if por.useOtelForMetrics { - por.droppedMetricPointsCounter.Add(ctx, int64(numPoints), por.otelAttrs...) - } else { - // ignore the error for now; should not happen - _ = stats.RecordWithTags( - ctx, - por.mutators, - obsmetrics.ProcessorAcceptedMetricPoints.M(0), - obsmetrics.ProcessorRefusedMetricPoints.M(0), - obsmetrics.ProcessorDroppedMetricPoints.M(int64(numPoints)), - ) + if por.level != configtelemetry.LevelNone { + por.recordData(ctx, config.MetricsDataType, int64(0), int64(0), int64(numPoints)) } } // LogsAccepted reports that the logs were accepted. func (por *Processor) LogsAccepted(ctx context.Context, numRecords int) { - if por.level == configtelemetry.LevelNone { - return - } - - if por.useOtelForMetrics { - por.acceptedLogRecordsCounter.Add(ctx, int64(numRecords), por.otelAttrs...) - } else { - // ignore the error for now; should not happen - _ = stats.RecordWithTags( - ctx, - por.mutators, - obsmetrics.ProcessorAcceptedLogRecords.M(int64(numRecords)), - obsmetrics.ProcessorRefusedLogRecords.M(0), - obsmetrics.ProcessorDroppedLogRecords.M(0), - ) + if por.level != configtelemetry.LevelNone { + por.recordData(ctx, config.LogsDataType, int64(numRecords), int64(0), int64(0)) } } // LogsRefused reports that the logs were refused. func (por *Processor) LogsRefused(ctx context.Context, numRecords int) { - if por.level == configtelemetry.LevelNone { - return - } - - if por.useOtelForMetrics { - por.refusedLogRecordsCounter.Add(ctx, int64(numRecords), por.otelAttrs...) - } else { - // ignore the error for now; should not happen - _ = stats.RecordWithTags( - ctx, - por.mutators, - obsmetrics.ProcessorAcceptedLogRecords.M(0), - obsmetrics.ProcessorRefusedLogRecords.M(int64(numRecords)), - obsmetrics.ProcessorDroppedMetricPoints.M(0), - ) + if por.level != configtelemetry.LevelNone { + por.recordData(ctx, config.LogsDataType, int64(0), int64(numRecords), int64(0)) } } // LogsDropped reports that the logs were dropped. func (por *Processor) LogsDropped(ctx context.Context, numRecords int) { - if por.level == configtelemetry.LevelNone { - return - } - - if por.useOtelForMetrics { - por.droppedLogRecordsCounter.Add(ctx, int64(numRecords), por.otelAttrs...) - } else { - // ignore the error for now; should not happen - _ = stats.RecordWithTags( - ctx, - por.mutators, - obsmetrics.ProcessorAcceptedLogRecords.M(0), - obsmetrics.ProcessorRefusedLogRecords.M(0), - obsmetrics.ProcessorDroppedLogRecords.M(int64(numRecords)), - ) + if por.level != configtelemetry.LevelNone { + por.recordData(ctx, config.LogsDataType, int64(0), int64(0), int64(numRecords)) } } From bc6a8138dc7b0c2a0aea979c91432374b7239915 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Sun, 20 Nov 2022 23:54:10 -0500 Subject: [PATCH 5/6] rebase and update pr --- obsreport/obsreport_processor.go | 114 ++++++++++-------- obsreport/obsreport_test.go | 15 ++- obsreport/obsreporttest/obsreporttest.go | 6 +- obsreport/obsreporttest/obsreporttest_test.go | 15 ++- .../obsreporttest/otelprometheuschecker.go | 10 +- 5 files changed, 88 insertions(+), 72 deletions(-) diff --git a/obsreport/obsreport_processor.go b/obsreport/obsreport_processor.go index 420a49ab5e1..b8a0aaa0681 100644 --- a/obsreport/obsreport_processor.go +++ b/obsreport/obsreport_processor.go @@ -21,9 +21,11 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/metric/instrument/syncint64" "go.opentelemetry.io/otel/metric/unit" + "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" @@ -40,7 +42,7 @@ var ( // BuildProcessorCustomMetricName is used to be build a metric name following // the standards used in the Collector. The configType should be the same -// value used to identify the type on the config. +// value used to identify the type on the component. func BuildProcessorCustomMetricName(configType, metric string) string { componentPrefix := obsmetrics.ProcessorPrefix if !strings.HasSuffix(componentPrefix, obsmetrics.NameSep) { @@ -57,6 +59,7 @@ type Processor struct { level configtelemetry.Level mutators []tag.Mutator + meter metric.Meter logger *zap.Logger useOtelForMetrics bool @@ -81,118 +84,125 @@ type ProcessorSettings struct { } // NewProcessor creates a new Processor. -func NewProcessor(cfg ProcessorSettings) *Processor { +func NewProcessor(cfg ProcessorSettings) (*Processor, error) { return newProcessor(cfg, featuregate.GetRegistry()) } -func newProcessor(cfg ProcessorSettings, registry *featuregate.Registry) *Processor { +// Deprecated: [v0.65.0] use NewProcessor. +func MustNewProcessor(cfg ProcessorSettings) *Processor { + proc, err := NewProcessor(cfg) + if err != nil { + panic(err) + } + + return proc +} + +func newProcessor(cfg ProcessorSettings, registry *featuregate.Registry) (*Processor, error) { proc := &Processor{ level: cfg.ProcessorCreateSettings.MetricsLevel, mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyProcessor, cfg.ProcessorID.String(), tag.WithTTL(tag.TTLNoPropagation))}, logger: cfg.ProcessorCreateSettings.Logger, useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID), + meter: cfg.ProcessorCreateSettings.MeterProvider.Meter(processorScope), otelAttrs: []attribute.KeyValue{ attribute.String(obsmetrics.ProcessorKey, cfg.ProcessorID.String()), }, } - proc.createOtelMetrics(cfg) - - return proc + if err := proc.createOtelMetrics(); err != nil { + return nil, err + } + return proc, nil } -func (por *Processor) createOtelMetrics(cfg ProcessorSettings) { +func (por *Processor) createOtelMetrics() error { if !por.useOtelForMetrics { - return + return nil } - meter := cfg.ProcessorCreateSettings.MeterProvider.Meter(processorScope) + var errors, err error - var err error - handleError := func(metricName string, err error) { - if err != nil { - por.logger.Warn("failed to create otel instrument", zap.Error(err), zap.String("metric", metricName)) - } - } - - por.acceptedSpansCounter, err = meter.SyncInt64().Counter( + por.acceptedSpansCounter, err = por.meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.AcceptedSpansKey, instrument.WithDescription("Number of spans successfully pushed into the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) - handleError(obsmetrics.ProcessorPrefix+obsmetrics.AcceptedSpansKey, err) + errors = multierr.Append(errors, err) - por.refusedSpansCounter, err = meter.SyncInt64().Counter( + por.refusedSpansCounter, err = por.meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.RefusedSpansKey, instrument.WithDescription("Number of spans that were rejected by the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) - handleError(obsmetrics.ProcessorPrefix+obsmetrics.RefusedSpansKey, err) + errors = multierr.Append(errors, err) - por.droppedSpansCounter, err = meter.SyncInt64().Counter( + por.droppedSpansCounter, err = por.meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.DroppedSpansKey, instrument.WithDescription("Number of spans that were dropped."), instrument.WithUnit(unit.Dimensionless), ) - handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedSpansKey, err) + errors = multierr.Append(errors, err) - por.acceptedMetricPointsCounter, err = meter.SyncInt64().Counter( + por.acceptedMetricPointsCounter, err = por.meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.AcceptedMetricPointsKey, instrument.WithDescription("Number of metric points successfully pushed into the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) - handleError(obsmetrics.ProcessorPrefix+obsmetrics.AcceptedMetricPointsKey, err) + errors = multierr.Append(errors, err) - por.refusedMetricPointsCounter, err = meter.SyncInt64().Counter( + por.refusedMetricPointsCounter, err = por.meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.RefusedMetricPointsKey, instrument.WithDescription("Number of metric points that were rejected by the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) - handleError(obsmetrics.ProcessorPrefix+obsmetrics.RefusedMetricPointsKey, err) + errors = multierr.Append(errors, err) - por.droppedMetricPointsCounter, err = meter.SyncInt64().Counter( + por.droppedMetricPointsCounter, err = por.meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.DroppedMetricPointsKey, instrument.WithDescription("Number of metric points that were dropped."), instrument.WithUnit(unit.Dimensionless), ) - handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedMetricPointsKey, err) + errors = multierr.Append(errors, err) - por.acceptedLogRecordsCounter, err = meter.SyncInt64().Counter( + por.acceptedLogRecordsCounter, err = por.meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.AcceptedLogRecordsKey, instrument.WithDescription("Number of log records successfully pushed into the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) - handleError(obsmetrics.ProcessorPrefix+obsmetrics.AcceptedLogRecordsKey, err) + errors = multierr.Append(errors, err) - por.refusedLogRecordsCounter, err = meter.SyncInt64().Counter( + por.refusedLogRecordsCounter, err = por.meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.RefusedLogRecordsKey, instrument.WithDescription("Number of log records that were rejected by the next component in the pipeline."), instrument.WithUnit(unit.Dimensionless), ) - handleError(obsmetrics.ProcessorPrefix+obsmetrics.RefusedLogRecordsKey, err) + errors = multierr.Append(errors, err) - por.droppedLogRecordsCounter, err = meter.SyncInt64().Counter( + por.droppedLogRecordsCounter, err = por.meter.SyncInt64().Counter( obsmetrics.ProcessorPrefix+obsmetrics.DroppedLogRecordsKey, instrument.WithDescription("Number of log records that were dropped."), instrument.WithUnit(unit.Dimensionless), ) - handleError(obsmetrics.ProcessorPrefix+obsmetrics.DroppedLogRecordsKey, err) + errors = multierr.Append(errors, err) + + return errors } -func (por *Processor) recordWithOtel(ctx context.Context, dataType config.DataType, accepted, refused, dropped int64) { +func (por *Processor) recordWithOtel(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) { var acceptedCount, refusedCount, droppedCount syncint64.Counter switch dataType { - case config.TracesDataType: + case component.DataTypeTraces: acceptedCount = por.acceptedSpansCounter refusedCount = por.refusedSpansCounter droppedCount = por.droppedSpansCounter - case config.MetricsDataType: + case component.DataTypeMetrics: acceptedCount = por.acceptedMetricPointsCounter refusedCount = por.refusedMetricPointsCounter droppedCount = por.droppedMetricPointsCounter - case config.LogsDataType: + case component.DataTypeLogs: acceptedCount = por.acceptedLogRecordsCounter refusedCount = por.refusedLogRecordsCounter droppedCount = por.droppedLogRecordsCounter @@ -203,19 +213,19 @@ func (por *Processor) recordWithOtel(ctx context.Context, dataType config.DataTy droppedCount.Add(ctx, dropped, por.otelAttrs...) } -func (por *Processor) recordWithOC(ctx context.Context, dataType config.DataType, accepted, refused, dropped int64) { +func (por *Processor) recordWithOC(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) { var acceptedMeasure, refusedMeasure, droppedMeasure *stats.Int64Measure switch dataType { - case config.TracesDataType: + case component.DataTypeTraces: acceptedMeasure = obsmetrics.ProcessorAcceptedSpans refusedMeasure = obsmetrics.ProcessorRefusedSpans droppedMeasure = obsmetrics.ProcessorDroppedSpans - case config.MetricsDataType: + case component.DataTypeMetrics: acceptedMeasure = obsmetrics.ProcessorAcceptedMetricPoints refusedMeasure = obsmetrics.ProcessorRefusedMetricPoints droppedMeasure = obsmetrics.ProcessorDroppedMetricPoints - case config.LogsDataType: + case component.DataTypeLogs: acceptedMeasure = obsmetrics.ProcessorAcceptedLogRecords refusedMeasure = obsmetrics.ProcessorRefusedLogRecords droppedMeasure = obsmetrics.ProcessorDroppedLogRecords @@ -231,7 +241,7 @@ func (por *Processor) recordWithOC(ctx context.Context, dataType config.DataType ) } -func (por *Processor) recordData(ctx context.Context, dataType config.DataType, accepted, refused, dropped int64) { +func (por *Processor) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) { if por.useOtelForMetrics { por.recordWithOtel(ctx, dataType, accepted, refused, dropped) } else { @@ -242,62 +252,62 @@ func (por *Processor) recordData(ctx context.Context, dataType config.DataType, // TracesAccepted reports that the trace data was accepted. func (por *Processor) TracesAccepted(ctx context.Context, numSpans int) { if por.level != configtelemetry.LevelNone { - por.recordData(ctx, config.TracesDataType, int64(numSpans), int64(0), int64(0)) + por.recordData(ctx, component.DataTypeTraces, int64(numSpans), int64(0), int64(0)) } } // TracesRefused reports that the trace data was refused. func (por *Processor) TracesRefused(ctx context.Context, numSpans int) { if por.level != configtelemetry.LevelNone { - por.recordData(ctx, config.TracesDataType, int64(0), int64(numSpans), int64(0)) + por.recordData(ctx, component.DataTypeTraces, int64(0), int64(numSpans), int64(0)) } } // TracesDropped reports that the trace data was dropped. func (por *Processor) TracesDropped(ctx context.Context, numSpans int) { if por.level != configtelemetry.LevelNone { - por.recordData(ctx, config.TracesDataType, int64(0), int64(0), int64(numSpans)) + por.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(numSpans)) } } // MetricsAccepted reports that the metrics were accepted. func (por *Processor) MetricsAccepted(ctx context.Context, numPoints int) { if por.level != configtelemetry.LevelNone { - por.recordData(ctx, config.MetricsDataType, int64(numPoints), int64(0), int64(0)) + por.recordData(ctx, component.DataTypeMetrics, int64(numPoints), int64(0), int64(0)) } } // MetricsRefused reports that the metrics were refused. func (por *Processor) MetricsRefused(ctx context.Context, numPoints int) { if por.level != configtelemetry.LevelNone { - por.recordData(ctx, config.MetricsDataType, int64(0), int64(numPoints), int64(0)) + por.recordData(ctx, component.DataTypeMetrics, int64(0), int64(numPoints), int64(0)) } } // MetricsDropped reports that the metrics were dropped. func (por *Processor) MetricsDropped(ctx context.Context, numPoints int) { if por.level != configtelemetry.LevelNone { - por.recordData(ctx, config.MetricsDataType, int64(0), int64(0), int64(numPoints)) + por.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(numPoints)) } } // LogsAccepted reports that the logs were accepted. func (por *Processor) LogsAccepted(ctx context.Context, numRecords int) { if por.level != configtelemetry.LevelNone { - por.recordData(ctx, config.LogsDataType, int64(numRecords), int64(0), int64(0)) + por.recordData(ctx, component.DataTypeLogs, int64(numRecords), int64(0), int64(0)) } } // LogsRefused reports that the logs were refused. func (por *Processor) LogsRefused(ctx context.Context, numRecords int) { if por.level != configtelemetry.LevelNone { - por.recordData(ctx, config.LogsDataType, int64(0), int64(numRecords), int64(0)) + por.recordData(ctx, component.DataTypeLogs, int64(0), int64(numRecords), int64(0)) } } // LogsDropped reports that the logs were dropped. func (por *Processor) LogsDropped(ctx context.Context, numRecords int) { if por.level != configtelemetry.LevelNone { - por.recordData(ctx, config.LogsDataType, int64(0), int64(0), int64(numRecords)) + por.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(numRecords)) } } diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index fe4797ea8b4..f6f1a2b6d34 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -479,14 +479,15 @@ func TestReceiveWithLongLivedCtx(t *testing.T) { } func TestProcessorTraceData(t *testing.T) { - testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { const acceptedSpans = 27 const refusedSpans = 19 const droppedSpans = 13 - obsrep := newProcessor(ProcessorSettings{ + obsrep, err := newProcessor(ProcessorSettings{ ProcessorID: processor, ProcessorCreateSettings: tt.ToProcessorCreateSettings(), }, registry) + require.NoError(t, err) obsrep.TracesAccepted(context.Background(), acceptedSpans) obsrep.TracesRefused(context.Background(), refusedSpans) obsrep.TracesDropped(context.Background(), droppedSpans) @@ -496,15 +497,16 @@ func TestProcessorTraceData(t *testing.T) { } func TestProcessorMetricsData(t *testing.T) { - testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { const acceptedPoints = 29 const refusedPoints = 11 const droppedPoints = 17 - obsrep := newProcessor(ProcessorSettings{ + obsrep, err := newProcessor(ProcessorSettings{ ProcessorID: processor, ProcessorCreateSettings: tt.ToProcessorCreateSettings(), }, registry) + require.NoError(t, err) obsrep.MetricsAccepted(context.Background(), acceptedPoints) obsrep.MetricsRefused(context.Background(), refusedPoints) obsrep.MetricsDropped(context.Background(), droppedPoints) @@ -536,15 +538,16 @@ func TestBuildProcessorCustomMetricName(t *testing.T) { } func TestProcessorLogRecords(t *testing.T) { - testTelemetry(t, func(tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { const acceptedRecords = 29 const refusedRecords = 11 const droppedRecords = 17 - obsrep := newProcessor(ProcessorSettings{ + obsrep, err := newProcessor(ProcessorSettings{ ProcessorID: processor, ProcessorCreateSettings: tt.ToProcessorCreateSettings(), }, registry) + require.NoError(t, err) obsrep.LogsAccepted(context.Background(), acceptedRecords) obsrep.LogsRefused(context.Background(), refusedRecords) obsrep.LogsDropped(context.Background(), droppedRecords) diff --git a/obsreport/obsreporttest/obsreporttest.go b/obsreport/obsreporttest/obsreporttest.go index e2356a1d7bb..1d7e496b8f3 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -157,19 +157,19 @@ func CheckExporterLogs(tts TestTelemetry, exporter component.ID, sentLogRecords, // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func CheckProcessorTraces(tts TestTelemetry, processor config.ComponentID, acceptedSpans, refusedSpans, droppedSpans int64) error { +func CheckProcessorTraces(tts TestTelemetry, processor component.ID, acceptedSpans, refusedSpans, droppedSpans int64) error { return tts.otelPrometheusChecker.checkProcessorTraces(processor, acceptedSpans, refusedSpans, droppedSpans) } // CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func CheckProcessorMetrics(tts TestTelemetry, processor config.ComponentID, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { +func CheckProcessorMetrics(tts TestTelemetry, processor component.ID, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { return tts.otelPrometheusChecker.checkProcessorMetrics(processor, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints) } // CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func CheckProcessorLogs(tts TestTelemetry, processor config.ComponentID, acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { +func CheckProcessorLogs(tts TestTelemetry, processor component.ID, acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { return tts.otelPrometheusChecker.checkProcessorLogs(processor, acceptedLogRecords, refusedLogRecords, droppedLogRecords) } diff --git a/obsreport/obsreporttest/obsreporttest_test.go b/obsreport/obsreporttest/obsreporttest_test.go index 7021219bceb..d8151a2c1f6 100644 --- a/obsreport/obsreporttest/obsreporttest_test.go +++ b/obsreport/obsreporttest/obsreporttest_test.go @@ -33,9 +33,9 @@ const ( var ( scraper = component.NewID("fakeScraper") - receiver = config.NewComponentID("fakeReicever") - processor = config.NewComponentID("fakeProcessor") - exporter = config.NewComponentID("fakeExporter") + receiver = component.NewID("fakeReicever") + processor= component.NewID("fakeProcessor") + exporter = component.NewID("fakeExporter") ) func TestCheckScraperMetricsViews(t *testing.T) { @@ -127,10 +127,11 @@ func TestCheckProcessorTracesViews(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - por := obsreport.NewProcessor(obsreport.ProcessorSettings{ + por, err := obsreport.NewProcessor(obsreport.ProcessorSettings{ ProcessorID: processor, ProcessorCreateSettings: tt.ToProcessorCreateSettings(), }) + assert.NoError(t, err) por.TracesAccepted(context.Background(), 7) por.TracesRefused(context.Background(), 8) @@ -151,10 +152,11 @@ func TestCheckProcessorMetricsViews(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - por := obsreport.NewProcessor(obsreport.ProcessorSettings{ + por, err := obsreport.NewProcessor(obsreport.ProcessorSettings{ ProcessorID: processor, ProcessorCreateSettings: tt.ToProcessorCreateSettings(), }) + assert.NoError(t, err) por.MetricsAccepted(context.Background(), 7) por.MetricsRefused(context.Background(), 8) @@ -175,10 +177,11 @@ func TestCheckProcessorLogViews(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - por := obsreport.NewProcessor(obsreport.ProcessorSettings{ + por, err := obsreport.NewProcessor(obsreport.ProcessorSettings{ ProcessorID: processor, ProcessorCreateSettings: tt.ToProcessorCreateSettings(), }) + assert.NoError(t, err) por.LogsAccepted(context.Background(), 7) por.LogsRefused(context.Background(), 8) diff --git a/obsreport/obsreporttest/otelprometheuschecker.go b/obsreport/obsreporttest/otelprometheuschecker.go index d9e872d6ac3..cf970050024 100644 --- a/obsreport/obsreporttest/otelprometheuschecker.go +++ b/obsreport/obsreporttest/otelprometheuschecker.go @@ -62,7 +62,7 @@ func (pc *prometheusChecker) checkReceiverMetrics(receiver component.ID, protoco pc.checkCounter("receiver_refused_metric_points", droppedMetricPoints, receiverAttrs)) } -func (pc *prometheusChecker) checkProcessorTraces(processor config.ComponentID, acceptedSpans, refusedSpans, droppedSpans int64) error { +func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, acceptedSpans, refusedSpans, droppedSpans int64) error { processorAttrs := attributesForProcessorMetrics(processor) return multierr.Combine( pc.checkCounter("processor_accepted_spans", acceptedSpans, processorAttrs), @@ -70,7 +70,7 @@ func (pc *prometheusChecker) checkProcessorTraces(processor config.ComponentID, pc.checkCounter("processor_dropped_spans", droppedSpans, processorAttrs)) } -func (pc *prometheusChecker) checkProcessorMetrics(processor config.ComponentID, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { +func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { processorAttrs := attributesForProcessorMetrics(processor) return multierr.Combine( pc.checkCounter("processor_accepted_metric_points", acceptedMetricPoints, processorAttrs), @@ -78,7 +78,7 @@ func (pc *prometheusChecker) checkProcessorMetrics(processor config.ComponentID, pc.checkCounter("processor_dropped_metric_points", droppedMetricPoints, processorAttrs)) } -func (pc *prometheusChecker) checkProcessorLogs(processor config.ComponentID, acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { +func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { processorAttrs := attributesForProcessorMetrics(processor) return multierr.Combine( pc.checkCounter("processor_accepted_log_records", acceptedLogRecords, processorAttrs), @@ -86,7 +86,7 @@ func (pc *prometheusChecker) checkProcessorLogs(processor config.ComponentID, ac pc.checkCounter("processor_dropped_log_records", droppedLogRecords, processorAttrs)) } -func (pc *prometheusChecker) checkExporterTraces(exporter config.ComponentID, sentSpans, sendFailedSpans int64) error { +func (pc *prometheusChecker) checkExporterTraces(exporter component.ID, sentSpans, sendFailedSpans int64) error { exporterAttrs := attributesForExporterMetrics(exporter) return multierr.Combine( pc.checkCounter("exporter_sent_spans", sentSpans, exporterAttrs), @@ -191,7 +191,7 @@ func attributesForReceiverMetrics(receiver component.ID, transport string) []att } } -func attributesForProcessorMetrics(processor config.ComponentID) []attribute.KeyValue { +func attributesForProcessorMetrics(processor component.ID) []attribute.KeyValue { return []attribute.KeyValue{ attribute.String(processorTag.Name(), processor.String()), } From aab71bbae11dbd0609ed103f0b57154a66214753 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Mon, 21 Nov 2022 15:29:00 -0500 Subject: [PATCH 6/6] fix indentation --- obsreport/obsreport_test.go | 68 ++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 31 deletions(-) diff --git a/obsreport/obsreport_test.go b/obsreport/obsreport_test.go index f6f1a2b6d34..1e5c6172784 100644 --- a/obsreport/obsreport_test.go +++ b/obsreport/obsreport_test.go @@ -479,25 +479,30 @@ func TestReceiveWithLongLivedCtx(t *testing.T) { } func TestProcessorTraceData(t *testing.T) { - testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { - const acceptedSpans = 27 - const refusedSpans = 19 - const droppedSpans = 13 - obsrep, err := newProcessor(ProcessorSettings{ - ProcessorID: processor, - ProcessorCreateSettings: tt.ToProcessorCreateSettings(), - }, registry) - require.NoError(t, err) - obsrep.TracesAccepted(context.Background(), acceptedSpans) - obsrep.TracesRefused(context.Background(), refusedSpans) - obsrep.TracesDropped(context.Background(), droppedSpans) + testTelemetry(t, testProcessorTraceData) +} - require.NoError(t, obsreporttest.CheckProcessorTraces(tt, processor, acceptedSpans, refusedSpans, droppedSpans)) - }) +func testProcessorTraceData(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + const acceptedSpans = 27 + const refusedSpans = 19 + const droppedSpans = 13 + obsrep, err := newProcessor(ProcessorSettings{ + ProcessorID: processor, + ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + }, registry) + require.NoError(t, err) + obsrep.TracesAccepted(context.Background(), acceptedSpans) + obsrep.TracesRefused(context.Background(), refusedSpans) + obsrep.TracesDropped(context.Background(), droppedSpans) + + require.NoError(t, obsreporttest.CheckProcessorTraces(tt, processor, acceptedSpans, refusedSpans, droppedSpans)) } func TestProcessorMetricsData(t *testing.T) { - testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + testTelemetry(t, testProcessorMetricsData) +} + +func testProcessorMetricsData(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { const acceptedPoints = 29 const refusedPoints = 11 const droppedPoints = 17 @@ -512,7 +517,6 @@ func TestProcessorMetricsData(t *testing.T) { obsrep.MetricsDropped(context.Background(), droppedPoints) require.NoError(t, obsreporttest.CheckProcessorMetrics(tt, processor, acceptedPoints, refusedPoints, droppedPoints)) - }) } func TestBuildProcessorCustomMetricName(t *testing.T) { @@ -538,20 +542,22 @@ func TestBuildProcessorCustomMetricName(t *testing.T) { } func TestProcessorLogRecords(t *testing.T) { - testTelemetry(t, func(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { - const acceptedRecords = 29 - const refusedRecords = 11 - const droppedRecords = 17 + testTelemetry(t, testProcessorLogRecords) +} - obsrep, err := newProcessor(ProcessorSettings{ - ProcessorID: processor, - ProcessorCreateSettings: tt.ToProcessorCreateSettings(), - }, registry) - require.NoError(t, err) - obsrep.LogsAccepted(context.Background(), acceptedRecords) - obsrep.LogsRefused(context.Background(), refusedRecords) - obsrep.LogsDropped(context.Background(), droppedRecords) +func testProcessorLogRecords(t *testing.T, tt obsreporttest.TestTelemetry, registry *featuregate.Registry) { + const acceptedRecords = 29 + const refusedRecords = 11 + const droppedRecords = 17 - require.NoError(t, obsreporttest.CheckProcessorLogs(tt, processor, acceptedRecords, refusedRecords, droppedRecords)) - }) -} + obsrep, err := newProcessor(ProcessorSettings{ + ProcessorID: processor, + ProcessorCreateSettings: tt.ToProcessorCreateSettings(), + }, registry) + require.NoError(t, err) + obsrep.LogsAccepted(context.Background(), acceptedRecords) + obsrep.LogsRefused(context.Background(), refusedRecords) + obsrep.LogsDropped(context.Background(), droppedRecords) + + require.NoError(t, obsreporttest.CheckProcessorLogs(tt, processor, acceptedRecords, refusedRecords, droppedRecords)) +} \ No newline at end of file