This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Merge pull request #226 from DataDog/nicolas/sampling
weight spans using their sample rate
galdor authored Feb 8, 2017
2 parents 57f6499 + be0fcd3 commit 622fa02
Showing 8 changed files with 86 additions and 41 deletions.
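In effect, the commit makes the stats pipeline treat each kept trace as a stand-in for the traces that were sampled away: a root span carrying a "_sample_rate" of r contributes a weight of 1/r to hits, errors and durations. A minimal, self-contained sketch of that idea (the span type and the accumulation loop below are simplified placeholders, not the agent's actual structs):

    package main

    import "fmt"

    // span is a simplified placeholder for the agent's model.Span.
    type span struct {
        Duration int64
        Error    int32
        Metrics  map[string]float64
    }

    // weight mirrors the Weight() method added in model/span.go: the inverse of
    // the _sample_rate metric, defaulting to 1.0 for missing or invalid rates.
    func (s span) weight() float64 {
        rate, ok := s.Metrics["_sample_rate"]
        if !ok || rate <= 0 || rate > 1 {
            return 1.0
        }
        return 1.0 / rate
    }

    func main() {
        // A trace kept at a 25% sample rate stands for roughly four real traces.
        root := span{Duration: 100, Metrics: map[string]float64{"_sample_rate": 0.25}}
        child := span{Duration: 20, Error: 1}

        w := root.weight() // 4.0, shared by every span of the trace

        var hits, errors, duration float64
        for _, s := range []span{root, child} {
            hits += w
            if s.Error != 0 {
                errors += w
            }
            duration += float64(s.Duration) * w
        }

        fmt.Printf("hits=%v errors=%v duration=%v\n", hits, errors, duration)
        // Output: hits=8 errors=4 duration=480
    }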
9 changes: 7 additions & 2 deletions agent/concentrator.go
@@ -45,11 +45,16 @@ func (c *Concentrator) Add(t processedTrace) {
c.buckets[btime] = b
}

+weight := 1.0
+if t.Root != nil {
+weight = t.Root.Weight()
+}

if t.Root != nil && s.SpanID == t.Root.SpanID && t.Sublayers != nil {
// handle sublayers
-b.HandleSpan(s, t.Env, c.aggregators, &t.Sublayers)
+b.HandleSpan(s, t.Env, c.aggregators, weight, &t.Sublayers)
} else {
-b.HandleSpan(s, t.Env, c.aggregators, nil)
+b.HandleSpan(s, t.Env, c.aggregators, weight, nil)
}
}

3 changes: 2 additions & 1 deletion agent/model_test.go
@@ -22,8 +22,9 @@ func BenchmarkHandleSpanRandom(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
trace := fixtures.RandomTrace(10, 8)
+root := trace.GetRoot()
for _, span := range trace {
-sb.HandleSpan(span, defaultEnv, aggr, nil)
+sb.HandleSpan(span, defaultEnv, aggr, root.Weight(), nil)
}
}
}
4 changes: 2 additions & 2 deletions fixtures/stats.go
@@ -12,7 +12,7 @@ const defaultEnv = "none"
// TestStatsBucket returns a fixed stats bucket to be used in unit tests
func TestStatsBucket() model.StatsBucket {
srb := model.NewStatsRawBucket(0, 1e9)
-srb.HandleSpan(TestSpan(), defaultEnv, defaultAggregators, nil)
+srb.HandleSpan(TestSpan(), defaultEnv, defaultAggregators, 1.0, nil)
sb := srb.Export()

// marshalling then unmarshalling data to:
@@ -36,7 +36,7 @@ func TestStatsBucket() model.StatsBucket {
func StatsBucketWithSpans(s []model.Span) model.StatsBucket {
srb := model.NewStatsRawBucket(0, 1e9)
for _, s := range s {
-srb.HandleSpan(s, defaultEnv, defaultAggregators, nil)
+srb.HandleSpan(s, defaultEnv, defaultAggregators, 1.0, nil)
}
return srb.Export()
}
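Both fixtures pass a neutral weight of 1.0, so the stats buckets they build for unit tests are identical to what they produced before this change.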
16 changes: 16 additions & 0 deletions model/span.go
@@ -5,6 +5,11 @@ import (
"math/rand"
)

+const (
+// SpanSampleRateMetricKey is the metric key holding the sample rate
+SpanSampleRateMetricKey = "_sample_rate"
+)

// Span is the common struct we use to represent a dapper-like span
type Span struct {
// Mandatory
@@ -59,3 +64,14 @@ func NewFlushMarker() Span {
func (s *Span) End() int64 {
return s.Start + s.Duration
}

+// Weight returns the weight of the span as defined for sampling, i.e. the
+// inverse of the sampling rate.
+func (s *Span) Weight() float64 {
+sampleRate, ok := s.Metrics[SpanSampleRateMetricKey]
+if !ok || sampleRate <= 0.0 || sampleRate > 1.0 {
+return 1.0
+}
+
+return 1.0 / sampleRate
+}
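Weight() falls back to 1.0 whenever the _sample_rate metric is missing, zero, negative, or greater than 1, so a malformed rate can never divide by zero or inflate the stats; the test below exercises exactly those edge cases.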
22 changes: 22 additions & 0 deletions model/span_test.go
@@ -38,3 +38,25 @@ func TestSpanFlushMarker(t *testing.T) {
s := NewFlushMarker()
assert.True(s.IsFlushMarker())
}

+func TestSpanWeight(t *testing.T) {
+assert := assert.New(t)
+
+span := testSpan()
+assert.Equal(1.0, span.Weight())
+
+span.Metrics[SpanSampleRateMetricKey] = -1.0
+assert.Equal(1.0, span.Weight())
+
+span.Metrics[SpanSampleRateMetricKey] = 0.0
+assert.Equal(1.0, span.Weight())
+
+span.Metrics[SpanSampleRateMetricKey] = 0.25
+assert.Equal(4.0, span.Weight())
+
+span.Metrics[SpanSampleRateMetricKey] = 1.0
+assert.Equal(1.0, span.Weight())
+
+span.Metrics[SpanSampleRateMetricKey] = 1.5
+assert.Equal(1.0, span.Weight())
+}
45 changes: 26 additions & 19 deletions model/stats_test.go
@@ -34,10 +34,17 @@ func testTrace() Trace {
// B |----------------------| duration: 20
// C |-----| |---| duration: 5+3
return Trace{
-Span{TraceID: 42, SpanID: 42, ParentID: 0, Service: "A", Name: "A.foo", Type: "web", Resource: "α", Start: 0, Duration: 100},
-Span{TraceID: 42, SpanID: 100, ParentID: 42, Service: "B", Name: "B.bar", Type: "web", Resource: "α", Start: 1, Duration: 20},
-Span{TraceID: 42, SpanID: 2000, ParentID: 100, Service: "C", Name: "sql.query", Type: "sql", Resource: "SELECT value FROM table", Start: 2, Duration: 5},
-Span{TraceID: 42, SpanID: 3000, ParentID: 100, Service: "C", Name: "sql.query", Type: "sql", Resource: "SELECT ololololo... value FROM table", Start: 10, Duration: 3, Error: 1},
+Span{TraceID: 42, SpanID: 42, ParentID: 0, Service: "A",
+Name: "A.foo", Type: "web", Resource: "α", Start: 0, Duration: 100,
+Metrics: map[string]float64{SpanSampleRateMetricKey: 0.5}},
+Span{TraceID: 42, SpanID: 100, ParentID: 42, Service: "B",
+Name: "B.bar", Type: "web", Resource: "α", Start: 1, Duration: 20},
+Span{TraceID: 42, SpanID: 2000, ParentID: 100, Service: "C",
+Name: "sql.query", Type: "sql", Resource: "SELECT value FROM table",
+Start: 2, Duration: 5},
+Span{TraceID: 42, SpanID: 3000, ParentID: 100, Service: "C",
+Name: "sql.query", Type: "sql", Resource: "SELECT ololololo... value FROM table",
+Start: 10, Duration: 3, Error: 1},
}
}

@@ -55,7 +62,7 @@ func TestStatsBucketDefault(t *testing.T) {
// No custom aggregators only the defaults
aggr := []string{}
for _, s := range testSpans() {
-srb.HandleSpan(s, defaultEnv, aggr, nil)
+srb.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
}
sb := srb.Export()

@@ -123,7 +130,7 @@ func TestStatsBucketExtraAggregators(t *testing.T) {
// one custom aggregator
aggr := []string{"version"}
for _, s := range testSpans() {
-srb.HandleSpan(s, defaultEnv, aggr, nil)
+srb.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
}
sb := srb.Export()

@@ -182,7 +189,7 @@ func TestStatsBucketMany(t *testing.T) {
s := templateSpan
s.Resource = "α" + strconv.Itoa(i)
srbCopy := *srb
-srbCopy.HandleSpan(s, defaultEnv, aggr, nil)
+srbCopy.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
}
sb := srb.Export()

@@ -215,7 +222,7 @@ func TestStatsBucketSublayers(t *testing.T) {
// No custom aggregators only the defaults
aggr := []string{}
for _, s := range tr {
-srb.HandleSpan(s, defaultEnv, aggr, &sublayers)
+srb.HandleSpan(s, defaultEnv, aggr, root.Weight(), &sublayers)
}
sb := srb.Export()

@@ -226,18 +233,18 @@ func TestStatsBucketSublayers(t *testing.T) {
"A.foo|_sublayers.duration.by_type|env:default,resource:α,service:A,sublayer_type:sql": 8,
"A.foo|_sublayers.duration.by_type|env:default,resource:α,service:A,sublayer_type:web": 92,
"A.foo|_sublayers.span_count|env:default,resource:α,service:A,:": 4,
"A.foo|duration|env:default,resource:α,service:A": 100,
"A.foo|duration|env:default,resource:α,service:A": 200,
"A.foo|errors|env:default,resource:α,service:A": 0,
"A.foo|hits|env:default,resource:α,service:A": 1,
"A.foo|hits|env:default,resource:α,service:A": 2,
"B.bar|_sublayers.duration.by_service|env:default,resource:α,service:B,sublayer_service:A": 80,
"B.bar|_sublayers.duration.by_service|env:default,resource:α,service:B,sublayer_service:B": 12,
"B.bar|_sublayers.duration.by_service|env:default,resource:α,service:B,sublayer_service:C": 8,
"B.bar|_sublayers.duration.by_type|env:default,resource:α,service:B,sublayer_type:sql": 8,
"B.bar|_sublayers.duration.by_type|env:default,resource:α,service:B,sublayer_type:web": 92,
"B.bar|_sublayers.span_count|env:default,resource:α,service:B,:": 4,
"B.bar|duration|env:default,resource:α,service:B": 20,
"B.bar|duration|env:default,resource:α,service:B": 40,
"B.bar|errors|env:default,resource:α,service:B": 0,
"B.bar|hits|env:default,resource:α,service:B": 1,
"B.bar|hits|env:default,resource:α,service:B": 2,
"sql.query|_sublayers.duration.by_service|env:default,resource:SELECT ololololo... value FROM table,service:C,sublayer_service:A": 80,
"sql.query|_sublayers.duration.by_service|env:default,resource:SELECT ololololo... value FROM table,service:C,sublayer_service:B": 12,
"sql.query|_sublayers.duration.by_service|env:default,resource:SELECT ololololo... value FROM table,service:C,sublayer_service:C": 8,
@@ -250,12 +257,12 @@ func TestStatsBucketSublayers(t *testing.T) {
"sql.query|_sublayers.duration.by_type|env:default,resource:SELECT value FROM table,service:C,sublayer_type:web": 92,
"sql.query|_sublayers.span_count|env:default,resource:SELECT ololololo... value FROM table,service:C,:": 4,
"sql.query|_sublayers.span_count|env:default,resource:SELECT value FROM table,service:C,:": 4,
"sql.query|duration|env:default,resource:SELECT ololololo... value FROM table,service:C": 3,
"sql.query|duration|env:default,resource:SELECT value FROM table,service:C": 5,
"sql.query|errors|env:default,resource:SELECT ololololo... value FROM table,service:C": 1,
"sql.query|duration|env:default,resource:SELECT ololololo... value FROM table,service:C": 6,
"sql.query|duration|env:default,resource:SELECT value FROM table,service:C": 10,
"sql.query|errors|env:default,resource:SELECT ololololo... value FROM table,service:C": 2,
"sql.query|errors|env:default,resource:SELECT value FROM table,service:C": 0,
"sql.query|hits|env:default,resource:SELECT ololololo... value FROM table,service:C": 1,
"sql.query|hits|env:default,resource:SELECT value FROM table,service:C": 1,
"sql.query|hits|env:default,resource:SELECT ololololo... value FROM table,service:C": 2,
"sql.query|hits|env:default,resource:SELECT value FROM table,service:C": 2,
}

assert.Len(sb.Counts, len(expectedCounts), "Missing counts!")
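The doubled expectations above follow directly from the new weighting: the root span of testTrace() now carries _sample_rate 0.5, so root.Weight() is 1/0.5 = 2 and every span in the trace is counted twice: hits go from 1 to 2, durations from 100/20/5/3 to 200/40/10/6, and the single error to 2, while the _sublayers entries keep their previous values.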
@@ -324,7 +331,7 @@ func BenchmarkHandleSpan(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for _, s := range testSpans() {
-srb.HandleSpan(s, defaultEnv, aggr, nil)
+srb.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
}
}
}
@@ -343,7 +350,7 @@ func BenchmarkHandleSpanSublayers(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for _, s := range tr {
-srb.HandleSpan(s, defaultEnv, aggr, &sublayers)
+srb.HandleSpan(s, defaultEnv, aggr, root.Weight(), &sublayers)
}
}
}
18 changes: 9 additions & 9 deletions model/statsraw.go
@@ -12,9 +12,9 @@ import (

type groupedStats struct {
tags TagSet
-hits int64
-errors int64
-duration int64
+hits float64
+errors float64
+duration float64
durationDistribution *quantile.SliceSummary
}

@@ -163,7 +163,7 @@ func assembleGrain(b *bytes.Buffer, env, resource, service string, m map[string]
}

// HandleSpan adds the span to this bucket stats, aggregated with the finest grain matching given aggregators
-func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, sublayers *[]SublayerValue) {
+func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, weight float64, sublayers *[]SublayerValue) {
if env == "" {
panic("env should never be empty")
}
@@ -179,7 +179,7 @@ func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, s
}

grain, tags := assembleGrain(&sb.keyBuf, env, s.Resource, s.Service, m)
-sb.add(s, grain, tags)
+sb.add(s, weight, grain, tags)

// sublayers - special case
if sublayers != nil {
@@ -189,7 +189,7 @@ func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, s
}
}

-func (sb *StatsRawBucket) add(s Span, aggr string, tags TagSet) {
+func (sb *StatsRawBucket) add(s Span, weight float64, aggr string, tags TagSet) {
var gs groupedStats
var ok bool

@@ -198,11 +198,11 @@ func (sb *StatsRawBucket) add(s Span, aggr string, tags TagSet) {
gs = newGroupedStats(tags)
}

-gs.hits++
+gs.hits += weight
if s.Error != 0 {
-gs.errors++
+gs.errors += weight
}
-gs.duration += s.Duration
+gs.duration += float64(s.Duration) * weight

// TODO add for s.Metrics ability to define arbitrary counts and distros, check some config?
// alter resolution of duration distro
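One way to read the int64-to-float64 switch in groupedStats: weights derived from sample rates are rarely whole numbers, so integer counters would silently truncate them. A small standalone sketch of the difference (not the agent's code):

    package main

    import "fmt"

    func main() {
        rate := 0.3          // a trace kept at a 30% sample rate...
        weight := 1.0 / rate // ...stands for roughly 3.33 traces

        var intHits int64
        var floatHits float64
        for i := 0; i < 3; i++ {
            intHits += int64(weight) // truncates to 3 on every addition
            floatHits += weight
        }

        fmt.Println(intHits)   // 9
        fmt.Println(floatHits) // ~10 (up to floating-point rounding)
    }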
10 changes: 2 additions & 8 deletions sampler/sampler.go
@@ -21,9 +21,6 @@ import (
)

const (
-// SampleRateMetricKey is the metric key holding the sample rate
-SampleRateMetricKey = "_sample_rate"

// Sampler parameters not (yet?) configurable
defaultDecayPeriod time.Duration = 30 * time.Second
defaultSignatureScoreOffset float64 = 1
@@ -154,7 +151,7 @@ func ApplySampleRate(root *model.Span, sampleRate float64) bool {

// GetTraceAppliedSampleRate gets the sample rate the sample rate applied earlier in the pipeline.
func GetTraceAppliedSampleRate(root *model.Span) float64 {
-if rate, ok := root.Metrics[SampleRateMetricKey]; ok {
+if rate, ok := root.Metrics[model.SpanSampleRateMetricKey]; ok {
return rate
}

@@ -166,8 +163,5 @@ func SetTraceAppliedSampleRate(root *model.Span, sampleRate float64) {
if root.Metrics == nil {
root.Metrics = make(map[string]float64)
}
-if _, ok := root.Metrics[SampleRateMetricKey]; !ok {
-root.Metrics[SampleRateMetricKey] = 1.0
-}
-root.Metrics[SampleRateMetricKey] = sampleRate
+root.Metrics[model.SpanSampleRateMetricKey] = sampleRate
}
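On the sampler side the change is mostly a relocation: the "_sample_rate" key now lives in the model package as SpanSampleRateMetricKey, presumably so the sampler and Span.Weight() can share it, and SetTraceAppliedSampleRate now simply overwrites the metric instead of first seeding it with 1.0. A minimal round-trip sketch under those assumptions (placeholder types, not the actual packages; the 1.0 fallback in the getter is outside the visible hunk):

    package main

    import "fmt"

    const spanSampleRateMetricKey = "_sample_rate" // stands in for model.SpanSampleRateMetricKey

    type span struct {
        Metrics map[string]float64
    }

    // setRate mirrors the simplified setter: allocate the map if needed and
    // overwrite the rate unconditionally.
    func setRate(root *span, rate float64) {
        if root.Metrics == nil {
            root.Metrics = make(map[string]float64)
        }
        root.Metrics[spanSampleRateMetricKey] = rate
    }

    // getRate mirrors the getter, assuming a 1.0 default when no rate was applied.
    func getRate(root *span) float64 {
        if rate, ok := root.Metrics[spanSampleRateMetricKey]; ok {
            return rate
        }
        return 1.0
    }

    func main() {
        root := &span{}
        fmt.Println(getRate(root)) // 1 (no rate applied yet)

        setRate(root, 0.25)
        fmt.Println(getRate(root))       // 0.25
        fmt.Println(1.0 / getRate(root)) // 4, the weight the stats code will use
    }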
