Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

weight spans using their sample rate #226

Merged
merged 1 commit into from
Feb 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions agent/concentrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,16 @@ func (c *Concentrator) Add(t processedTrace) {
c.buckets[btime] = b
}

weight := 1.0
if t.Root != nil {
weight = t.Root.Weight()
}

if t.Root != nil && s.SpanID == t.Root.SpanID && t.Sublayers != nil {
// handle sublayers
b.HandleSpan(s, t.Env, c.aggregators, &t.Sublayers)
b.HandleSpan(s, t.Env, c.aggregators, weight, &t.Sublayers)
} else {
b.HandleSpan(s, t.Env, c.aggregators, nil)
b.HandleSpan(s, t.Env, c.aggregators, weight, nil)
}
}

Expand Down
3 changes: 2 additions & 1 deletion agent/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ func BenchmarkHandleSpanRandom(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
trace := fixtures.RandomTrace(10, 8)
root := trace.GetRoot()
for _, span := range trace {
sb.HandleSpan(span, defaultEnv, aggr, nil)
sb.HandleSpan(span, defaultEnv, aggr, root.Weight(), nil)
}
}
}
4 changes: 2 additions & 2 deletions fixtures/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const defaultEnv = "none"
// TestStatsBucket returns a fixed stats bucket to be used in unit tests
func TestStatsBucket() model.StatsBucket {
srb := model.NewStatsRawBucket(0, 1e9)
srb.HandleSpan(TestSpan(), defaultEnv, defaultAggregators, nil)
srb.HandleSpan(TestSpan(), defaultEnv, defaultAggregators, 1.0, nil)
sb := srb.Export()

// marshalling then unmarshalling data to:
Expand All @@ -36,7 +36,7 @@ func TestStatsBucket() model.StatsBucket {
func StatsBucketWithSpans(s []model.Span) model.StatsBucket {
srb := model.NewStatsRawBucket(0, 1e9)
for _, s := range s {
srb.HandleSpan(s, defaultEnv, defaultAggregators, nil)
srb.HandleSpan(s, defaultEnv, defaultAggregators, 1.0, nil)
}
return srb.Export()
}
Expand Down
16 changes: 16 additions & 0 deletions model/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import (
"math/rand"
)

const (
	// SpanSampleRateMetricKey is the key in a span's Metrics map under
	// which the sample rate is stored; Weight() reads it and treats
	// values outside (0, 1] as absent.
	SpanSampleRateMetricKey = "_sample_rate"
)

// Span is the common struct we use to represent a dapper-like span
type Span struct {
// Mandatory
Expand Down Expand Up @@ -59,3 +64,14 @@ func NewFlushMarker() Span {
// End returns the timestamp at which the span finished, i.e. its start
// timestamp offset by its duration.
func (s *Span) End() int64 {
	return s.Duration + s.Start
}

// Weight returns the weight of the span as defined for sampling: the
// inverse of the sample rate stored in its metrics. A missing rate, or
// one outside the half-open interval (0, 1], yields the neutral weight 1.
func (s *Span) Weight() float64 {
	// A lookup on a nil Metrics map safely reports ok == false.
	if rate, ok := s.Metrics[SpanSampleRateMetricKey]; ok && rate > 0.0 && rate <= 1.0 {
		return 1.0 / rate
	}
	return 1.0
}
22 changes: 22 additions & 0 deletions model/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,25 @@ func TestSpanFlushMarker(t *testing.T) {
s := NewFlushMarker()
assert.True(s.IsFlushMarker())
}

func TestSpanWeight(t *testing.T) {
	assert := assert.New(t)

	// With no sample rate metric set, the weight defaults to 1.
	span := testSpan()
	assert.Equal(1.0, span.Weight())

	// Rates outside (0, 1] fall back to a weight of 1; valid rates
	// are inverted.
	for _, tc := range []struct {
		rate   float64
		weight float64
	}{
		{-1.0, 1.0},
		{0.0, 1.0},
		{0.25, 4.0},
		{1.0, 1.0},
		{1.5, 1.0},
	} {
		span.Metrics[SpanSampleRateMetricKey] = tc.rate
		assert.Equal(tc.weight, span.Weight())
	}
}
45 changes: 26 additions & 19 deletions model/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@ func testTrace() Trace {
// B |----------------------| duration: 20
// C |-----| |---| duration: 5+3
return Trace{
Span{TraceID: 42, SpanID: 42, ParentID: 0, Service: "A", Name: "A.foo", Type: "web", Resource: "α", Start: 0, Duration: 100},
Span{TraceID: 42, SpanID: 100, ParentID: 42, Service: "B", Name: "B.bar", Type: "web", Resource: "α", Start: 1, Duration: 20},
Span{TraceID: 42, SpanID: 2000, ParentID: 100, Service: "C", Name: "sql.query", Type: "sql", Resource: "SELECT value FROM table", Start: 2, Duration: 5},
Span{TraceID: 42, SpanID: 3000, ParentID: 100, Service: "C", Name: "sql.query", Type: "sql", Resource: "SELECT ololololo... value FROM table", Start: 10, Duration: 3, Error: 1},
Span{TraceID: 42, SpanID: 42, ParentID: 0, Service: "A",
Name: "A.foo", Type: "web", Resource: "α", Start: 0, Duration: 100,
Metrics: map[string]float64{SpanSampleRateMetricKey: 0.5}},
Span{TraceID: 42, SpanID: 100, ParentID: 42, Service: "B",
Name: "B.bar", Type: "web", Resource: "α", Start: 1, Duration: 20},
Span{TraceID: 42, SpanID: 2000, ParentID: 100, Service: "C",
Name: "sql.query", Type: "sql", Resource: "SELECT value FROM table",
Start: 2, Duration: 5},
Span{TraceID: 42, SpanID: 3000, ParentID: 100, Service: "C",
Name: "sql.query", Type: "sql", Resource: "SELECT ololololo... value FROM table",
Start: 10, Duration: 3, Error: 1},
}
}

Expand All @@ -55,7 +62,7 @@ func TestStatsBucketDefault(t *testing.T) {
// No custom aggregators only the defaults
aggr := []string{}
for _, s := range testSpans() {
srb.HandleSpan(s, defaultEnv, aggr, nil)
srb.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
}
sb := srb.Export()

Expand Down Expand Up @@ -123,7 +130,7 @@ func TestStatsBucketExtraAggregators(t *testing.T) {
// one custom aggregator
aggr := []string{"version"}
for _, s := range testSpans() {
srb.HandleSpan(s, defaultEnv, aggr, nil)
srb.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
}
sb := srb.Export()

Expand Down Expand Up @@ -182,7 +189,7 @@ func TestStatsBucketMany(t *testing.T) {
s := templateSpan
s.Resource = "α" + strconv.Itoa(i)
srbCopy := *srb
srbCopy.HandleSpan(s, defaultEnv, aggr, nil)
srbCopy.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
}
sb := srb.Export()

Expand Down Expand Up @@ -215,7 +222,7 @@ func TestStatsBucketSublayers(t *testing.T) {
// No custom aggregators only the defaults
aggr := []string{}
for _, s := range tr {
srb.HandleSpan(s, defaultEnv, aggr, &sublayers)
srb.HandleSpan(s, defaultEnv, aggr, root.Weight(), &sublayers)
}
sb := srb.Export()

Expand All @@ -226,18 +233,18 @@ func TestStatsBucketSublayers(t *testing.T) {
"A.foo|_sublayers.duration.by_type|env:default,resource:α,service:A,sublayer_type:sql": 8,
"A.foo|_sublayers.duration.by_type|env:default,resource:α,service:A,sublayer_type:web": 92,
"A.foo|_sublayers.span_count|env:default,resource:α,service:A,:": 4,
"A.foo|duration|env:default,resource:α,service:A": 100,
"A.foo|duration|env:default,resource:α,service:A": 200,
"A.foo|errors|env:default,resource:α,service:A": 0,
"A.foo|hits|env:default,resource:α,service:A": 1,
"A.foo|hits|env:default,resource:α,service:A": 2,
"B.bar|_sublayers.duration.by_service|env:default,resource:α,service:B,sublayer_service:A": 80,
"B.bar|_sublayers.duration.by_service|env:default,resource:α,service:B,sublayer_service:B": 12,
"B.bar|_sublayers.duration.by_service|env:default,resource:α,service:B,sublayer_service:C": 8,
"B.bar|_sublayers.duration.by_type|env:default,resource:α,service:B,sublayer_type:sql": 8,
"B.bar|_sublayers.duration.by_type|env:default,resource:α,service:B,sublayer_type:web": 92,
"B.bar|_sublayers.span_count|env:default,resource:α,service:B,:": 4,
"B.bar|duration|env:default,resource:α,service:B": 20,
"B.bar|duration|env:default,resource:α,service:B": 40,
"B.bar|errors|env:default,resource:α,service:B": 0,
"B.bar|hits|env:default,resource:α,service:B": 1,
"B.bar|hits|env:default,resource:α,service:B": 2,
"sql.query|_sublayers.duration.by_service|env:default,resource:SELECT ololololo... value FROM table,service:C,sublayer_service:A": 80,
"sql.query|_sublayers.duration.by_service|env:default,resource:SELECT ololololo... value FROM table,service:C,sublayer_service:B": 12,
"sql.query|_sublayers.duration.by_service|env:default,resource:SELECT ololololo... value FROM table,service:C,sublayer_service:C": 8,
Expand All @@ -250,12 +257,12 @@ func TestStatsBucketSublayers(t *testing.T) {
"sql.query|_sublayers.duration.by_type|env:default,resource:SELECT value FROM table,service:C,sublayer_type:web": 92,
"sql.query|_sublayers.span_count|env:default,resource:SELECT ololololo... value FROM table,service:C,:": 4,
"sql.query|_sublayers.span_count|env:default,resource:SELECT value FROM table,service:C,:": 4,
"sql.query|duration|env:default,resource:SELECT ololololo... value FROM table,service:C": 3,
"sql.query|duration|env:default,resource:SELECT value FROM table,service:C": 5,
"sql.query|errors|env:default,resource:SELECT ololololo... value FROM table,service:C": 1,
"sql.query|duration|env:default,resource:SELECT ololololo... value FROM table,service:C": 6,
"sql.query|duration|env:default,resource:SELECT value FROM table,service:C": 10,
"sql.query|errors|env:default,resource:SELECT ololololo... value FROM table,service:C": 2,
"sql.query|errors|env:default,resource:SELECT value FROM table,service:C": 0,
"sql.query|hits|env:default,resource:SELECT ololololo... value FROM table,service:C": 1,
"sql.query|hits|env:default,resource:SELECT value FROM table,service:C": 1,
"sql.query|hits|env:default,resource:SELECT ololololo... value FROM table,service:C": 2,
"sql.query|hits|env:default,resource:SELECT value FROM table,service:C": 2,
}

assert.Len(sb.Counts, len(expectedCounts), "Missing counts!")
Expand Down Expand Up @@ -324,7 +331,7 @@ func BenchmarkHandleSpan(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for _, s := range testSpans() {
srb.HandleSpan(s, defaultEnv, aggr, nil)
srb.HandleSpan(s, defaultEnv, aggr, 1.0, nil)
}
}
}
Expand All @@ -343,7 +350,7 @@ func BenchmarkHandleSpanSublayers(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
for _, s := range tr {
srb.HandleSpan(s, defaultEnv, aggr, &sublayers)
srb.HandleSpan(s, defaultEnv, aggr, root.Weight(), &sublayers)
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions model/statsraw.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

type groupedStats struct {
tags TagSet
hits int64
errors int64
duration int64
hits float64

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the move from int64 to float64 going to impact anything in the serialization/payload to the API? Is the API compatible?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this in an internal format only that we then export to the API payload format.

Copy link
Author

@galdor galdor Feb 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These counters are for internal computations in StatsRawBucket. (*StatsRawBucket).Export() returns a StatsBucket which already stores data using Count structures which use float64 values.

errors float64
duration float64

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are in ns here, we can stick to int64 without any significant loss of precision.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can use float64 without losing precision, since it will be exported to a float64 value anyway (see next comment).

durationDistribution *quantile.SliceSummary
}

Expand Down Expand Up @@ -163,7 +163,7 @@ func assembleGrain(b *bytes.Buffer, env, resource, service string, m map[string]
}

// HandleSpan adds the span to this bucket stats, aggregated with the finest grain matching given aggregators
func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, sublayers *[]SublayerValue) {
func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, weight float64, sublayers *[]SublayerValue) {
if env == "" {
panic("env should never be empty")
}
Expand All @@ -179,7 +179,7 @@ func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, s
}

grain, tags := assembleGrain(&sb.keyBuf, env, s.Resource, s.Service, m)
sb.add(s, grain, tags)
sb.add(s, weight, grain, tags)

// sublayers - special case
if sublayers != nil {
Expand All @@ -189,7 +189,7 @@ func (sb *StatsRawBucket) HandleSpan(s Span, env string, aggregators []string, s
}
}

func (sb *StatsRawBucket) add(s Span, aggr string, tags TagSet) {
func (sb *StatsRawBucket) add(s Span, weight float64, aggr string, tags TagSet) {
var gs groupedStats
var ok bool

Expand All @@ -198,11 +198,11 @@ func (sb *StatsRawBucket) add(s Span, aggr string, tags TagSet) {
gs = newGroupedStats(tags)
}

gs.hits++
gs.hits += weight
if s.Error != 0 {
gs.errors++
gs.errors += weight
}
gs.duration += s.Duration
gs.duration += float64(s.Duration) * weight

// TODO add for s.Metrics ability to define arbitrary counts and distros, check some config?
// alter resolution of duration distro
Expand Down
10 changes: 2 additions & 8 deletions sampler/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import (
)

const (
// SampleRateMetricKey is the metric key holding the sample rate
SampleRateMetricKey = "_sample_rate"

// Sampler parameters not (yet?) configurable
defaultDecayPeriod time.Duration = 30 * time.Second
defaultSignatureScoreOffset float64 = 1
Expand Down Expand Up @@ -154,7 +151,7 @@ func ApplySampleRate(root *model.Span, sampleRate float64) bool {

// GetTraceAppliedSampleRate gets the sample rate applied earlier in the pipeline.
func GetTraceAppliedSampleRate(root *model.Span) float64 {
if rate, ok := root.Metrics[SampleRateMetricKey]; ok {
if rate, ok := root.Metrics[model.SpanSampleRateMetricKey]; ok {
return rate
}

Expand All @@ -166,8 +163,5 @@ func SetTraceAppliedSampleRate(root *model.Span, sampleRate float64) {
if root.Metrics == nil {
root.Metrics = make(map[string]float64)
}
if _, ok := root.Metrics[SampleRateMetricKey]; !ok {
root.Metrics[SampleRateMetricKey] = 1.0
}
root.Metrics[SampleRateMetricKey] = sampleRate
root.Metrics[model.SpanSampleRateMetricKey] = sampleRate
}