From be0fcd39e72b857a6d5bf3daed804ed677ead804 Mon Sep 17 00:00:00 2001
From: Nicolas Martyanoff
Date: Tue, 7 Feb 2017 15:53:25 +0100
Subject: [PATCH] weight spans using their sample rate

The root span of a trace can contain a _sample_rate value if the client
sampled spans. When this is the case, we scale stats (hit and error
counts, and durations) by a factor, or weight, equal to
1.0 / _sample_rate.

This also means that we have to use floating point values to accumulate
values in StatsRawBucket.
---
 agent/concentrator.go |  9 +++++++--
 agent/model_test.go   |  3 ++-
 fixtures/stats.go     |  4 ++--
 model/span.go         | 16 +++++++++++++++
 model/span_test.go    | 22 +++++++++++++++++++++
 model/stats_test.go   | 45 +++++++++++++++++++++++++------------------
 model/statsraw.go     | 18 ++++++++---------
 sampler/sampler.go    | 10 ++--------
 8 files changed, 86 insertions(+), 41 deletions(-)

diff --git a/agent/concentrator.go b/agent/concentrator.go
index e395d92ff..de8f328de 100644
--- a/agent/concentrator.go
+++ b/agent/concentrator.go
@@ -45,11 +45,16 @@ func (c *Concentrator) Add(t processedTrace) {
 			c.buckets[btime] = b
 		}
 
+		weight := 1.0
+		if t.Root != nil {
+			weight = t.Root.Weight()
+		}
+
 		if t.Root != nil && s.SpanID == t.Root.SpanID && t.Sublayers != nil {
 			// handle sublayers
-			b.HandleSpan(s, t.Env, c.aggregators, &t.Sublayers)
+			b.HandleSpan(s, t.Env, c.aggregators, weight, &t.Sublayers)
 		} else {
-			b.HandleSpan(s, t.Env, c.aggregators, nil)
+			b.HandleSpan(s, t.Env, c.aggregators, weight, nil)
 		}
 	}
 
diff --git a/agent/model_test.go b/agent/model_test.go
index 6f02d7d22..64f6be474 100644
--- a/agent/model_test.go
+++ b/agent/model_test.go
@@ -22,8 +22,9 @@ func BenchmarkHandleSpanRandom(b *testing.B) {
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
 		trace := fixtures.RandomTrace(10, 8)
+		root := trace.GetRoot()
 		for _, span := range trace {
-			sb.HandleSpan(span, defaultEnv, aggr, nil)
+			sb.HandleSpan(span, defaultEnv, aggr, root.Weight(), nil)
 		}
 	}
 }
diff --git a/fixtures/stats.go b/fixtures/stats.go
index b93b27cf7..36a48f789 100644
--- a/fixtures/stats.go
+++ b/fixtures/stats.go
@@ -12,7 +12,7 @@ const defaultEnv = "none"
 // TestStatsBucket returns a fixed stats bucket to be used in unit tests
 func TestStatsBucket() model.StatsBucket {
 	srb := model.NewStatsRawBucket(0, 1e9)
-	srb.HandleSpan(TestSpan(), defaultEnv, defaultAggregators, nil)
+	srb.HandleSpan(TestSpan(), defaultEnv, defaultAggregators, 1.0, nil)
 	sb := srb.Export()
 
 	// marshalling then unmarshalling data to:
@@ -36,7 +36,7 @@ func TestStatsBucket() model.StatsBucket {
 func StatsBucketWithSpans(s []model.Span) model.StatsBucket {
 	srb := model.NewStatsRawBucket(0, 1e9)
 	for _, s := range s {
-		srb.HandleSpan(s, defaultEnv, defaultAggregators, nil)
+		srb.HandleSpan(s, defaultEnv, defaultAggregators, 1.0, nil)
 	}
 	return srb.Export()
 }
diff --git a/model/span.go b/model/span.go
index eb4de9ca3..2ad92fa6c 100644
--- a/model/span.go
+++ b/model/span.go
@@ -5,6 +5,11 @@ import (
 	"math/rand"
 )
 
+const (
+	// SpanSampleRateMetricKey is the metric key holding the sample rate
+	SpanSampleRateMetricKey = "_sample_rate"
+)
+
 // Span is the common struct we use to represent a dapper-like span
 type Span struct {
 	// Mandatory
@@ -59,3 +64,14 @@ func NewFlushMarker() Span {
 func (s *Span) End() int64 {
 	return s.Start + s.Duration
 }
+
+// Weight returns the weight of the span as defined for sampling, i.e. the
+// inverse of the sampling rate.
+func (s *Span) Weight() float64 {
+	sampleRate, ok := s.Metrics[SpanSampleRateMetricKey]
+	if !ok || sampleRate <= 0.0 || sampleRate > 1.0 {
+		return 1.0
+	}
+
+	return 1.0 / sampleRate
+}
diff --git a/model/span_test.go b/model/span_test.go
index 4346a3206..0bce86de8 100644
--- a/model/span_test.go
+++ b/model/span_test.go
@@ -38,3 +38,25 @@ func TestSpanFlushMarker(t *testing.T) {
 	s := NewFlushMarker()
 	assert.True(s.IsFlushMarker())
 }
+
+func TestSpanWeight(t *testing.T) {
+	assert := assert.New(t)
+
+	span := testSpan()
+	assert.Equal(1.0, span.Weight())
+
+	span.Metrics[SpanSampleRateMetricKey] = -1.0
+	assert.Equal(1.0, span.Weight())
+
+	span.Metrics[SpanSampleRateMetricKey] = 0.0
+	assert.Equal(1.0, span.Weight())
+
+	span.Metrics[SpanSampleRateMetricKey] = 0.25
+	assert.Equal(4.0, span.Weight())
+
+	span.Metrics[SpanSampleRateMetricKey] = 1.0
+	assert.Equal(1.0, span.Weight())
+
+	span.Metrics[SpanSampleRateMetricKey] = 1.5
+	assert.Equal(1.0, span.Weight())
+}
diff --git a/model/stats_test.go b/model/stats_test.go
index ace7c1014..e29c0bd34 100644
--- a/model/stats_test.go
+++ b/model/stats_test.go
@@ -34,10 +34,17 @@ func testTrace() Trace {
 	// B   |----------------------|                duration: 20
 	// C       |-----|       |---|                 duration: 5+3
 	return Trace{
-		Span{TraceID: 42, SpanID: 42, ParentID: 0, Service: "A", Name: "A.foo", Type: "web", Resource: "α", Start: 0, Duration: 100},
-		Span{TraceID: 42, SpanID: 100, ParentID: 42, Service: "B", Name: "B.bar", Type: "web", Resource: "α", Start: 1, Duration: 20},
-		Span{TraceID: 42, SpanID: 2000, ParentID: 100, Service: "C", Name: "sql.query", Type: "sql", Resource: "SELECT value FROM table", Start: 2, Duration: 5},
-		Span{TraceID: 42, SpanID: 3000, ParentID: 100, Service: "C", Name: "sql.query", Type: "sql", Resource: "SELECT ololololo... value FROM table", Start: 10, Duration: 3, Error: 1},
+		Span{TraceID: 42, SpanID: 42, ParentID: 0, Service: "A",
+			Name: "A.foo", Type: "web", Resource: "α", Start: 0, Duration: 100,
+			Metrics: map[string]float64{SpanSampleRateMetricKey: 0.5}},
+		Span{TraceID: 42, SpanID: 100, ParentID: 42, Service: "B",
+			Name: "B.bar", Type: "web", Resource: "α", Start: 1, Duration: 20},
+		Span{TraceID: 42, SpanID: 2000, ParentID: 100, Service: "C",
+			Name: "sql.query", Type: "sql", Resource: "SELECT value FROM table",
+			Start: 2, Duration: 5},
+		Span{TraceID: 42, SpanID: 3000, ParentID: 100, Service: "C",
+			Name: "sql.query", Type: "sql", Resource: "SELECT ololololo... value FROM table",
+			Start: 10, Duration: 3, Error: 1},
 	}
 }
 
@@ -55,7 +62,7 @@ func TestStatsBucketDefault(t *testing.T) {
 	// No custom aggregators only the defaults
 	aggr := []string{}
 	for _, s := range testSpans() {
-		srb.HandleSpan(s, defaultEnv, aggr, nil)
+		srb.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
 	}
 	sb := srb.Export()
 
@@ -123,7 +130,7 @@ func TestStatsBucketExtraAggregators(t *testing.T) {
 	// one custom aggregator
 	aggr := []string{"version"}
 	for _, s := range testSpans() {
-		srb.HandleSpan(s, defaultEnv, aggr, nil)
+		srb.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
 	}
 	sb := srb.Export()
 
@@ -182,7 +189,7 @@ func TestStatsBucketMany(t *testing.T) {
 		s := templateSpan
 		s.Resource = "α" + strconv.Itoa(i)
 		srbCopy := *srb
-		srbCopy.HandleSpan(s, defaultEnv, aggr, nil)
+		srbCopy.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
 	}
 	sb := srb.Export()
 
@@ -215,7 +222,7 @@ func TestStatsBucketSublayers(t *testing.T) {
 	// No custom aggregators only the defaults
 	aggr := []string{}
 	for _, s := range tr {
-		srb.HandleSpan(s, defaultEnv, aggr, &sublayers)
+		srb.HandleSpan(s, defaultEnv, aggr, root.Weight(), &sublayers)
 	}
 	sb := srb.Export()
 
@@ -226,18 +233,18 @@ func TestStatsBucketSublayers(t *testing.T) {
 		"A.foo|_sublayers.duration.by_type|env:default,resource:α,service:A,sublayer_type:sql": 8,
 		"A.foo|_sublayers.duration.by_type|env:default,resource:α,service:A,sublayer_type:web": 92,
 		"A.foo|_sublayers.span_count|env:default,resource:α,service:A,:": 4,
-		"A.foo|duration|env:default,resource:α,service:A": 100,
+		"A.foo|duration|env:default,resource:α,service:A": 200,
 		"A.foo|errors|env:default,resource:α,service:A": 0,
-		"A.foo|hits|env:default,resource:α,service:A": 1,
+		"A.foo|hits|env:default,resource:α,service:A": 2,
 		"B.bar|_sublayers.duration.by_service|env:default,resource:α,service:B,sublayer_service:A": 80,
 		"B.bar|_sublayers.duration.by_service|env:default,resource:α,service:B,sublayer_service:B": 12,
 		"B.bar|_sublayers.duration.by_service|env:default,resource:α,service:B,sublayer_service:C": 8,
 		"B.bar|_sublayers.duration.by_type|env:default,resource:α,service:B,sublayer_type:sql": 8,
 		"B.bar|_sublayers.duration.by_type|env:default,resource:α,service:B,sublayer_type:web": 92,
 		"B.bar|_sublayers.span_count|env:default,resource:α,service:B,:": 4,
-		"B.bar|duration|env:default,resource:α,service:B": 20,
+		"B.bar|duration|env:default,resource:α,service:B": 40,
 		"B.bar|errors|env:default,resource:α,service:B": 0,
-		"B.bar|hits|env:default,resource:α,service:B": 1,
+		"B.bar|hits|env:default,resource:α,service:B": 2,
 		"sql.query|_sublayers.duration.by_service|env:default,resource:SELECT ololololo... value FROM table,service:C,sublayer_service:A": 80,
 		"sql.query|_sublayers.duration.by_service|env:default,resource:SELECT ololololo... value FROM table,service:C,sublayer_service:B": 12,
 		"sql.query|_sublayers.duration.by_service|env:default,resource:SELECT ololololo... value FROM table,service:C,sublayer_service:C": 8,
@@ -250,12 +257,12 @@ func TestStatsBucketSublayers(t *testing.T) {
 		"sql.query|_sublayers.duration.by_type|env:default,resource:SELECT value FROM table,service:C,sublayer_type:web": 92,
 		"sql.query|_sublayers.span_count|env:default,resource:SELECT ololololo... value FROM table,service:C,:": 4,
 		"sql.query|_sublayers.span_count|env:default,resource:SELECT value FROM table,service:C,:": 4,
-		"sql.query|duration|env:default,resource:SELECT ololololo... value FROM table,service:C": 3,
-		"sql.query|duration|env:default,resource:SELECT value FROM table,service:C": 5,
-		"sql.query|errors|env:default,resource:SELECT ololololo... value FROM table,service:C": 1,
+		"sql.query|duration|env:default,resource:SELECT ololololo... value FROM table,service:C": 6,
+		"sql.query|duration|env:default,resource:SELECT value FROM table,service:C": 10,
+		"sql.query|errors|env:default,resource:SELECT ololololo... value FROM table,service:C": 2,
 		"sql.query|errors|env:default,resource:SELECT value FROM table,service:C": 0,
-		"sql.query|hits|env:default,resource:SELECT ololololo... value FROM table,service:C": 1,
-		"sql.query|hits|env:default,resource:SELECT value FROM table,service:C": 1,
+		"sql.query|hits|env:default,resource:SELECT ololololo... value FROM table,service:C": 2,
+		"sql.query|hits|env:default,resource:SELECT value FROM table,service:C": 2,
 	}
 
 	assert.Len(sb.Counts, len(expectedCounts), "Missing counts!")
@@ -324,7 +331,7 @@ func BenchmarkHandleSpan(b *testing.B) {
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
 		for _, s := range testSpans() {
-			srb.HandleSpan(s, defaultEnv, aggr, nil)
+			srb.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
 		}
 	}
 }
@@ -343,7 +350,7 @@ func BenchmarkHandleSpanSublayers(b *testing.B) {
 	b.ReportAllocs()
 	for i := 0; i < b.N; i++ {
 		for _, s := range tr {
-			srb.HandleSpan(s, defaultEnv, aggr, &sublayers)
+			srb.HandleSpan(s, defaultEnv, aggr, root.Weight(), &sublayers)
 		}
 	}
 }
diff --git a/model/statsraw.go b/model/statsraw.go
index 0823f2f40..9f2faa25d 100644
--- a/model/statsraw.go
+++ b/model/statsraw.go
@@ -12,9 +12,9 @@ import (
 type groupedStats struct {
 	tags TagSet
 
-	hits     int64
-	errors   int64
-	duration int64
+	hits     float64
+	errors   float64
+	duration float64
 
 	durationDistribution *quantile.SliceSummary
 }
@@ -163,7 +163,7 @@ func assembleGrain(b *bytes.Buffer, env, resource, service string, m map[string]
 }
 
 // HandleSpan adds the span to this bucket stats, aggregated with the finest grain matching given aggregators
-func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, sublayers *[]SublayerValue) {
+func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, weight float64, sublayers *[]SublayerValue) {
 	if env == "" {
 		panic("env should never be empty")
 	}
@@ -179,7 +179,7 @@ func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, s
 	}
 
 	grain, tags := assembleGrain(&sb.keyBuf, env, s.Resource, s.Service, m)
-	sb.add(s, grain, tags)
+	sb.add(s, weight, grain, tags)
 
 	// sublayers - special case
 	if sublayers != nil {
@@ -189,7 +189,7 @@ func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, s
 	}
 }
 
-func (sb *StatsRawBucket) add(s Span, aggr string, tags TagSet) {
+func (sb *StatsRawBucket) add(s Span, weight float64, aggr string, tags TagSet) {
 	var gs groupedStats
 	var ok bool
 
@@ -198,11 +198,11 @@ func (sb *StatsRawBucket) add(s Span, aggr string, tags TagSet) {
 		gs = newGroupedStats(tags)
 	}
 
-	gs.hits++
+	gs.hits += weight
 	if s.Error != 0 {
-		gs.errors++
+		gs.errors += weight
 	}
-	gs.duration += s.Duration
+	gs.duration += float64(s.Duration) * weight
 
 	// TODO add for s.Metrics ability to define arbitrary counts and distros, check some config?
 	// alter resolution of duration distro
diff --git a/sampler/sampler.go b/sampler/sampler.go
index 8092be05e..0b0bba25a 100644
--- a/sampler/sampler.go
+++ b/sampler/sampler.go
@@ -21,9 +21,6 @@ import (
 )
 
 const (
-	// SampleRateMetricKey is the metric key holding the sample rate
-	SampleRateMetricKey = "_sample_rate"
-
 	// Sampler parameters not (yet?) configurable
 	defaultDecayPeriod          time.Duration = 30 * time.Second
 	defaultSignatureScoreOffset float64       = 1
@@ -154,7 +151,7 @@ func ApplySampleRate(root *model.Span, sampleRate float64) bool {
 
 // GetTraceAppliedSampleRate gets the sample rate the sample rate applied earlier in the pipeline.
 func GetTraceAppliedSampleRate(root *model.Span) float64 {
-	if rate, ok := root.Metrics[SampleRateMetricKey]; ok {
+	if rate, ok := root.Metrics[model.SpanSampleRateMetricKey]; ok {
 		return rate
 	}
 
@@ -166,8 +163,5 @@ func SetTraceAppliedSampleRate(root *model.Span, sampleRate float64) {
 	if root.Metrics == nil {
 		root.Metrics = make(map[string]float64)
 	}
-	if _, ok := root.Metrics[SampleRateMetricKey]; !ok {
-		root.Metrics[SampleRateMetricKey] = 1.0
-	}
-	root.Metrics[SampleRateMetricKey] = sampleRate
+	root.Metrics[model.SpanSampleRateMetricKey] = sampleRate
 }
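
Illustration only, not part of the patch: the weighting rule described in the commit message (weight = 1.0 / _sample_rate, falling back to 1.0 when the rate is missing or outside (0, 1]) can be sketched in isolation. The snippet below is a minimal, self-contained Go example; the helper name `weight` and the local variables are hypothetical and only mirror the behaviour of Span.Weight and the weighted accumulation in StatsRawBucket.add, they are not identifiers from this repository.

package main

import "fmt"

// weight mirrors the clamping rule introduced by Span.Weight in this patch:
// a missing sample rate, or one outside (0, 1], yields a weight of 1.
func weight(sampleRate float64, ok bool) float64 {
	if !ok || sampleRate <= 0.0 || sampleRate > 1.0 {
		return 1.0
	}
	return 1.0 / sampleRate
}

func main() {
	// A client that kept 25% of traces reports _sample_rate = 0.25, so each
	// surviving span stands in for 4 spans when stats are accumulated.
	metrics := map[string]float64{"_sample_rate": 0.25}

	rate, ok := metrics["_sample_rate"]
	w := weight(rate, ok)

	// Weighted accumulation, analogous to the float64 counters now used in
	// StatsRawBucket: hits and errors grow by the weight, durations are
	// scaled by it.
	var hits, errors, duration float64
	spanDuration := int64(100)
	spanError := int32(0)

	hits += w
	if spanError != 0 {
		errors += w
	}
	duration += float64(spanDuration) * w

	fmt.Printf("weight=%.1f hits=%.1f errors=%.1f duration=%.1f\n", w, hits, errors, duration)
	// Prints: weight=4.0 hits=4.0 errors=0.0 duration=400.0
}

Scaling by the inverse of the sample rate keeps the accumulated hit counts and durations approximately equal to what the unsampled traffic would have produced, which is why the bucket counters switch from int64 to float64 in this change.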