Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/tailsampling] Include componentID as prefix in metrics 'policy' #34192

2 changes: 1 addition & 1 deletion processor/tailsamplingprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -38,5 +38,5 @@ func createTracesProcessor(
nextConsumer consumer.Traces,
) (processor.Traces, error) {
tCfg := cfg.(*Config)
return newTracesProcessor(ctx, params.TelemetrySettings, nextConsumer, *tCfg)
return newTracesProcessor(ctx, params, nextConsumer, *tCfg)
}
16 changes: 11 additions & 5 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
@@ -83,8 +83,9 @@ type Option func(*tailSamplingSpanProcessor)

// newTracesProcessor returns a processor.TracesProcessor that will perform tail sampling according to the given
// configuration.
func newTracesProcessor(ctx context.Context, settings component.TelemetrySettings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) {
telemetry, err := metadata.NewTelemetryBuilder(settings)
func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) {
telemetrySettings := set.TelemetrySettings
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a good reason for renaming component.TelemetrySettings from settings to telemetrySettings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was only to differentiate between the new argument processor.Settings (settings) from initial argument which was processor.Settings.TelemetrySettings (telemetrySettings)

Do i change it to below instead?

func newTracesProcessor(ctx context.Context, processorSettings processor.Settings, ...) (processor.Traces, error) {
  settings := processorSettings.TelemetrySettings
  telemetry, err := metadata.NewTelemetryBuilder(settings)
  if err != nil {
    return nil, err
  }
  ...
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's okay. This is small enough to not be a problem.

telemetry, err := metadata.NewTelemetryBuilder(telemetrySettings)
if err != nil {
return nil, err
}
@@ -102,7 +103,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
sampledIDCache: sampledDecisions,
logger: settings.Logger,
logger: telemetrySettings.Logger,
numTracesOnMap: &atomic.Uint64{},
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
}
@@ -119,6 +120,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
if tsp.policies == nil {
policyNames := map[string]bool{}
tsp.policies = make([]*policy, len(cfg.PolicyCfgs))
componentID := set.ID.Name()
for i := range cfg.PolicyCfgs {
policyCfg := &cfg.PolicyCfgs[i]

@@ -127,14 +129,18 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
}
policyNames[policyCfg.Name] = true

eval, err := getPolicyEvaluator(settings, policyCfg)
eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
if err != nil {
return nil, err
}
uniquePolicyName := policyCfg.Name
if componentID != "" {
uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
}
p := &policy{
name: policyCfg.Name,
evaluator: eval,
attribute: metric.WithAttributes(attribute.String("policy", policyCfg.Name)),
attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
}
tsp.policies[i] = p
}
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)
@@ -24,8 +25,7 @@ func BenchmarkSampling(b *testing.B) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}

sp, _ := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg)
sp, _ := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg)
tsp := sp.(*tailSamplingSpanProcessor)
require.NoError(b, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
14 changes: 7 additions & 7 deletions processor/tailsamplingprocessor/processor_decisions_test.go
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -71,7 +71,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -116,7 +116,7 @@ func TestSamplingMultiplePolicies(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -167,7 +167,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -213,7 +213,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -264,7 +264,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -334,7 +334,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe := &mockPolicyEvaluator{}
105 changes: 101 additions & 4 deletions processor/tailsamplingprocessor/processor_telemetry_test.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/featuregate"
@@ -37,7 +38,7 @@ func TestMetricsAfterOneEvaluation(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
@@ -211,6 +212,102 @@ func TestMetricsAfterOneEvaluation(t *testing.T) {
assert.Len(t, cs.AllTraces(), 1)
}

func TestMetricsWithComponentID(t *testing.T) {
// prepare
s := setupTestTelemetry()
b := newSyncIDBatcher()
syncBatcher := b.(*syncIDBatcher)

cfg := Config{
DecisionWait: 1,
NumTraces: 100,
PolicyCfgs: []PolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "always",
Type: AlwaysSample,
},
},
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings()
ct.ID = component.MustNewIDWithName("tail_sampling", "unique_id") // e.g tail_sampling/unique_id
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
err = proc.Shutdown(context.Background())
require.NoError(t, err)
}()

err = proc.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)

// test
err = proc.ConsumeTraces(context.Background(), simpleTraces())
require.NoError(t, err)

tsp := proc.(*tailSamplingSpanProcessor)
tsp.policyTicker.OnTick() // the first tick always gets an empty batch
tsp.policyTicker.OnTick()

// verify
var md metricdata.ResourceMetrics
require.NoError(t, s.reader.Collect(context.Background(), &md))
require.Equal(t, 8, s.len(md))

for _, tt := range []struct {
opts []metricdatatest.Option
m metricdata.Metrics
}{
{
opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp()},
m: metricdata.Metrics{
Name: "otelcol_processor_tail_sampling_count_traces_sampled",
Description: "Count of traces that were sampled or not per sampling policy",
Unit: "{traces}",
Data: metricdata.Sum[int64]{
IsMonotonic: true,
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("policy", "unique_id.always"),
attribute.String("sampled", "true"),
),
Value: 1,
},
},
},
},
},
{
opts: []metricdatatest.Option{metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()},
m: metricdata.Metrics{
Name: "otelcol_processor_tail_sampling_sampling_decision_latency",
Description: "Latency (in microseconds) of a given sampling policy",
Unit: "µs",
Data: metricdata.Histogram[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("policy", "unique_id.always"),
),
},
},
},
},
},
} {
got := s.getMetric(tt.m.Name, md)
metricdatatest.AssertEqual(t, tt.m, got, tt.opts...)
}

// sanity check
assert.Len(t, cs.AllTraces(), 1)
}

func TestProcessorTailSamplingCountSpansSampled(t *testing.T) {
err := featuregate.GlobalRegistry().Set("processor.tailsamplingprocessor.metricstatcountspanssampled", true)
require.NoError(t, err)
@@ -238,7 +335,7 @@ func TestProcessorTailSamplingCountSpansSampled(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
@@ -303,7 +400,7 @@ func TestProcessorTailSamplingSamplingTraceRemovalAge(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
@@ -364,7 +461,7 @@ func TestProcessorTailSamplingSamplingLateSpanAge(t *testing.T) {
},
}
cs := &consumertest.TracesSink{}
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
proc, err := newTracesProcessor(context.Background(), ct, cs, cfg, withDecisionBatcher(syncBatcher))
require.NoError(t, err)
defer func() {
21 changes: 10 additions & 11 deletions processor/tailsamplingprocessor/processor_test.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
@@ -121,7 +122,7 @@ func TestTraceIntegrity(t *testing.T) {
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe1 := &mockPolicyEvaluator{}
@@ -182,7 +183,7 @@ func TestSequentialTraceArrival(t *testing.T) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}
sp, err := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond))
sp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond))
require.NoError(t, err)

err = sp.Start(context.Background(), componenttest.NewNopHost())
@@ -215,7 +216,7 @@ func TestConcurrentTraceArrival(t *testing.T) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}
sp, err := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond))
sp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond))
require.NoError(t, err)

err = sp.Start(context.Background(), componenttest.NewNopHost())
@@ -269,7 +270,7 @@ func TestConcurrentArrivalAndEvaluation(t *testing.T) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testLatencyPolicy,
}
sp, err := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond))
sp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg, withTickerFrequency(time.Millisecond))
require.NoError(t, err)

err = sp.Start(context.Background(), componenttest.NewNopHost())
@@ -313,7 +314,7 @@ func TestSequentialTraceMapSize(t *testing.T) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}
sp, err := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg, withTickerFrequency(100*time.Millisecond))
sp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg, withTickerFrequency(100*time.Millisecond))
require.NoError(t, err)

err = sp.Start(context.Background(), componenttest.NewNopHost())
@@ -347,7 +348,7 @@ func TestConcurrentTraceMapSize(t *testing.T) {
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}
sp, _ := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg, withTickerFrequency(100*time.Millisecond))
sp, _ := newTracesProcessor(context.Background(), processortest.NewNopSettings(), consumertest.NewNop(), cfg, withTickerFrequency(100*time.Millisecond))
require.NoError(t, sp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
require.NoError(t, sp.Shutdown(context.Background()))
@@ -388,7 +389,7 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) {
},
}
s := setupTestTelemetry()
ct := s.NewSettings().TelemetrySettings
ct := s.NewSettings()
idb := newSyncIDBatcher()
msp := new(consumertest.TracesSink)

@@ -449,8 +450,7 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) {
func TestSubSecondDecisionTime(t *testing.T) {
// prepare
msp := new(consumertest.TracesSink)

tsp, err := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), msp, Config{
tsp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), msp, Config{
DecisionWait: 500 * time.Millisecond,
NumTraces: defaultNumTraces,
PolicyCfgs: testPolicy,
@@ -501,15 +501,14 @@ func TestPolicyLoggerAddsPolicyName(t *testing.T) {

func TestDuplicatePolicyName(t *testing.T) {
// prepare
set := componenttest.NewNopTelemetrySettings()
msp := new(consumertest.TracesSink)

alwaysSample := sharedPolicyCfg{
Name: "always_sample",
Type: AlwaysSample,
}

_, err := newTracesProcessor(context.Background(), set, msp, Config{
_, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), msp, Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
PolicyCfgs: []PolicyCfg{
Loading