Skip to content

Commit

Permalink
Fix issue #33464 [processor/groupbytrace] migrate from opencensus lib…
Browse files Browse the repository at this point in the history
…rary (#33770)

This PR migrates the internal metrics of the groupbytrace processor from
OpenCensus to OpenTelemetry, via mdatagen. It uses metadata.TelemetryBuilder,
moves all metric definitions from the metrics file into the
metadata.yaml file, and updates the tests to drop OpenCensus.

Fixes
[#33464](#33464)

Testing: Unit tests

Documentation: Documentation generated by metadata.yaml.

---------

Signed-off-by: Alex Boten <[email protected]>
Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
honeychaudharyc and codeboten authored Jul 4, 2024
1 parent 475960c commit fa987c3
Show file tree
Hide file tree
Showing 18 changed files with 473 additions and 329 deletions.
71 changes: 71 additions & 0 deletions processor/groupbytraceprocessor/documentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)

# groupbytrace

## Internal Telemetry

The following telemetry is emitted by this component.

### processor_groupbytrace_conf_num_traces

Maximum number of traces to hold in the internal storage

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |

### processor_groupbytrace_event_latency

How long the queue events are taking to be processed

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| ms | Histogram | Int |

### processor_groupbytrace_incomplete_releases

Releases that are suspected to have been incomplete

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_groupbytrace_num_events_in_queue

Number of events currently in the queue

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |

### processor_groupbytrace_num_traces_in_memory

Number of traces currently in the in-memory storage

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Int |

### processor_groupbytrace_spans_released

Spans released to the next consumer

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_groupbytrace_traces_evicted

Traces evicted from the internal buffer

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_groupbytrace_traces_released

Traces released to the next consumer

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |
20 changes: 10 additions & 10 deletions processor/groupbytraceprocessor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"sync"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata"
)

const (
Expand Down Expand Up @@ -44,8 +46,6 @@ var (
return &hash
},
}

eventTagKey = tag.MustNewKey("event")
)

type eventType int
Expand All @@ -70,8 +70,8 @@ type eventMachine struct {
metricsCollectionInterval time.Duration
shutdownTimeout time.Duration

logger *zap.Logger

logger *zap.Logger
telemetry *metadata.TelemetryBuilder
onTraceReceived func(td tracesWithID, worker *eventMachineWorker) error
onTraceExpired func(traceID pcommon.TraceID, worker *eventMachineWorker) error
onTraceReleased func(rss []ptrace.ResourceSpans) error
Expand All @@ -84,9 +84,10 @@ type eventMachine struct {
closed bool
}

func newEventMachine(logger *zap.Logger, bufferSize int, numWorkers int, numTraces int) *eventMachine {
func newEventMachine(logger *zap.Logger, bufferSize int, numWorkers int, numTraces int, telemetry *metadata.TelemetryBuilder) *eventMachine {
em := &eventMachine{
logger: logger,
telemetry: telemetry,
workers: make([]*eventMachineWorker, numWorkers),
close: make(chan struct{}),
shutdownLock: &sync.RWMutex{},
Expand Down Expand Up @@ -119,7 +120,7 @@ func (em *eventMachine) numEvents() int {
func (em *eventMachine) periodicMetrics() {
numEvents := em.numEvents()
em.logger.Debug("recording current state of the queue", zap.Int("num-events", numEvents))
stats.Record(context.Background(), mNumEventsInQueue.M(int64(numEvents)))
em.telemetry.ProcessorGroupbytraceNumEventsInQueue.Record(context.Background(), int64(numEvents))

em.shutdownLock.RLock()
closed := em.closed
Expand Down Expand Up @@ -288,8 +289,7 @@ func (em *eventMachine) handleEventWithObservability(event string, do func() err
start := time.Now()
succeeded, err := doWithTimeout(time.Second, do)
duration := time.Since(start)

_ = stats.RecordWithTags(context.Background(), []tag.Mutator{tag.Upsert(eventTagKey, event)}, mEventLatency.M(duration.Milliseconds()))
em.telemetry.ProcessorGroupbytraceEventLatency.Record(context.Background(), duration.Milliseconds(), metric.WithAttributeSet(attribute.NewSet(attribute.String("event", event))))

if err != nil {
em.logger.Error("failed to process event", zap.Error(err), zap.String("event", event))
Expand Down
77 changes: 46 additions & 31 deletions processor/groupbytraceprocessor/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package groupbytraceprocessor

import (
"context"
"errors"
"strings"
"sync"
Expand All @@ -13,14 +14,19 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor/internal/metadata"
)

func TestEventCallback(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)

for _, tt := range []struct {
casename string
typ eventType
Expand Down Expand Up @@ -80,7 +86,7 @@ func TestEventCallback(t *testing.T) {
require.NoError(t, err)

wg := &sync.WaitGroup{}
em := newEventMachine(logger, 50, 1, 1_000)
em := newEventMachine(logger, 50, 1, 1_000, tel)
tt.registerCallback(em, wg)

em.startInBackground()
Expand All @@ -100,6 +106,8 @@ func TestEventCallback(t *testing.T) {
}

func TestEventCallbackNotSet(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
for _, tt := range []struct {
casename string
typ eventType
Expand Down Expand Up @@ -127,7 +135,7 @@ func TestEventCallbackNotSet(t *testing.T) {
require.NoError(t, err)

wg := &sync.WaitGroup{}
em := newEventMachine(logger, 50, 1, 1_000)
em := newEventMachine(logger, 50, 1, 1_000, tel)
em.onError = func(_ event) {
wg.Done()
}
Expand All @@ -147,6 +155,8 @@ func TestEventCallbackNotSet(t *testing.T) {
}

func TestEventInvalidPayload(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
for _, tt := range []struct {
casename string
typ eventType
Expand Down Expand Up @@ -195,7 +205,7 @@ func TestEventInvalidPayload(t *testing.T) {
require.NoError(t, err)

wg := &sync.WaitGroup{}
em := newEventMachine(logger, 50, 1, 1_000)
em := newEventMachine(logger, 50, 1, 1_000, tel)
em.onError = func(_ event) {
wg.Done()
}
Expand All @@ -216,12 +226,14 @@ func TestEventInvalidPayload(t *testing.T) {
}

func TestEventUnknownType(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
// prepare
logger, err := zap.NewDevelopment()
require.NoError(t, err)

wg := &sync.WaitGroup{}
em := newEventMachine(logger, 50, 1, 1_000)
em := newEventMachine(logger, 50, 1, 1_000, tel)
em.onError = func(_ event) {
wg.Done()
}
Expand All @@ -239,6 +251,8 @@ func TestEventUnknownType(t *testing.T) {
}

func TestEventTracePerWorker(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
for _, tt := range []struct {
casename string
traceID [16]byte
Expand All @@ -265,7 +279,7 @@ func TestEventTracePerWorker(t *testing.T) {
},
} {
t.Run(tt.casename, func(t *testing.T) {
em := newEventMachine(zap.NewNop(), 200, 100, 1_000)
em := newEventMachine(zap.NewNop(), 200, 100, 1_000, tel)

var wg sync.WaitGroup
var workerForTrace *eventMachineWorker
Expand Down Expand Up @@ -342,13 +356,15 @@ func TestEventConsumeConsistency(t *testing.T) {
}

func TestEventShutdown(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
// prepare
wg := sync.WaitGroup{}
wg.Add(1)

traceReceivedFired := &atomic.Int64{}
traceExpiredFired := &atomic.Int64{}
em := newEventMachine(zap.NewNop(), 50, 1, 1_000)
em := newEventMachine(zap.NewNop(), 50, 1, 1_000, tel)
em.onTraceReceived = func(tracesWithID, *eventMachineWorker) error {
traceReceivedFired.Store(1)
return nil
Expand Down Expand Up @@ -413,16 +429,11 @@ func TestEventShutdown(t *testing.T) {

func TestPeriodicMetrics(t *testing.T) {
// prepare
views := metricViews()

// ensure that we are starting with a clean state
view.Unregister(views...)
assert.NoError(t, view.Register(views...))

// try to be nice with the next consumer (test)
defer view.Unregister(views...)
s := setupTestTelemetry()
telemetryBuilder, err := metadata.NewTelemetryBuilder(s.NewSettings().TelemetrySettings)
require.NoError(t, err)

em := newEventMachine(zap.NewNop(), 50, 1, 1_000)
em := newEventMachine(zap.NewNop(), 50, 1, 1_000, telemetryBuilder)
em.metricsCollectionInterval = time.Millisecond

wg := sync.WaitGroup{}
Expand All @@ -443,7 +454,7 @@ func TestPeriodicMetrics(t *testing.T) {
}()

// sanity check
assertGaugeNotCreated(t, mNumEventsInQueue)
assertGaugeNotCreated(t, "processor_groupbytrace_num_events_in_queue", s)

// test
em.workers[0].fire(event{typ: traceReceived})
Expand All @@ -452,14 +463,14 @@ func TestPeriodicMetrics(t *testing.T) {

// ensure our gauge is showing 1 item in the queue
assert.Eventually(t, func() bool {
return getGaugeValue(t, mNumEventsInQueue) == 1
return getGaugeValue(t, "processor_groupbytrace_num_events_in_queue", s) == 1
}, 1*time.Second, 10*time.Millisecond)

wg.Done() // release all events

// ensure our gauge is now showing no items in the queue
assert.Eventually(t, func() bool {
return getGaugeValue(t, mNumEventsInQueue) == 0
return getGaugeValue(t, "processor_groupbytrace_num_events_in_queue", s) == 0
}, 1*time.Second, 10*time.Millisecond)

// signal and wait for the recursive call to finish
Expand All @@ -470,8 +481,10 @@ func TestPeriodicMetrics(t *testing.T) {
}

func TestForceShutdown(t *testing.T) {
set := processortest.NewNopSettings()
tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings)
// prepare
em := newEventMachine(zap.NewNop(), 50, 1, 1_000)
em := newEventMachine(zap.NewNop(), 50, 1, 1_000, tel)
em.shutdownTimeout = 20 * time.Millisecond

// test
Expand Down Expand Up @@ -515,16 +528,18 @@ func TestDoWithTimeout_TimeoutTrigger(t *testing.T) {
assert.WithinDuration(t, start, time.Now(), 100*time.Millisecond)
}

func getGaugeValue(t *testing.T, gauge *stats.Int64Measure) float64 {
viewData, err := view.RetrieveData("processor_groupbytrace_" + gauge.Name())
require.NoError(t, err)
require.Len(t, viewData, 1) // we expect exactly one data point, the last value

return viewData[0].Data.(*view.LastValueData).Value
// getGaugeValue collects the current metrics from the test reader and
// returns the value of the single data point recorded for the named
// int64 gauge. It fails the test if collection errors, the metric is
// not an int64 gauge, or there is not exactly one data point.
func getGaugeValue(t *testing.T, name string, tt componentTestTelemetry) int64 {
	var md metricdata.ResourceMetrics
	require.NoError(t, tt.reader.Collect(context.Background(), &md))
	// Checked type assertion: a plain m.(metricdata.Gauge[int64]) would
	// panic (crashing the whole test binary) if the metric has a
	// different data type.
	g, ok := tt.getMetric(name, md).Data.(metricdata.Gauge[int64])
	require.True(t, ok, "expected metric %q to be an int64 gauge", name)
	// require (fatal) rather than assert: with assert.Len the next line
	// would index an empty slice and panic on failure.
	require.Len(t, g.DataPoints, 1, "expected exactly one data point")
	return g.DataPoints[0].Value
}

func assertGaugeNotCreated(t *testing.T, gauge *stats.Int64Measure) {
viewData, err := view.RetrieveData("processor_groupbytrace_" + gauge.Name())
require.NoError(t, err)
assert.Len(t, viewData, 0, "gauge exists already but shouldn't")
// assertGaugeNotCreated asserts that no metric with the given name has
// been recorded yet: collecting from the test reader and looking the
// metric up must yield the zero value.
func assertGaugeNotCreated(t *testing.T, name string, tt componentTestTelemetry) {
	var md metricdata.ResourceMetrics
	require.NoError(t, tt.reader.Collect(context.Background(), &md))
	got := tt.getMetric(name, md)
	// testify convention is assert.Equal(t, expected, actual); the
	// original had the arguments swapped, which inverts the
	// expected/actual labels in the failure message.
	assert.Equal(t, metricdata.Metrics{}, got, "gauge exists already but shouldn't")
}
10 changes: 4 additions & 6 deletions processor/groupbytraceprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"time"

"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
Expand All @@ -31,8 +30,6 @@ var (

// NewFactory returns a new factory for the Filter processor.
func NewFactory() processor.Factory {
// TODO: find a more appropriate way to get this done, as we are swallowing the error here
_ = view.Register(metricViews()...)

return processor.NewFactory(
metadata.Type,
Expand Down Expand Up @@ -70,8 +67,9 @@ func createTracesProcessor(
return nil, errDiscardOrphansNotSupported
}

processor := newGroupByTraceProcessor(params, nextConsumer, *oCfg)
// the only supported storage for now
st = newMemoryStorage()

return newGroupByTraceProcessor(params.Logger, st, nextConsumer, *oCfg), nil
st = newMemoryStorage(processor.telemetryBuilder)
processor.st = st
return processor, nil
}
Loading

0 comments on commit fa987c3

Please sign in to comment.