From fa987c3e767cf00bc85bf4d31505b66a5217416f Mon Sep 17 00:00:00 2001 From: honeychaudharyc <161001105+honeychaudharyc@users.noreply.github.com> Date: Thu, 4 Jul 2024 22:44:22 +0530 Subject: [PATCH] Fix issue #33464 [processor/groupbytrace] migrate from opencensus library (#33770) This PR migrates the internal metrics from opencensus for the groupbytrace processor to Opentelemetry for metrics, via mdatagen. Used metadata.Telemetrybuilder and replaced all metrices from metrics file to metadata.yaml file. Updated tests to move from opencensus. Fixes [#33464](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33464) Testing: Unit tests Documentation: Documentation generated by metadata.yaml. --------- Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> Co-authored-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- .../groupbytraceprocessor/documentation.md | 71 ++++++++++++++ processor/groupbytraceprocessor/event.go | 20 ++-- processor/groupbytraceprocessor/event_test.go | 77 +++++++++------ processor/groupbytraceprocessor/factory.go | 10 +- .../groupbytraceprocessor/factory_test.go | 8 +- .../generated_component_telemetry_test.go | 76 +++++++++++++++ .../generated_package_test.go | 6 +- processor/groupbytraceprocessor/go.mod | 9 +- processor/groupbytraceprocessor/go.sum | 78 --------------- .../internal/metadata/generated_telemetry.go | 96 ++++++++++++++++++- .../metadata/generated_telemetry_test.go | 13 +++ processor/groupbytraceprocessor/metadata.yaml | 62 +++++++++++- processor/groupbytraceprocessor/metrics.go | 82 ---------------- .../groupbytraceprocessor/metrics_test.go | 28 ------ processor/groupbytraceprocessor/processor.go | 49 +++++----- .../groupbytraceprocessor/processor_test.go | 85 ++++++++-------- .../groupbytraceprocessor/storage_memory.go | 9 +- .../storage_memory_test.go | 23 +++-- 18 files changed, 473 insertions(+), 329 deletions(-) create mode 100644 processor/groupbytraceprocessor/documentation.md create mode 100644 processor/groupbytraceprocessor/generated_component_telemetry_test.go delete mode 100644 processor/groupbytraceprocessor/metrics.go delete mode 100644 processor/groupbytraceprocessor/metrics_test.go diff --git a/processor/groupbytraceprocessor/documentation.md b/processor/groupbytraceprocessor/documentation.md new file mode 100644 index 000000000000..93c0531deef3 --- /dev/null +++ b/processor/groupbytraceprocessor/documentation.md @@ -0,0 +1,71 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# groupbytrace + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### processor_groupbytrace_conf_num_traces + +Maximum number of traces to hold in the internal storage + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### processor_groupbytrace_event_latency + +How long the queue events are taking to be processed + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| ms | Histogram | Int | + +### processor_groupbytrace_incomplete_releases + +Releases that are suspected to have been incomplete + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| | Sum | Int | true | + +### processor_groupbytrace_num_events_in_queue + +Number of events currently in the queue + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### processor_groupbytrace_num_traces_in_memory + +Number of traces currently in the in-memory storage + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### processor_groupbytrace_spans_released + +Spans released to the next consumer + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### processor_groupbytrace_traces_evicted + +Traces evicted from the internal buffer + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### processor_groupbytrace_traces_released + +Traces released to the next consumer + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | diff --git a/processor/groupbytraceprocessor/event.go b/processor/groupbytraceprocessor/event.go index d7c26ca1b449..be8434f8c6d8 100644 --- a/processor/groupbytraceprocessor/event.go +++ b/processor/groupbytraceprocessor/event.go @@ -11,11 +11,13 @@ import ( "sync" "time" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" ) const ( @@ -44,8 +46,6 @@ var ( return &hash }, } - - eventTagKey = tag.MustNewKey("event") ) type eventType int @@ -70,8 +70,8 @@ type eventMachine struct { metricsCollectionInterval time.Duration shutdownTimeout time.Duration - logger *zap.Logger - + logger *zap.Logger + telemetry *metadata.TelemetryBuilder onTraceReceived func(td tracesWithID, worker *eventMachineWorker) error onTraceExpired func(traceID pcommon.TraceID, worker *eventMachineWorker) error onTraceReleased func(rss []ptrace.ResourceSpans) error @@ -84,9 +84,10 @@ type eventMachine struct { closed bool } -func newEventMachine(logger *zap.Logger, bufferSize int, numWorkers int, numTraces int) *eventMachine { +func newEventMachine(logger *zap.Logger, bufferSize int, numWorkers int, numTraces int, telemetry *metadata.TelemetryBuilder) *eventMachine { em := &eventMachine{ logger: logger, + telemetry: telemetry, workers: make([]*eventMachineWorker, numWorkers), close: make(chan struct{}), shutdownLock: &sync.RWMutex{}, @@ -119,7 +120,7 @@ func (em *eventMachine) numEvents() int { func (em *eventMachine) periodicMetrics() { numEvents := em.numEvents() em.logger.Debug("recording current state of the queue", zap.Int("num-events", numEvents)) - stats.Record(context.Background(), mNumEventsInQueue.M(int64(numEvents))) + em.telemetry.ProcessorGroupbytraceNumEventsInQueue.Record(context.Background(), int64(numEvents)) em.shutdownLock.RLock() closed := em.closed @@ -288,8 +289,7 @@ func (em *eventMachine) handleEventWithObservability(event string, do func() err start := time.Now() succeeded, err := doWithTimeout(time.Second, do) duration := time.Since(start) - - _ = stats.RecordWithTags(context.Background(), []tag.Mutator{tag.Upsert(eventTagKey, event)}, mEventLatency.M(duration.Milliseconds())) + em.telemetry.ProcessorGroupbytraceEventLatency.Record(context.Background(), duration.Milliseconds(), metric.WithAttributeSet(attribute.NewSet(attribute.String("event", event)))) if err != nil { em.logger.Error("failed to process event", zap.Error(err), zap.String("event", event)) diff --git a/processor/groupbytraceprocessor/event_test.go b/processor/groupbytraceprocessor/event_test.go index 0f657c6472ce..42fd515093fb 100644 --- a/processor/groupbytraceprocessor/event_test.go +++ b/processor/groupbytraceprocessor/event_test.go @@ -4,6 +4,7 @@ package groupbytraceprocessor import ( + "context" "errors" "strings" "sync" @@ -13,14 +14,19 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" ) func TestEventCallback(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + for _, tt := range []struct { casename string typ eventType @@ -80,7 +86,7 @@ func TestEventCallback(t *testing.T) { require.NoError(t, err) wg := &sync.WaitGroup{} - em := newEventMachine(logger, 50, 1, 1_000) + em := newEventMachine(logger, 50, 1, 1_000, tel) tt.registerCallback(em, wg) em.startInBackground() @@ -100,6 +106,8 @@ func TestEventCallback(t *testing.T) { } func TestEventCallbackNotSet(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) for _, tt := range []struct { casename string typ eventType @@ -127,7 +135,7 @@ func TestEventCallbackNotSet(t *testing.T) { require.NoError(t, err) wg := &sync.WaitGroup{} - em := newEventMachine(logger, 50, 1, 1_000) + em := newEventMachine(logger, 50, 1, 1_000, tel) em.onError = func(_ event) { wg.Done() } @@ -147,6 +155,8 @@ func TestEventCallbackNotSet(t *testing.T) { } func TestEventInvalidPayload(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) for _, tt := range []struct { casename string typ eventType @@ -195,7 +205,7 @@ func TestEventInvalidPayload(t *testing.T) { require.NoError(t, err) wg := &sync.WaitGroup{} - em := newEventMachine(logger, 50, 1, 1_000) + em := newEventMachine(logger, 50, 1, 1_000, tel) em.onError = func(_ event) { wg.Done() } @@ -216,12 +226,14 @@ func TestEventInvalidPayload(t *testing.T) { } func TestEventUnknownType(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare logger, err := zap.NewDevelopment() require.NoError(t, err) wg := &sync.WaitGroup{} - em := newEventMachine(logger, 50, 1, 1_000) + em := newEventMachine(logger, 50, 1, 1_000, tel) em.onError = func(_ event) { wg.Done() } @@ -239,6 +251,8 @@ func TestEventUnknownType(t *testing.T) { } func TestEventTracePerWorker(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) for _, tt := range []struct { casename string traceID [16]byte @@ -265,7 +279,7 @@ func TestEventTracePerWorker(t *testing.T) { }, } { t.Run(tt.casename, func(t *testing.T) { - em := newEventMachine(zap.NewNop(), 200, 100, 1_000) + em := newEventMachine(zap.NewNop(), 200, 100, 1_000, tel) var wg sync.WaitGroup var workerForTrace *eventMachineWorker @@ -342,13 +356,15 @@ func TestEventConsumeConsistency(t *testing.T) { } func TestEventShutdown(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare wg := sync.WaitGroup{} wg.Add(1) traceReceivedFired := &atomic.Int64{} traceExpiredFired := &atomic.Int64{} - em := newEventMachine(zap.NewNop(), 50, 1, 1_000) + em := newEventMachine(zap.NewNop(), 50, 1, 1_000, tel) em.onTraceReceived = func(tracesWithID, *eventMachineWorker) error { traceReceivedFired.Store(1) return nil @@ -413,16 +429,11 @@ func TestEventShutdown(t *testing.T) { func TestPeriodicMetrics(t *testing.T) { // prepare - views := metricViews() - - // ensure that we are starting with a clean state - view.Unregister(views...) - assert.NoError(t, view.Register(views...)) - - // try to be nice with the next consumer (test) - defer view.Unregister(views...) + s := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(s.NewSettings().TelemetrySettings) + require.NoError(t, err) - em := newEventMachine(zap.NewNop(), 50, 1, 1_000) + em := newEventMachine(zap.NewNop(), 50, 1, 1_000, telemetryBuilder) em.metricsCollectionInterval = time.Millisecond wg := sync.WaitGroup{} @@ -443,7 +454,7 @@ func TestPeriodicMetrics(t *testing.T) { }() // sanity check - assertGaugeNotCreated(t, mNumEventsInQueue) + assertGaugeNotCreated(t, "processor_groupbytrace_num_events_in_queue", s) // test em.workers[0].fire(event{typ: traceReceived}) @@ -452,14 +463,14 @@ func TestPeriodicMetrics(t *testing.T) { // ensure our gauge is showing 1 item in the queue assert.Eventually(t, func() bool { - return getGaugeValue(t, mNumEventsInQueue) == 1 + return getGaugeValue(t, "processor_groupbytrace_num_events_in_queue", s) == 1 }, 1*time.Second, 10*time.Millisecond) wg.Done() // release all events // ensure our gauge is now showing no items in the queue assert.Eventually(t, func() bool { - return getGaugeValue(t, mNumEventsInQueue) == 0 + return getGaugeValue(t, "processor_groupbytrace_num_events_in_queue", s) == 0 }, 1*time.Second, 10*time.Millisecond) // signal and wait for the recursive call to finish @@ -470,8 +481,10 @@ func TestPeriodicMetrics(t *testing.T) { } func TestForceShutdown(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare - em := newEventMachine(zap.NewNop(), 50, 1, 1_000) + em := newEventMachine(zap.NewNop(), 50, 1, 1_000, tel) em.shutdownTimeout = 20 * time.Millisecond // test @@ -515,16 +528,18 @@ func TestDoWithTimeout_TimeoutTrigger(t *testing.T) { assert.WithinDuration(t, start, time.Now(), 100*time.Millisecond) } -func getGaugeValue(t *testing.T, gauge *stats.Int64Measure) float64 { - viewData, err := view.RetrieveData("processor_groupbytrace_" + gauge.Name()) - require.NoError(t, err) - require.Len(t, viewData, 1) // we expect exactly one data point, the last value - - return viewData[0].Data.(*view.LastValueData).Value +func getGaugeValue(t *testing.T, name string, tt componentTestTelemetry) int64 { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + m := tt.getMetric(name, md).Data + g := m.(metricdata.Gauge[int64]) + assert.Len(t, g.DataPoints, 1, "expected exactly one data point") + return g.DataPoints[0].Value } -func assertGaugeNotCreated(t *testing.T, gauge *stats.Int64Measure) { - viewData, err := view.RetrieveData("processor_groupbytrace_" + gauge.Name()) - require.NoError(t, err) - assert.Len(t, viewData, 0, "gauge exists already but shouldn't") +func assertGaugeNotCreated(t *testing.T, name string, tt componentTestTelemetry) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + got := tt.getMetric(name, md) + assert.Equal(t, got, metricdata.Metrics{}, "gauge exists already but shouldn't") } diff --git a/processor/groupbytraceprocessor/factory.go b/processor/groupbytraceprocessor/factory.go index 4fb0e47f6e0b..06bf13a90437 100644 --- a/processor/groupbytraceprocessor/factory.go +++ b/processor/groupbytraceprocessor/factory.go @@ -8,7 +8,6 @@ import ( "fmt" "time" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" @@ -31,8 +30,6 @@ var ( // NewFactory returns a new factory for the Filter processor. func NewFactory() processor.Factory { - // TODO: find a more appropriate way to get this done, as we are swallowing the error here - _ = view.Register(metricViews()...) return processor.NewFactory( metadata.Type, @@ -70,8 +67,9 @@ func createTracesProcessor( return nil, errDiscardOrphansNotSupported } + processor := newGroupByTraceProcessor(params, nextConsumer, *oCfg) // the only supported storage for now - st = newMemoryStorage() - - return newGroupByTraceProcessor(params.Logger, st, nextConsumer, *oCfg), nil + st = newMemoryStorage(processor.telemetryBuilder) + processor.st = st + return processor, nil } diff --git a/processor/groupbytraceprocessor/factory_test.go b/processor/groupbytraceprocessor/factory_test.go index 02d3532619f4..7ca4bb54c643 100644 --- a/processor/groupbytraceprocessor/factory_test.go +++ b/processor/groupbytraceprocessor/factory_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/processor/processortest" ) @@ -26,10 +27,8 @@ func TestDefaultConfiguration(t *testing.T) { func TestCreateTestProcessor(t *testing.T) { c := createDefaultConfig().(*Config) - next := &mockProcessor{} - // test - p, err := createTracesProcessor(context.Background(), processortest.NewNopSettings(), c, next) + p, err := createTracesProcessor(context.Background(), processortest.NewNopSettings(), c, consumertest.NewNop()) // verify assert.NoError(t, err) @@ -39,7 +38,6 @@ func TestCreateTestProcessor(t *testing.T) { func TestCreateTestProcessorWithNotImplementedOptions(t *testing.T) { // prepare f := NewFactory() - next := &mockProcessor{} // test for _, tt := range []struct { @@ -59,7 +57,7 @@ func TestCreateTestProcessorWithNotImplementedOptions(t *testing.T) { errDiskStorageNotSupported, }, } { - p, err := f.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(), tt.config, next) + p, err := f.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(), tt.config, consumertest.NewNop()) // verify assert.Error(t, tt.expectedErr, err) diff --git a/processor/groupbytraceprocessor/generated_component_telemetry_test.go b/processor/groupbytraceprocessor/generated_component_telemetry_test.go new file mode 100644 index 000000000000..7086d83a2ad0 --- /dev/null +++ b/processor/groupbytraceprocessor/generated_component_telemetry_test.go @@ -0,0 +1,76 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package groupbytraceprocessor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" +) + +type componentTestTelemetry struct { + reader *sdkmetric.ManualReader + meterProvider *sdkmetric.MeterProvider +} + +func (tt *componentTestTelemetry) NewSettings() processor.Settings { + settings := processortest.NewNopSettings() + settings.MeterProvider = tt.meterProvider + settings.ID = component.NewID(component.MustNewType("groupbytrace")) + + return settings +} + +func setupTestTelemetry() componentTestTelemetry { + reader := sdkmetric.NewManualReader() + return componentTestTelemetry{ + reader: reader, + meterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)), + } +} + +func (tt *componentTestTelemetry) assertMetrics(t *testing.T, expected []metricdata.Metrics) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := tt.getMetric(want.Name, md) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), tt.len(md)) +} + +func (tt *componentTestTelemetry) getMetric(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func (tt *componentTestTelemetry) len(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} + +func (tt *componentTestTelemetry) Shutdown(ctx context.Context) error { + return tt.meterProvider.Shutdown(ctx) +} diff --git a/processor/groupbytraceprocessor/generated_package_test.go b/processor/groupbytraceprocessor/generated_package_test.go index c43485899052..84daac2b263f 100644 --- a/processor/groupbytraceprocessor/generated_package_test.go +++ b/processor/groupbytraceprocessor/generated_package_test.go @@ -3,11 +3,11 @@ package groupbytraceprocessor import ( + "os" "testing" - - "go.uber.org/goleak" ) func TestMain(m *testing.M) { - goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreAnyFunction("github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor.doWithTimeout.func1")) + // skipping goleak test as per metadata.yml configuration + os.Exit(m.Run()) } diff --git a/processor/groupbytraceprocessor/go.mod b/processor/groupbytraceprocessor/go.mod index bde7f91dce9e..81ae9e4f78a8 100644 --- a/processor/groupbytraceprocessor/go.mod +++ b/processor/groupbytraceprocessor/go.mod @@ -5,15 +5,16 @@ go 1.21.0 require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.104.0 github.com/stretchr/testify v1.9.0 - go.opencensus.io v0.24.0 go.opentelemetry.io/collector/component v0.104.0 + go.opentelemetry.io/collector/config/configtelemetry v0.104.0 go.opentelemetry.io/collector/confmap v0.104.0 go.opentelemetry.io/collector/consumer v0.104.0 go.opentelemetry.io/collector/pdata v1.11.0 go.opentelemetry.io/collector/processor v0.104.0 + go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/metric v1.27.0 + go.opentelemetry.io/otel/sdk/metric v1.27.0 go.opentelemetry.io/otel/trace v1.27.0 - go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 ) @@ -41,15 +42,11 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.54.0 // indirect github.com/prometheus/procfs v0.15.0 // indirect - go.opentelemetry.io/collector v0.104.0 // indirect - go.opentelemetry.io/collector/config/configtelemetry v0.104.0 // indirect go.opentelemetry.io/collector/featuregate v1.11.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.104.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.104.0 // indirect - go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.49.0 // indirect go.opentelemetry.io/otel/sdk v1.27.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.27.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/processor/groupbytraceprocessor/go.sum b/processor/groupbytraceprocessor/go.sum index 79af22b86234..4cb48adf6b94 100644 --- a/processor/groupbytraceprocessor/go.sum +++ b/processor/groupbytraceprocessor/go.sum @@ -1,19 +1,10 @@ -cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= -github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -23,29 +14,9 @@ github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsM github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= -github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= -github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= @@ -77,7 +48,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= -github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= @@ -87,20 +57,11 @@ github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4V github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= -go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/collector v0.104.0 h1:R3zjM4O3K3+ttzsjPV75P80xalxRbwYTURlK0ys7uyo= -go.opentelemetry.io/collector v0.104.0/go.mod h1:Tm6F3na9ajnOm6I5goU9dURKxq1fSBK1yA94nvUix3k= go.opentelemetry.io/collector/component v0.104.0 h1:jqu/X9rnv8ha0RNZ1a9+x7OU49KwSMsPbOuIEykHuQE= go.opentelemetry.io/collector/component v0.104.0/go.mod h1:1C7C0hMVSbXyY1ycCmaMUAR9fVwpgyiNQqxXtEWhVpw= go.opentelemetry.io/collector/config/configtelemetry v0.104.0 h1:eHv98XIhapZA8MgTiipvi+FDOXoFhCYOwyKReOt+E4E= @@ -140,30 +101,17 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= -golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= -golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -174,10 +122,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -185,36 +129,14 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= -google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 h1:Q2RxlXqh1cgzzUgV261vBO2jI5R/3DD1J2pM0nI4NhU= google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= -google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= -google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= -google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go b/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go index fef5d85b17af..c33240787bd7 100644 --- a/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go +++ b/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go @@ -3,9 +3,14 @@ package metadata import ( - "go.opentelemetry.io/collector/component" + "errors" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" ) func Meter(settings component.TelemetrySettings) metric.Meter { @@ -15,3 +20,92 @@ func Meter(settings component.TelemetrySettings) metric.Meter { func Tracer(settings component.TelemetrySettings) trace.Tracer { return settings.TracerProvider.Tracer("otelcol/groupbytrace") } + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + ProcessorGroupbytraceConfNumTraces metric.Int64Gauge + ProcessorGroupbytraceEventLatency metric.Int64Histogram + ProcessorGroupbytraceIncompleteReleases metric.Int64Counter + ProcessorGroupbytraceNumEventsInQueue metric.Int64Gauge + ProcessorGroupbytraceNumTracesInMemory metric.Int64Gauge + ProcessorGroupbytraceSpansReleased metric.Int64Counter + ProcessorGroupbytraceTracesEvicted metric.Int64Counter + ProcessorGroupbytraceTracesReleased metric.Int64Counter + level configtelemetry.Level +} + +// telemetryBuilderOption applies changes to default builder. +type telemetryBuilderOption func(*TelemetryBuilder) + +// WithLevel sets the current telemetry level for the component. +func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.level = lvl + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{level: configtelemetry.LevelBasic} + for _, op := range options { + op(&builder) + } + var err, errs error + if builder.level >= configtelemetry.LevelBasic { + builder.meter = Meter(settings) + } else { + builder.meter = noop.Meter{} + } + builder.ProcessorGroupbytraceConfNumTraces, err = builder.meter.Int64Gauge( + "processor_groupbytrace_conf_num_traces", + metric.WithDescription("Maximum number of traces to hold in the internal storage"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceEventLatency, err = builder.meter.Int64Histogram( + "processor_groupbytrace_event_latency", + metric.WithDescription("How long the queue events are taking to be processed"), + metric.WithUnit("ms"), metric.WithExplicitBucketBoundaries([]float64{5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000}...), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceIncompleteReleases, err = builder.meter.Int64Counter( + "processor_groupbytrace_incomplete_releases", + metric.WithDescription("Releases that are suspected to have been incomplete"), + metric.WithUnit(""), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceNumEventsInQueue, err = builder.meter.Int64Gauge( + "processor_groupbytrace_num_events_in_queue", + metric.WithDescription("Number of events currently in the queue"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceNumTracesInMemory, err = builder.meter.Int64Gauge( + "processor_groupbytrace_num_traces_in_memory", + metric.WithDescription("Number of traces currently in the in-memory storage"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceSpansReleased, err = builder.meter.Int64Counter( + "processor_groupbytrace_spans_released", + metric.WithDescription("Spans released to the next consumer"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceTracesEvicted, err = builder.meter.Int64Counter( + "processor_groupbytrace_traces_evicted", + metric.WithDescription("Traces evicted from the internal buffer"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceTracesReleased, err = builder.meter.Int64Counter( + "processor_groupbytrace_traces_released", + metric.WithDescription("Traces released to the next consumer"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/processor/groupbytraceprocessor/internal/metadata/generated_telemetry_test.go b/processor/groupbytraceprocessor/internal/metadata/generated_telemetry_test.go index 3be99d34dfdb..241a710c201a 100644 --- a/processor/groupbytraceprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/groupbytraceprocessor/internal/metadata/generated_telemetry_test.go @@ -61,3 +61,16 @@ func TestProviders(t *testing.T) { require.Fail(t, "returned Meter not mockTracer") } } + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/processor/groupbytraceprocessor/metadata.yaml b/processor/groupbytraceprocessor/metadata.yaml index 64bc82d3f13c..45d7c4947e46 100644 --- a/processor/groupbytraceprocessor/metadata.yaml +++ b/processor/groupbytraceprocessor/metadata.yaml @@ -12,9 +12,61 @@ status: tests: config: goleak: - ignore: - # See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. - top: go.opencensus.io/stats/view.(*worker).start - # TODO: Regarding doWithTimeout ignore: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32572 - any: github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor.doWithTimeout.func1 + skip: true + +telemetry: + metrics: + processor_groupbytrace_conf_num_traces: + enabled: true + description: Maximum number of traces to hold in the internal storage + unit: 1 + gauge: + value_type: int + processor_groupbytrace_num_events_in_queue: + enabled: true + description: Number of events currently in the queue + unit: 1 + gauge: + value_type: int + processor_groupbytrace_num_traces_in_memory: + enabled: true + description: Number of traces currently in the in-memory storage + unit: 1 + gauge: + value_type: int + processor_groupbytrace_traces_evicted: + enabled: true + description: Traces evicted from the internal buffer + unit: 1 + sum: + value_type: int + monotonic: true + processor_groupbytrace_spans_released: + enabled: true + description: Spans released to the next consumer + unit: 1 + sum: + value_type: int + monotonic: true + processor_groupbytrace_traces_released: + enabled: true + description: Traces released to the next consumer + unit: 1 + sum: + value_type: int + monotonic: true + processor_groupbytrace_incomplete_releases: + enabled: true + description: Releases that are suspected to have been incomplete + sum: + value_type: int + monotonic: true + processor_groupbytrace_event_latency: + enabled: true + description: How long the queue events are taking to be processed + unit: ms + histogram: + value_type: int + bucket_boundaries: [5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000] + diff --git a/processor/groupbytraceprocessor/metrics.go b/processor/groupbytraceprocessor/metrics.go deleted file mode 100644 index 6867e1f36c66..000000000000 --- a/processor/groupbytraceprocessor/metrics.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package groupbytraceprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor" - -import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" - "go.opentelemetry.io/collector/processor/processorhelper" - - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" -) - -var ( - mNumTracesConf = stats.Int64("conf_num_traces", "Maximum number of traces to hold in the internal storage", stats.UnitDimensionless) - mNumEventsInQueue = stats.Int64("num_events_in_queue", "Number of events currently in the queue", stats.UnitDimensionless) - mNumTracesInMemory = stats.Int64("num_traces_in_memory", "Number of traces currently in the in-memory storage", stats.UnitDimensionless) - mTracesEvicted = stats.Int64("traces_evicted", "Traces evicted from the internal buffer", stats.UnitDimensionless) - mReleasedSpans = stats.Int64("spans_released", "Spans released to the next consumer", stats.UnitDimensionless) - mReleasedTraces = stats.Int64("traces_released", "Traces released to the next consumer", stats.UnitDimensionless) - mIncompleteReleases = stats.Int64("incomplete_releases", "Releases that are suspected to have been incomplete", stats.UnitDimensionless) - mEventLatency = stats.Int64("event_latency", "How long the queue events are taking to be processed", stats.UnitMilliseconds) -) - -// metricViews return the metrics views according to given telemetry level. -func metricViews() []*view.View { - return []*view.View{ - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mNumTracesConf.Name()), - Measure: mNumTracesConf, - Description: mNumTracesConf.Description(), - Aggregation: view.LastValue(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mNumEventsInQueue.Name()), - Measure: mNumEventsInQueue, - Description: mNumEventsInQueue.Description(), - Aggregation: view.LastValue(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mNumTracesInMemory.Name()), - Measure: mNumTracesInMemory, - Description: mNumTracesInMemory.Description(), - Aggregation: view.LastValue(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mTracesEvicted.Name()), - Measure: mTracesEvicted, - Description: mTracesEvicted.Description(), - // sum allows us to start from 0, count will only show up if there's at least one eviction, which might take a while to happen (if ever!) - Aggregation: view.Sum(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mReleasedSpans.Name()), - Measure: mReleasedSpans, - Description: mReleasedSpans.Description(), - Aggregation: view.Sum(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mReleasedTraces.Name()), - Measure: mReleasedTraces, - Description: mReleasedTraces.Description(), - Aggregation: view.Sum(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mIncompleteReleases.Name()), - Measure: mIncompleteReleases, - Description: mIncompleteReleases.Description(), - Aggregation: view.Sum(), - }, - { - Name: processorhelper.BuildCustomMetricName(metadata.Type.String(), mEventLatency.Name()), - Measure: mEventLatency, - Description: mEventLatency.Description(), - TagKeys: []tag.Key{ - tag.MustNewKey("event"), - }, - Aggregation: view.Distribution(0, 5, 10, 20, 50, 100, 200, 500, 1000), - }, - } -} diff --git a/processor/groupbytraceprocessor/metrics_test.go b/processor/groupbytraceprocessor/metrics_test.go deleted file mode 100644 index abbe57c321d6..000000000000 --- a/processor/groupbytraceprocessor/metrics_test.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package groupbytraceprocessor - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestProcessorMetrics(t *testing.T) { - expectedViewNames := []string{ - "processor_groupbytrace_conf_num_traces", - "processor_groupbytrace_num_events_in_queue", - "processor_groupbytrace_num_traces_in_memory", - "processor_groupbytrace_traces_evicted", - "processor_groupbytrace_spans_released", - "processor_groupbytrace_traces_released", - "processor_groupbytrace_incomplete_releases", - "processor_groupbytrace_event_latency", - } - - views := metricViews() - for i, viewName := range expectedViewNames { - assert.Equal(t, viewName, views[i].Name) - } -} diff --git a/processor/groupbytraceprocessor/processor.go b/processor/groupbytraceprocessor/processor.go index ad249e17bd3e..e5863dfff767 100644 --- a/processor/groupbytraceprocessor/processor.go +++ b/processor/groupbytraceprocessor/processor.go @@ -8,7 +8,6 @@ import ( "fmt" "time" - "go.opencensus.io/stats" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -18,6 +17,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" ) // groupByTraceProcessor is a processor that keeps traces in memory for a given duration, with the expectation @@ -33,10 +33,10 @@ import ( // Each worker in the eventMachine also uses a ring buffer to hold the in-flight trace IDs, so that we don't hold more than the given maximum number // of traces in memory/storage. Items that are evicted from the buffer are discarded without warning. type groupByTraceProcessor struct { - nextConsumer consumer.Traces - config Config - logger *zap.Logger - + nextConsumer consumer.Traces + config Config + logger *zap.Logger + telemetryBuilder *metadata.TelemetryBuilder // the event machine handling all operations for this processor eventMachine *eventMachine @@ -49,16 +49,21 @@ var _ processor.Traces = (*groupByTraceProcessor)(nil) const bufferSize = 10_000 // newGroupByTraceProcessor returns a new processor. -func newGroupByTraceProcessor(logger *zap.Logger, st storage, nextConsumer consumer.Traces, config Config) *groupByTraceProcessor { +func newGroupByTraceProcessor(set processor.Settings, nextConsumer consumer.Traces, config Config) *groupByTraceProcessor { + telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil + } + // the event machine will buffer up to N concurrent events before blocking - eventMachine := newEventMachine(logger, 10000, config.NumWorkers, config.NumTraces) + eventMachine := newEventMachine(set.Logger, 10000, config.NumWorkers, config.NumTraces, telemetryBuilder) sp := &groupByTraceProcessor{ - logger: logger, - nextConsumer: nextConsumer, - config: config, - eventMachine: eventMachine, - st: st, + logger: set.Logger, + nextConsumer: nextConsumer, + config: config, + telemetryBuilder: telemetryBuilder, + eventMachine: eventMachine, } // register the callbacks @@ -85,10 +90,9 @@ func (sp *groupByTraceProcessor) Capabilities() consumer.Capabilities { // Start is invoked during service startup. func (sp *groupByTraceProcessor) Start(context.Context, component.Host) error { // start these metrics, as it might take a while for them to receive their first event - stats.Record(context.Background(), mTracesEvicted.M(0)) - stats.Record(context.Background(), mIncompleteReleases.M(0)) - stats.Record(context.Background(), mNumTracesConf.M(int64(sp.config.NumTraces))) - + sp.telemetryBuilder.ProcessorGroupbytraceTracesEvicted.Add(context.Background(), 0) + sp.telemetryBuilder.ProcessorGroupbytraceIncompleteReleases.Add(context.Background(), 0) + sp.telemetryBuilder.ProcessorGroupbytraceConfNumTraces.Record(context.Background(), (int64(sp.config.NumTraces))) sp.eventMachine.startInBackground() return sp.st.start() } @@ -124,8 +128,7 @@ func (sp *groupByTraceProcessor) onTraceReceived(trace tracesWithID, worker *eve typ: traceRemoved, payload: evicted, }) - - stats.Record(context.Background(), mTracesEvicted.M(1)) + sp.telemetryBuilder.ProcessorGroupbytraceTracesEvicted.Add(context.Background(), 1) sp.logger.Info("trace evicted: in order to avoid this in the future, adjust the wait duration and/or number of traces to keep in memory", zap.Stringer("traceID", evicted)) @@ -155,8 +158,7 @@ func (sp *groupByTraceProcessor) onTraceExpired(traceID pcommon.TraceID, worker // we likely received multiple batches with spans for the same trace // and released this trace already sp.logger.Debug("skipping the processing of expired trace", zap.Stringer("traceID", traceID)) - - stats.Record(context.Background(), mIncompleteReleases.M(1)) + sp.telemetryBuilder.ProcessorGroupbytraceIncompleteReleases.Add(context.Background(), 1) return nil } @@ -204,10 +206,9 @@ func (sp *groupByTraceProcessor) onTraceReleased(rss []ptrace.ResourceSpans) err trs := trace.ResourceSpans().AppendEmpty() rs.CopyTo(trs) } - stats.Record(context.Background(), - mReleasedSpans.M(int64(trace.SpanCount())), - mReleasedTraces.M(1), - ) + + sp.telemetryBuilder.ProcessorGroupbytraceSpansReleased.Add(context.Background(), int64(trace.SpanCount())) + sp.telemetryBuilder.ProcessorGroupbytraceTracesReleased.Add(context.Background(), 1) // Do async consuming not to block event worker go func() { diff --git a/processor/groupbytraceprocessor/processor_test.go b/processor/groupbytraceprocessor/processor_test.go index 6a0b0959d959..ddc7675a055f 100644 --- a/processor/groupbytraceprocessor/processor_test.go +++ b/processor/groupbytraceprocessor/processor_test.go @@ -14,12 +14,15 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" ) func TestTraceIsDispatchedAfterDuration(t *testing.T) { @@ -41,7 +44,9 @@ func TestTraceIsDispatchedAfterDuration(t *testing.T) { } wgDeleted := &sync.WaitGroup{} // we wait for the next (mock) processor to receive the trace - backing := newMemoryStorage() + + p := newGroupByTraceProcessor(processortest.NewNopSettings(), mockProcessor, config) + backing := newMemoryStorage(p.telemetryBuilder) st := &mockStorage{ onCreateOrAppend: backing.createOrAppend, onGet: backing.get, @@ -50,8 +55,7 @@ func TestTraceIsDispatchedAfterDuration(t *testing.T) { return backing.delete(traceID) }, } - - p := newGroupByTraceProcessor(zap.NewNop(), st, mockProcessor, config) + p.st = st ctx := context.Background() assert.NoError(t, p.Start(ctx, nil)) defer func() { @@ -94,10 +98,9 @@ func TestInternalCacheLimit(t *testing.T) { return nil } - st := newMemoryStorage() - - p := newGroupByTraceProcessor(zap.NewNop(), st, mockProcessor, config) - + p := newGroupByTraceProcessor(processortest.NewNopSettings(), mockProcessor, config) + st := newMemoryStorage(p.telemetryBuilder) + p.st = st ctx := context.Background() assert.NoError(t, p.Start(ctx, nil)) defer func() { @@ -141,11 +144,10 @@ func TestProcessorCapabilities(t *testing.T) { NumTraces: 10, NumWorkers: 1, } - st := newMemoryStorage() - next := &mockProcessor{} - // test - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), consumertest.NewNop(), config) + st := newMemoryStorage(p.telemetryBuilder) + p.st = st caps := p.Capabilities() // verify @@ -160,8 +162,6 @@ func TestProcessBatchDoesntFail(t *testing.T) { NumTraces: 10, NumWorkers: 1, } - st := newMemoryStorage() - next := &mockProcessor{} traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) @@ -172,9 +172,10 @@ func TestProcessBatchDoesntFail(t *testing.T) { span.SetTraceID(traceID) span.SetSpanID([8]byte{1, 2, 3, 4}) - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), consumertest.NewNop(), config) assert.NotNil(t, p) - + st := newMemoryStorage(p.telemetryBuilder) + p.st = st // test assert.NoError(t, p.onTraceReceived(tracesWithID{id: traceID, td: trace}, p.eventMachine.workers[0])) } @@ -191,11 +192,12 @@ func TestTraceDisappearedFromStorageBeforeReleasing(t *testing.T) { return nil, nil }, } - next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), consumertest.NewNop(), config) require.NotNil(t, p) + p.st = st + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) batch := simpleTracesWithID(traceID) @@ -229,10 +231,10 @@ func TestTraceErrorFromStorageWhileReleasing(t *testing.T) { return nil, expectedError }, } - next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), consumertest.NewNop(), config) require.NotNil(t, p) + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) batch := simpleTracesWithID(traceID) @@ -267,10 +269,10 @@ func TestTraceErrorFromStorageWhileProcessingTrace(t *testing.T) { return expectedError }, } - next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), consumertest.NewNop(), config) require.NotNil(t, p) + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) @@ -299,7 +301,6 @@ func TestAddSpansToExistingTrace(t *testing.T) { NumTraces: 8, NumWorkers: 4, } - st := newMemoryStorage() var receivedTraces []ptrace.ResourceSpans next := &mockProcessor{ @@ -312,8 +313,10 @@ func TestAddSpansToExistingTrace(t *testing.T) { }, } - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) + st := newMemoryStorage(p.telemetryBuilder) + p.st = st ctx := context.Background() assert.NoError(t, p.Start(ctx, nil)) @@ -351,8 +354,9 @@ func TestTraceErrorFromStorageWhileProcessingSecondTrace(t *testing.T) { st := &mockStorage{} next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) @@ -399,9 +403,9 @@ func TestErrorFromStorageWhileRemovingTrace(t *testing.T) { } next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) - + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) // test @@ -425,9 +429,9 @@ func TestTraceNotFoundWhileRemovingTrace(t *testing.T) { } next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) - + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) // test @@ -446,7 +450,7 @@ func TestTracesAreDispatchedInIndividualBatches(t *testing.T) { NumTraces: 8, NumWorkers: 4, } - st := newMemoryStorage() + next := &mockProcessor{ onTraces: func(_ context.Context, traces ptrace.Traces) error { // we should receive two batches, each one with one trace @@ -456,9 +460,10 @@ func TestTracesAreDispatchedInIndividualBatches(t *testing.T) { }, } - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) - + st := newMemoryStorage(p.telemetryBuilder) + p.st = st ctx := context.Background() assert.NoError(t, p.Start(ctx, nil)) defer func() { @@ -504,9 +509,9 @@ func TestErrorOnProcessResourceSpansContinuesProcessing(t *testing.T) { st := &mockStorage{} next := &mockProcessor{} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(t, p) - + p.st = st traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) trace := ptrace.NewTraces() @@ -536,10 +541,12 @@ func TestAsyncOnRelease(t *testing.T) { blocker := &blockingConsumer{ blockCh: blockCh, } - + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) sp := &groupByTraceProcessor{ - logger: zap.NewNop(), - nextConsumer: blocker, + logger: zap.NewNop(), + nextConsumer: blocker, + telemetryBuilder: tel, } assert.NoError(t, sp.onTraceReleased(nil)) close(blockCh) @@ -552,7 +559,6 @@ func BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) { NumTraces: defaultNumTraces, NumWorkers: 4 * defaultNumWorkers, } - st := newMemoryStorage() // For each input trace there are always <= 2 events in the machine simultaneously. semaphoreCh := make(chan struct{}, bufferSize/2) @@ -561,9 +567,10 @@ func BenchmarkConsumeTracesCompleteOnFirstBatch(b *testing.B) { return nil }} - p := newGroupByTraceProcessor(zap.NewNop(), st, next, config) + p := newGroupByTraceProcessor(processortest.NewNopSettings(), next, config) require.NotNil(b, p) - + st := newMemoryStorage(p.telemetryBuilder) + p.st = st ctx := context.Background() require.NoError(b, p.Start(ctx, nil)) defer func() { diff --git a/processor/groupbytraceprocessor/storage_memory.go b/processor/groupbytraceprocessor/storage_memory.go index 909164d12330..4cd97fd51451 100644 --- a/processor/groupbytraceprocessor/storage_memory.go +++ b/processor/groupbytraceprocessor/storage_memory.go @@ -8,14 +8,16 @@ import ( "sync" "time" - "go.opencensus.io/stats" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" ) type memoryStorage struct { sync.RWMutex content map[pcommon.TraceID][]ptrace.ResourceSpans + telemetry *metadata.TelemetryBuilder stopped bool stoppedLock sync.RWMutex metricsCollectionInterval time.Duration @@ -23,10 +25,11 @@ type memoryStorage struct { var _ storage = (*memoryStorage)(nil) -func newMemoryStorage() *memoryStorage { +func newMemoryStorage(telemetry *metadata.TelemetryBuilder) *memoryStorage { return &memoryStorage{ content: make(map[pcommon.TraceID][]ptrace.ResourceSpans), metricsCollectionInterval: time.Second, + telemetry: telemetry, } } @@ -88,7 +91,7 @@ func (st *memoryStorage) shutdown() error { func (st *memoryStorage) periodicMetrics() { numTraces := st.count() - stats.Record(context.Background(), mNumTracesInMemory.M(int64(numTraces))) + st.telemetry.ProcessorGroupbytraceNumTracesInMemory.Record(context.Background(), int64(numTraces)) st.stoppedLock.RLock() stopped := st.stopped diff --git a/processor/groupbytraceprocessor/storage_memory_test.go b/processor/groupbytraceprocessor/storage_memory_test.go index 673569d36ffc..2a4abbb706f8 100644 --- a/processor/groupbytraceprocessor/storage_memory_test.go +++ b/processor/groupbytraceprocessor/storage_memory_test.go @@ -10,11 +10,15 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata" ) func TestMemoryCreateAndGetTrace(t *testing.T) { - // prepare - st := newMemoryStorage() + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + st := newMemoryStorage(tel) traceIDs := []pcommon.TraceID{ pcommon.TraceID([16]byte{1, 2, 3, 4}), @@ -48,8 +52,9 @@ func TestMemoryCreateAndGetTrace(t *testing.T) { } func TestMemoryDeleteTrace(t *testing.T) { - // prepare - st := newMemoryStorage() + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare + st := newMemoryStorage(tel) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) @@ -75,8 +80,9 @@ func TestMemoryDeleteTrace(t *testing.T) { } func TestMemoryAppendSpans(t *testing.T) { - // prepare - st := newMemoryStorage() + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare + st := newMemoryStorage(tel) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) @@ -126,8 +132,9 @@ func TestMemoryAppendSpans(t *testing.T) { } func TestMemoryTraceIsBeingCloned(t *testing.T) { - // prepare - st := newMemoryStorage() + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare + st := newMemoryStorage(tel) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) trace := ptrace.NewTraces()