Skip to content

Commit

Permalink
[processor/tailsampling] support trace_state policy (#10852)
Browse files Browse the repository at this point in the history
  • Loading branch information
ralphgj authored Jun 21, 2022
1 parent 63ebd10 commit 8875a71
Show file tree
Hide file tree
Showing 11 changed files with 225 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
- `googlemanagedprometheusexporter` The Google Managed Service for Prometheus exporter is alpha. (#10925)

### 💡 Enhancements 💡
- `mongodbatlasreceiver` Add support for receiving alerts (#10854)

- `tailsamplingprocessor`: Add trace_state policy (#10852)
- `mongodbatlasreceiver` Add support for receiving alerts (#10854)
- `cmd/mdatagen`: Allow attribute values of any types (#9245)
- `metricstransformprocessor`: Migrate the processor from OC to pdata (#10817)
- This behavior can be reverted by disabling the `processor.metricstransformprocessor.UseOTLPDataModel` feature gate.
Expand Down
6 changes: 6 additions & 0 deletions processor/tailsamplingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Multiple policies exist today and it is straight forward to add more. These incl
- `probabilistic`: Sample a percentage of traces. Read [a comparison with the Probabilistic Sampling Processor](#probabilistic-sampling-processor-compared-to-the-tail-sampling-processor-with-the-probabilistic-policy).
- `status_code`: Sample based upon the status code (`OK`, `ERROR` or `UNSET`)
- `string_attribute`: Sample based on string attributes value matches, both exact and regex value matches are supported
- `trace_state`: Sample based on [TraceState](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#tracestate) value matches
- `rate_limiting`: Sample based on rate
- `span_count`: Sample based on the minimum number of spans within a batch. If all traces within the batch have fewer spans than the threshold, the batch will not be sampled.
- `and`: Sample based on multiple policies, creates an AND policy
Expand Down Expand Up @@ -95,6 +96,11 @@ processors:
type: span_count,
span_count: {min_spans: 2}
},
{
name: test-policy-11,
type: trace_state,
trace_state: { key: key3, values: [value1, value2] }
},
{
name: and-policy-1,
type: and,
Expand Down
3 changes: 3 additions & 0 deletions processor/tailsamplingprocessor/and_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func getAndSubPolicyEvaluator(logger *zap.Logger, cfg *AndSubPolicyCfg) (samplin
case Probabilistic:
pfCfg := cfg.ProbabilisticCfg
return sampling.NewProbabilisticSampler(logger, pfCfg.HashSalt, pfCfg.SamplingPercentage), nil
case TraceState:
tsfCfg := cfg.TraceStateCfg
return sampling.NewTraceStateFilter(logger, tsfCfg.Key, tsfCfg.Values), nil
default:
return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
}
Expand Down
3 changes: 3 additions & 0 deletions processor/tailsamplingprocessor/composite_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func getSubPolicyEvaluator(logger *zap.Logger, cfg *SubPolicyCfg) (sampling.Poli
case SpanCount:
scCfg := cfg.SpanCountCfg
return sampling.NewSpanCount(logger, scCfg.MinSpans), nil
case TraceState:
tsfCfg := cfg.TraceStateCfg
return sampling.NewTraceStateFilter(logger, tsfCfg.Key, tsfCfg.Values), nil
default:
return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
}
Expand Down
15 changes: 15 additions & 0 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
And PolicyType = "and"
// SpanCount samples traces that have more spans per Trace than a given threshold.
SpanCount PolicyType = "span_count"
// TraceState samples traces whose trace_state holds one of the specified values under the given key.
TraceState PolicyType = "trace_state"
)

// SubPolicyCfg holds the common configuration to all policies under composite policy.
Expand All @@ -70,6 +72,8 @@ type SubPolicyCfg struct {
AndCfg AndCfg `mapstructure:"and"`
// Configs for span counter filter sampling policy evaluator.
SpanCountCfg SpanCountCfg `mapstructure:"span_count"`
// Configs for trace_state policy evaluator.
TraceStateCfg TraceStateCfg `mapstructure:"trace_state"`
}

type AndSubPolicyCfg struct {
Expand All @@ -91,6 +95,15 @@ type AndSubPolicyCfg struct {
StatusCodeCfg StatusCodeCfg `mapstructure:"status_code"`
// Configs for span counter filter sampling policy evaluator.
SpanCountCfg SpanCountCfg `mapstructure:"span_count"`
// Configs for trace_state filter sampling policy evaluator
TraceStateCfg TraceStateCfg `mapstructure:"trace_state"`
}

// TraceStateCfg holds the configurable settings to create a trace_state filter
// sampling policy evaluator.
type TraceStateCfg struct {
// Key is the trace_state key whose value the filter matches against.
Key string `mapstructure:"key"`
// Values indicate the set of values to use when matching against trace_state values.
Values []string `mapstructure:"values"`
}

type AndCfg struct {
Expand Down Expand Up @@ -136,6 +149,8 @@ type PolicyCfg struct {
AndCfg AndCfg `mapstructure:"and"`
// Configs for span count filter sampling policy evaluator.
SpanCountCfg SpanCountCfg `mapstructure:"span_count"`
// Configs for defining trace_state policy
TraceStateCfg TraceStateCfg `mapstructure:"trace_state"`
}

// LatencyCfg holds the configurable settings to create a latency filter sampling policy
Expand Down
5 changes: 5 additions & 0 deletions processor/tailsamplingprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func TestLoadConfig(t *testing.T) {
Type: SpanCount,
SpanCountCfg: SpanCountCfg{MinSpans: 2},
},
{
Name: "test-policy-9",
Type: TraceState,
TraceStateCfg: TraceStateCfg{Key: "key3", Values: []string{"value1", "value2"}},
},
{
Name: "and-policy-1",
Type: And,
Expand Down
2 changes: 1 addition & 1 deletion processor/tailsamplingprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
go.opencensus.io v0.23.0
go.opentelemetry.io/collector v0.53.1-0.20220615184617-4cefca87d2c6
go.opentelemetry.io/collector/pdata v0.53.1-0.20220615184617-4cefca87d2c6
go.opentelemetry.io/otel/trace v1.7.0
go.uber.org/atomic v1.9.0
go.uber.org/goleak v1.1.12
go.uber.org/zap v1.21.0
Expand All @@ -30,7 +31,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
tracesdk "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

// traceStateFilter is a policy evaluator that inspects the trace_state of the
// spans in a trace.
type traceStateFilter struct {
// key is the trace_state key whose value is looked up on each span.
key string
// logger is the logger supplied to NewTraceStateFilter.
logger *zap.Logger
// matcher reports whether a trace_state value is one of the configured values.
matcher func(string) bool
}

// Compile-time check that *traceStateFilter implements PolicyEvaluator.
var _ PolicyEvaluator = (*traceStateFilter)(nil)

// NewTraceStateFilter builds a PolicyEvaluator that samples a trace when one of
// its spans carries, under key in its trace_state, one of the given values.
// Empty values, and values for which len(key)+len(value) reaches 256, are left
// out of the match set.
func NewTraceStateFilter(logger *zap.Logger, key string, values []string) PolicyEvaluator {
	// Set of values that count as an exact match.
	accepted := map[string]struct{}{}
	for _, candidate := range values {
		// the key-value pair("=" will take one character) in trace_state can't exceed 256 characters
		if candidate == "" || len(key)+len(candidate) >= 256 {
			continue
		}
		accepted[candidate] = struct{}{}
	}

	isAccepted := func(toMatch string) bool {
		_, found := accepted[toMatch]
		return found
	}

	return &traceStateFilter{
		key:     key,
		logger:  logger,
		matcher: isAccepted,
	}
}

// Evaluate inspects the batches received for the trace and returns the
// Decision produced by hasSpanWithCondition for spans whose trace_state holds
// one of the configured values under the configured key. A span whose
// trace_state cannot be parsed is treated as not matching.
func (tsf *traceStateFilter) Evaluate(_ pcommon.TraceID, trace *TraceData) (Decision, error) {
	// Only the read of ReceivedBatches happens under the trace lock.
	trace.Lock()
	batches := trace.ReceivedBatches
	trace.Unlock()

	spanMatches := func(span ptrace.Span) bool {
		parsed, err := tracesdk.ParseTraceState(string(span.TraceState()))
		if err != nil {
			return false
		}
		return tsf.matcher(parsed.Get(tsf.key))
	}

	return hasSpanWithCondition(batches, spanMatches), nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sampling

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

// TestTraceStateCfg holds the key and values that each test case passes to
// NewTraceStateFilter. Its shape is replicated from the StringAttributeCfg
// test helper.
type TestTraceStateCfg struct {
Key string
Values []string
}

// TestTraceStateFilter evaluates the trace_state filter against single-span
// traces carrying various trace_state strings and checks the resulting
// sampling decision for each combination of key and value list.
func TestTraceStateFilter(t *testing.T) {
	cases := []struct {
		Desc      string
		Trace     *TraceData
		filterCfg *TestTraceStateCfg
		Decision  Decision
	}{
		{
			Desc:      "nonmatching trace_state key",
			Trace:     newTraceState("non_matching=value"),
			filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value"}},
			Decision:  NotSampled,
		},
		{
			Desc:      "nonmatching trace_state value",
			Trace:     newTraceState("example=non_matching"),
			filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value"}},
			Decision:  NotSampled,
		},
		{
			Desc:      "matching trace_state",
			Trace:     newTraceState("example=value"),
			filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value"}},
			Decision:  Sampled,
		},
		{
			Desc:      "nonmatching trace_state on empty filter list",
			Trace:     newTraceState("example=value"),
			filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{}},
			Decision:  NotSampled,
		},
		{
			Desc:      "nonmatching trace_state on multiple key-values",
			Trace:     newTraceState("example=non_matching,non_matching=value"),
			filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value"}},
			Decision:  NotSampled,
		},
		{
			Desc:      "matching trace_state on multiple key-values",
			Trace:     newTraceState("example=value,non_matching=value"),
			filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value"}},
			Decision:  Sampled,
		},
		{
			Desc:      "nonmatching trace_state on multiple filter list",
			Trace:     newTraceState("example=non_matching"),
			filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value1", "value2"}},
			Decision:  NotSampled,
		},
		{
			Desc:      "matching trace_state on multiple filter list",
			Trace:     newTraceState("example=value1"),
			filterCfg: &TestTraceStateCfg{Key: "example", Values: []string{"value1", "value2"}},
			Decision:  Sampled,
		},
	}

	// The same trace ID is used for every case; the filter ignores it.
	traceID := pcommon.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})

	for _, c := range cases {
		t.Run(c.Desc, func(t *testing.T) {
			filter := NewTraceStateFilter(zap.NewNop(), c.filterCfg.Key, c.filterCfg.Values)
			decision, err := filter.Evaluate(traceID, c.Trace)
			assert.NoError(t, err)
			// assert.Equal takes (expected, actual); this order keeps the
			// failure output labelled correctly.
			assert.Equal(t, c.Decision, decision)
		})
	}
}

// newTraceState returns a TraceData holding one batch with a single span whose
// trace_state is set to the given string.
func newTraceState(traceState string) *TraceData {
	batch := ptrace.NewTraces()

	// Build the resource -> scope -> span chain and keep only the span handle.
	span := batch.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
	span.SetTraceID(pcommon.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
	span.SetSpanID(pcommon.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
	span.SetTraceState(ptrace.TraceState(traceState))

	return &TraceData{
		ReceivedBatches: []ptrace.Traces{batch},
	}
}
3 changes: 3 additions & 0 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEval
case SpanCount:
spCfg := cfg.SpanCountCfg
return sampling.NewSpanCount(logger, spCfg.MinSpans), nil
case TraceState:
tsfCfg := cfg.TraceStateCfg
return sampling.NewTraceStateFilter(logger, tsfCfg.Key, tsfCfg.Values), nil
default:
return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ processors:
type: span_count,
span_count: {min_spans: 2}
},
{
name: test-policy-9,
type: trace_state,
trace_state: { key: key3, values: [ value1, value2 ] }
},
{
name: and-policy-1,
type: and,
Expand Down

0 comments on commit 8875a71

Please sign in to comment.