Skip to content

Commit

Permalink
[adaptive sampling] Clean-up after previous refactoring (#5954)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Previous refactoring left the code in a mixed-up state

## Description of the changes
- move provider / aggregator / post-aggregator into separate files named
accordingly
- do the same for tests
- no actual code changes, just moving code around

## How was this change tested?
- CI

---------

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Sep 7, 2024
1 parent 88a0319 commit 9bdd368
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 247 deletions.
6 changes: 6 additions & 0 deletions plugin/sampling/strategyprovider/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ const (
maxProbabilities = 10
)

// aggregator is a kind of trace processor that watches for root spans
// and calculates how many traces per service / per endpoint are being
// produced. It periodically flushes these stats ("throughput") to storage.
//
// It also invokes PostAggregator which actually computes adaptive sampling
// probabilities based on the observed throughput.
type aggregator struct {
sync.Mutex

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package adaptive

import (
"context"
"errors"
"math"
"math/rand"
Expand All @@ -17,7 +16,6 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive/calculationstrategy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

Expand Down Expand Up @@ -58,7 +56,7 @@ type throughputBucket struct {
endTime time.Time
}

// PostAggregator retrieves service throughput over a look back interval and calculates sampling probabilities
// PostAggregator retrieves service throughput over a lookback interval and calculates sampling probabilities
// per operation such that each operation is sampled at a specified target QPS. It achieves this by
// retrieving discrete buckets of operation throughput and doing a weighted average of the throughput
// and generating a probability to match the targetQPS.
Expand Down Expand Up @@ -129,16 +127,6 @@ func newPostAggregator(
}, nil
}

// GetSamplingStrategy implements protobuf endpoint for retrieving sampling strategy for a service.
func (p *Provider) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error) {
	p.RLock()
	defer p.RUnlock()
	strategy, ok := p.strategyResponses[service]
	if !ok {
		// No cached per-service strategy yet; serve the configured defaults.
		return p.generateDefaultSamplingStrategyResponse(), nil
	}
	return strategy, nil
}

// Start initializes and starts the sampling postAggregator which regularly calculates sampling probabilities.
func (p *PostAggregator) Start() error {
p.logger.Info("starting adaptive sampling postAggregator")
Expand All @@ -148,52 +136,10 @@ func (p *PostAggregator) Start() error {
return nil
}

// loadProbabilities reads the latest computed probabilities from storage and
// replaces the provider's in-memory cache with them. On a storage error the
// existing cache is left untouched.
func (p *Provider) loadProbabilities() {
	// TODO GetLatestProbabilities API can be changed to return the latest measured qps for initialization
	latest, err := p.storage.GetLatestProbabilities()
	if err != nil {
		p.logger.Warn("failed to initialize probabilities", zap.Error(err))
		return
	}
	p.Lock()
	p.probabilities = latest
	p.Unlock()
}

// runUpdateProbabilitiesLoop periodically reads probabilities from storage.
// A follower (non-leader) instance refreshes its local cache with the latest
// probabilities and serves them.
func (p *Provider) runUpdateProbabilitiesLoop() {
	// Delay the first refresh by a random jitter so that followers do not all
	// hit storage at the same instant.
	select {
	case <-p.shutdown:
		return
	case <-time.After(addJitter(p.followerRefreshInterval)):
	}

	ticker := time.NewTicker(p.followerRefreshInterval)
	defer ticker.Stop()
	for {
		select {
		case <-p.shutdown:
			return
		case <-ticker.C:
			// Only load probabilities if this provider doesn't hold the
			// leader lock; the leader computes its own.
			if p.isLeader() {
				continue
			}
			p.loadProbabilities()
			p.generateStrategyResponses()
		}
	}
}

// isLeader reports whether this instance currently holds the leader-election
// lock, i.e. whether it is responsible for computing sampling probabilities.
func (p *PostAggregator) isLeader() bool {
	return p.electionParticipant.IsLeader()
}

// isLeader reports whether this instance currently holds the leader-election
// lock; followers refresh probabilities from storage instead of computing them.
func (p *Provider) isLeader() bool {
	return p.electionParticipant.IsLeader()
}

// addJitter adds a random amount of time. Without jitter, if the host holding the leader
// lock were to die, then all other collectors can potentially wait for a full cycle before
// trying to acquire the lock. With jitter, we can reduce the average amount of time before a
Expand Down Expand Up @@ -457,40 +403,3 @@ func (p *PostAggregator) isUsingAdaptiveSampling(
}
return false
}

// generateStrategyResponses generates and caches SamplingStrategyResponse from the calculated sampling probabilities.
func (p *Provider) generateStrategyResponses() {
	p.RLock()
	responses := make(map[string]*api_v2.SamplingStrategyResponse)
	for service, operationProbabilities := range p.probabilities {
		perOperation := make([]*api_v2.OperationSamplingStrategy, 0, len(operationProbabilities))
		for operation, probability := range operationProbabilities {
			perOperation = append(perOperation, &api_v2.OperationSamplingStrategy{
				Operation: operation,
				ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{
					SamplingRate: probability,
				},
			})
		}
		response := p.generateDefaultSamplingStrategyResponse()
		response.OperationSampling.PerOperationStrategies = perOperation
		responses[service] = response
	}
	p.RUnlock()

	// Swap in the freshly built map under the write lock.
	p.Lock()
	defer p.Unlock()
	p.strategyResponses = responses
}

// generateDefaultSamplingStrategyResponse builds a probabilistic strategy
// response populated with the configured initial sampling probability and
// minimum samples-per-second lower bound.
func (p *Provider) generateDefaultSamplingStrategyResponse() *api_v2.SamplingStrategyResponse {
	response := api_v2.SamplingStrategyResponse{
		StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
		OperationSampling: &api_v2.PerOperationSamplingStrategies{
			DefaultSamplingProbability:       p.InitialSamplingProbability,
			DefaultLowerBoundTracesPerSecond: p.MinSamplesPerSecond,
		},
	}
	return &response
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package adaptive

import (
"context"
"errors"
"testing"
"time"
Expand All @@ -20,7 +19,6 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive/calculationstrategy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
smocks "github.com/jaegertracing/jaeger/storage/samplingstore/mocks"
)

Expand Down Expand Up @@ -405,113 +403,6 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) {
require.NoError(t, agg.Close())
}

// TestLoadProbabilities verifies that loadProbabilities populates the
// provider's probability cache from storage.
func TestLoadProbabilities(t *testing.T) {
	storage := &smocks.Store{}
	storage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil)

	provider := &Provider{storage: storage}
	require.Nil(t, provider.probabilities, "cache starts out unset")
	provider.loadProbabilities()
	require.NotNil(t, provider.probabilities, "loadProbabilities must populate the cache")
}

// TestRunUpdateProbabilitiesLoop verifies that a follower instance's refresh
// loop eventually populates both the probability cache and the cached
// strategy responses.
func TestRunUpdateProbabilitiesLoop(t *testing.T) {
	storage := &smocks.Store{}
	storage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil)
	participant := &epmocks.ElectionParticipant{}
	participant.On("Start").Return(nil)
	participant.On("Close").Return(nil)
	participant.On("IsLeader").Return(false)

	provider := &Provider{
		storage:                 storage,
		shutdown:                make(chan struct{}),
		followerRefreshInterval: time.Millisecond,
		electionParticipant:     participant,
	}
	defer close(provider.shutdown)
	require.Nil(t, provider.probabilities)
	require.Nil(t, provider.strategyResponses)
	go provider.runUpdateProbabilitiesLoop()

	// Poll (up to ~1s) until the background loop has filled both caches.
	for attempt := 0; attempt < 1000; attempt++ {
		provider.RLock()
		ready := provider.probabilities != nil && provider.strategyResponses != nil
		provider.RUnlock()
		if ready {
			break
		}
		time.Sleep(time.Millisecond)
	}
	provider.RLock()
	assert.NotNil(t, provider.probabilities)
	assert.NotNil(t, provider.strategyResponses)
	provider.RUnlock()
}

// TestRealisticRunCalculationLoop exercises the full Provider lifecycle with
// near-realistic (1/6th scale) configuration values. It is skipped by default
// because of its long running time.
func TestRealisticRunCalculationLoop(t *testing.T) {
	t.Skip("Skipped realistic calculation loop test")
	logger := zap.NewNop()
	// NB: This is an extremely long test since it uses near realistic (1/6th scale) processor config values
	testThroughputs := []*model.Throughput{
		{Service: "svcA", Operation: "GET", Count: 10},
		{Service: "svcA", Operation: "POST", Count: 9},
		{Service: "svcA", Operation: "PUT", Count: 5},
		{Service: "svcA", Operation: "DELETE", Count: 20},
	}
	mockStorage := &smocks.Store{}
	mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
		Return(testThroughputs, nil)
	mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil)
	mockStorage.On("InsertProbabilitiesAndQPS", "host", mock.AnythingOfType("model.ServiceOperationProbabilities"),
		mock.AnythingOfType("model.ServiceOperationQPS")).Return(nil)
	// IsLeader returns true so this instance computes probabilities itself.
	mockEP := &epmocks.ElectionParticipant{}
	mockEP.On("Start").Return(nil)
	mockEP.On("Close").Return(nil)
	mockEP.On("IsLeader").Return(true)
	cfg := Options{
		TargetSamplesPerSecond:     1.0,
		DeltaTolerance:             0.2,
		InitialSamplingProbability: 0.001,
		CalculationInterval:        time.Second * 10,
		AggregationBuckets:         1,
		Delay:                      time.Second * 10,
	}
	s := NewProvider(cfg, logger, mockEP, mockStorage)
	s.Start()

	// Poll (up to ~25s) until the provider has computed per-operation strategies.
	for i := 0; i < 100; i++ {
		strategy, _ := s.GetSamplingStrategy(context.Background(), "svcA")
		if len(strategy.OperationSampling.PerOperationStrategies) != 0 {
			break
		}
		time.Sleep(250 * time.Millisecond)
	}
	s.Close()

	strategy, err := s.GetSamplingStrategy(context.Background(), "svcA")
	require.NoError(t, err)
	require.Len(t, strategy.OperationSampling.PerOperationStrategies, 4)
	strategies := strategy.OperationSampling.PerOperationStrategies

	// Each operation's probability should move toward its 1 QPS target:
	// unchanged when at/near target, doubled when under-sampled,
	// halved when over-sampled.
	for _, s := range strategies {
		switch s.Operation {
		case "GET":
			assert.Equal(t, 0.001, s.ProbabilisticSampling.SamplingRate,
				"Already at 1QPS, no probability change")
		case "POST":
			assert.Equal(t, 0.001, s.ProbabilisticSampling.SamplingRate,
				"Within epsilon of 1QPS, no probability change")
		case "PUT":
			assert.InEpsilon(t, 0.002, s.ProbabilisticSampling.SamplingRate, 0.025,
				"Under sampled, double probability")
		case "DELETE":
			assert.InEpsilon(t, 0.0005, s.ProbabilisticSampling.SamplingRate, 0.025,
				"Over sampled, halve probability")
		}
	}
}

func TestPrependBucket(t *testing.T) {
p := &PostAggregator{Options: Options{AggregationBuckets: 1}}
p.prependThroughputBucket(&throughputBucket{interval: time.Minute})
Expand Down Expand Up @@ -547,41 +438,6 @@ func TestConstructorFailure(t *testing.T) {
require.EqualError(t, err, "BucketsForCalculation cannot be less than 1")
}

func TestGenerateStrategyResponses(t *testing.T) {
probabilities := model.ServiceOperationProbabilities{
"svcA": map[string]float64{
"GET": 0.5,
},
}
p := &Provider{
probabilities: probabilities,
Options: Options{
InitialSamplingProbability: 0.001,
MinSamplesPerSecond: 0.0001,
},
}
p.generateStrategyResponses()

expectedResponse := map[string]*api_v2.SamplingStrategyResponse{
"svcA": {
StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
OperationSampling: &api_v2.PerOperationSamplingStrategies{
DefaultSamplingProbability: 0.001,
DefaultLowerBoundTracesPerSecond: 0.0001,
PerOperationStrategies: []*api_v2.OperationSamplingStrategy{
{
Operation: "GET",
ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{
SamplingRate: 0.5,
},
},
},
},
},
}
assert.Equal(t, expectedResponse, p.strategyResponses)
}

func TestUsingAdaptiveSampling(t *testing.T) {
p := &PostAggregator{}
throughput := serviceOperationThroughput{
Expand Down
Loading

0 comments on commit 9bdd368

Please sign in to comment.