[processor/tailsampling] Support hot sampling policy loading (#37014)
#### Description

This pull request adds support for hot sampling policy loading to the tail
sampling processor. It allows the collector (or another service using the
processor) to update the tail sampling policy dynamically, without restarting
the processor or the entire collector, which greatly reduces the impact of
policy changes on pipeline availability and throughput. Policy changes are
staged and then safely applied on the next tick of the sampling decision loop.
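
For illustration, here is a minimal, generic sketch of that swap-on-tick
pattern (names are hypothetical; the actual implementation is in
`processor.go` below):

```go
// Minimal, generic sketch of the swap-on-tick pattern (hypothetical names):
// callers stage a pending policy set under a mutex, and the decision tick
// swaps it in, so evaluation never observes a half-applied policy.
package main

import (
	"fmt"
	"sync"
)

type hotPolicies struct {
	mu      sync.Mutex
	pending []string
	active  []string
}

// Set stages a new policy set; it takes effect on the next tick.
func (h *hotPolicies) Set(policies []string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.pending = policies
}

// onTick applies any pending policy set before traces are evaluated.
func (h *hotPolicies) onTick() {
	h.mu.Lock()
	defer h.mu.Unlock()
	if len(h.pending) == 0 {
		return
	}
	h.active = h.pending
	h.pending = nil
}

func main() {
	h := &hotPolicies{active: []string{"always"}}
	h.Set([]string{"always", "errors-only"})
	h.onTick()
	fmt.Println(h.active) // [always errors-only]
}
```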

A collector (or another service) could use OpAMP to remotely manage sampling
policy with little to no impact on pipeline availability and performance. This
is the approach the https://tailctrl.io/ agent took.

#### Usage

Callers currently need to define a custom interface in order to set the
sampling policy, since the factory returns the processor as a
`processor.Traces`:

```go
type SamplingProcessor interface {
	processor.Traces

	SetSamplingPolicy(cfgs []tailsamplingprocessor.PolicyCfg)
}

factory := tailsamplingprocessor.NewFactory()

// ctx, set, cfg, and nextConsumer are assumed to be in scope;
// error handling is elided for brevity.
tsp, _ := factory.CreateTraces(ctx, set, cfg, nextConsumer)
sp := tsp.(SamplingProcessor)

sp.SetSamplingPolicy(cfgs)
```
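
Because `PolicyCfg` embeds the unexported `sharedPolicyCfg` (see the test
below), code outside the package cannot build policies with struct literals. A
hedged sketch of one way to construct them instead, assuming the processor's
documented YAML schema and unmarshaling via the collector's `confmap` package:

```go
// Hypothetical sketch: build []PolicyCfg from a plain config map, since the
// policy fields cannot be set directly from another package.
conf := confmap.NewFromStringMap(map[string]any{
	"policies": []any{
		map[string]any{"name": "always", "type": "always_sample"},
		map[string]any{
			"name":    "slow",
			"type":    "latency",
			"latency": map[string]any{"threshold_ms": 500},
		},
	},
})

var cfg struct {
	Policies []tailsamplingprocessor.PolicyCfg `mapstructure:"policies"`
}
if err := conf.Unmarshal(&cfg); err != nil {
	panic(err) // in real code, reject the update and keep the current policy
}

sp.SetSamplingPolicy(cfg.Policies)
```

Invalid policies are also caught at apply time: `loadPendingSamplingPolicy`
falls back to the previous policy set if loading fails (see `processor.go`
below).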

#### Testing

Added a test to ensure that policy changes are loaded. The changes are also in
use in a private project.

---------

Signed-off-by: Sean Porter <[email protected]>
Co-authored-by: Matthew Wear <[email protected]>
portertech and mwear authored Jan 7, 2025
1 parent 9226667 commit 5f9d943
Showing 3 changed files with 198 additions and 30 deletions.
27 changes: 27 additions & 0 deletions .chloggen/tailsamplingprocessor_hot-policy-loading.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: tailsamplingprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support hot sampling policy loading

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37014]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
114 changes: 84 additions & 30 deletions processor/tailsamplingprocessor/processor.go
@@ -45,6 +45,7 @@ type policy struct {
type tailSamplingSpanProcessor struct {
ctx context.Context

set processor.Settings
telemetry *metadata.TelemetryBuilder
logger *zap.Logger

@@ -59,6 +60,9 @@ type tailSamplingSpanProcessor struct {
nonSampledIDCache cache.Cache[bool]
deleteChan chan pcommon.TraceID
numTracesOnMap *atomic.Uint64

setPolicyMux sync.Mutex
pendingPolicy []PolicyCfg
}

// spanAndScope a structure for holding information about span and its instrumentation scope.
@@ -108,6 +112,7 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume

tsp := &tailSamplingSpanProcessor{
ctx: ctx,
set: set,
telemetry: telemetry,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
@@ -128,31 +133,9 @@
}

if tsp.policies == nil {
policyNames := map[string]bool{}
tsp.policies = make([]*policy, len(cfg.PolicyCfgs))
componentID := set.ID.Name()
for i := range cfg.PolicyCfgs {
policyCfg := &cfg.PolicyCfgs[i]

if policyNames[policyCfg.Name] {
return nil, fmt.Errorf("duplicate policy name %q", policyCfg.Name)
}
policyNames[policyCfg.Name] = true

eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
if err != nil {
return nil, err
}
uniquePolicyName := policyCfg.Name
if componentID != "" {
uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
}
p := &policy{
name: policyCfg.Name,
evaluator: eval,
attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
}
tsp.policies[i] = p
err := tsp.loadSamplingPolicy(cfg.PolicyCfgs)
if err != nil {
return nil, err
}
}

@@ -262,7 +245,82 @@ type policyMetrics struct {
idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64
}

func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error {
telemetrySettings := tsp.set.TelemetrySettings
componentID := tsp.set.ID.Name()

policyNames := map[string]bool{}
tsp.policies = make([]*policy, len(cfgs))

for i := range cfgs {
policyCfg := &cfgs[i]

if policyNames[policyCfg.Name] {
return fmt.Errorf("duplicate policy name %q", policyCfg.Name)
}
policyNames[policyCfg.Name] = true

eval, err := getPolicyEvaluator(telemetrySettings, policyCfg)
if err != nil {
return err
}
uniquePolicyName := policyCfg.Name
if componentID != "" {
uniquePolicyName = fmt.Sprintf("%s.%s", componentID, policyCfg.Name)
}
p := &policy{
name: policyCfg.Name,
evaluator: eval,
attribute: metric.WithAttributes(attribute.String("policy", uniquePolicyName)),
}
tsp.policies[i] = p
}

tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(tsp.policies)))

return nil
}

func (tsp *tailSamplingSpanProcessor) SetSamplingPolicy(cfgs []PolicyCfg) {
tsp.logger.Debug("Setting pending sampling policy", zap.Int("pending.len", len(cfgs)))

tsp.setPolicyMux.Lock()
defer tsp.setPolicyMux.Unlock()

tsp.pendingPolicy = cfgs
}

func (tsp *tailSamplingSpanProcessor) loadPendingSamplingPolicy() {
tsp.setPolicyMux.Lock()
defer tsp.setPolicyMux.Unlock()

// Nothing pending, do nothing.
pLen := len(tsp.pendingPolicy)
if pLen == 0 {
return
}

tsp.logger.Debug("Loading pending sampling policy", zap.Int("pending.len", pLen))

// In case something goes wrong.
prev := tsp.policies

err := tsp.loadSamplingPolicy(tsp.pendingPolicy)

// Empty pending regardless of error. If policy is invalid, it will fail on
// every tick, no need to do extra work and flood the log with errors.
tsp.pendingPolicy = nil

if err != nil {
tsp.logger.Error("Failed to load pending sampling policy", zap.Error(err))
tsp.logger.Debug("Falling back to previous sampling policy")
tsp.policies = prev
}
}

func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
tsp.loadPendingSamplingPolicy()

metrics := policyMetrics{}

startTime := time.Now()
@@ -401,11 +459,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
}

lenSpans := int64(len(spans))
lenPolicies := len(tsp.policies)
initialDecisions := make([]sampling.Decision, lenPolicies)
for i := 0; i < lenPolicies; i++ {
initialDecisions[i] = sampling.Pending
}

d, loaded := tsp.idToTrace.Load(id)
if !loaded {
spanCount := &atomic.Int64{}
87 changes: 87 additions & 0 deletions processor/tailsamplingprocessor/processor_test.go
@@ -449,6 +449,93 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) {
}
}

func TestSetSamplingPolicy(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
PolicyCfgs: []PolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "always",
Type: AlwaysSample,
},
},
},
}
s := setupTestTelemetry()
ct := s.NewSettings()
idb := newSyncIDBatcher()
msp := new(consumertest.TracesSink)

p, err := newTracesProcessor(context.Background(), ct, msp, cfg, withDecisionBatcher(idb))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

tsp := p.(*tailSamplingSpanProcessor)

assert.Len(t, tsp.policies, 1)

tsp.policyTicker.OnTick()

assert.Len(t, tsp.policies, 1)

cfgs := []PolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "always",
Type: AlwaysSample,
},
},
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "everything",
Type: AlwaysSample,
},
},
}
tsp.SetSamplingPolicy(cfgs)

assert.Len(t, tsp.policies, 1)

tsp.policyTicker.OnTick()

assert.Len(t, tsp.policies, 2)

// Duplicate policy name.
cfgs = []PolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "always",
Type: AlwaysSample,
},
},
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "everything",
Type: AlwaysSample,
},
},
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "everything",
Type: AlwaysSample,
},
},
}
tsp.SetSamplingPolicy(cfgs)

assert.Len(t, tsp.policies, 2)

tsp.policyTicker.OnTick()

// Should revert sampling policy.
assert.Len(t, tsp.policies, 2)
}

func TestSubSecondDecisionTime(t *testing.T) {
// prepare
msp := new(consumertest.TracesSink)
