Skip to content

Commit

Permalink
[apmprometheus] add histogram (#1145)
Browse files Browse the repository at this point in the history
add support for mapping prometheus histograms into elastic histograms
  • Loading branch information
stuartnelson3 authored Nov 2, 2021
1 parent a1133d6 commit ee25156
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ https://github.com/elastic/apm-agent-go/compare/v1.14.0...master[View commits]
- Collect and send span destination service timing statistics about the dropped spans to the apm-server. {pull}1132[#(1132)]
- Experimental support to compress short exit spans into a composite span. Disabled by default. {pull}1134[#(1134)]
- Discard exit spans shorter or equal than `ELASTIC_APM_EXIT_SPAN_MIN_DURATION`. Defaults to `1ms`. {pull}1138[#(1138)]
- module/apmprometheus: add support for mapping prometheus histograms. {pull}1145[#(1145)]
[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
7 changes: 7 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ func (m *Metrics) Add(name string, labels []MetricLabel, value float64) {
m.addMetric(name, labels, model.Metric{Value: value})
}

// AddHistogram adds a histogram metric with the given name, labels, counts,
// and values. The labels are expected to be sorted lexicographically, and
// bucket values provided in ascending order.
func (m *Metrics) AddHistogram(name string, labels []MetricLabel, values []float64, counts []uint64) {
m.addMetric(name, labels, model.Metric{Values: values, Counts: counts, Type: "histogram"})
}

func (m *Metrics) addMetric(name string, labels []MetricLabel, metric model.Metric) {
if m.disabled.MatchAny(name) {
return
Expand Down
34 changes: 34 additions & 0 deletions model/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,3 +670,37 @@ func writeHex(w *fastjson.Writer, v []byte) {
w.RawByte(hextable[v&0x0f])
}
}

// MarshalFastJSON writes the JSON representation of v to w.
func (v *Metric) MarshalFastJSON(w *fastjson.Writer) error {
w.RawByte('{')
if len(v.Counts) > 0 {
w.RawString("\"values\":")
w.RawByte('[')
for i, v := range v.Values {
if i != 0 {
w.RawByte(',')
}
w.Float64(v)
}
w.RawByte(']')
w.RawString(",\"counts\":")
w.RawByte('[')
for i, v := range v.Counts {
if i != 0 {
w.RawByte(',')
}
w.Uint64(v)
}
w.RawByte(']')
} else {
w.RawString("\"value\":")
w.Float64(v.Value)
}
if v.Type != "" {
w.RawString(",\"type\":")
w.String(v.Type)
}
w.RawByte('}')
return nil
}
18 changes: 10 additions & 8 deletions model/marshal_fastjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions model/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,28 @@ func TestMarshalCloud(t *testing.T) {
assert.Equal(t, expect, decoded)
}

func TestMarshalMetric(t *testing.T) {
histogram := &model.Metric{
Type: "histogram",
Values: []float64{0.05, 0.1, 0.5, 1, 5},
Counts: []uint64{1, 1, 5, 10, 5},
}

var w fastjson.Writer
histogram.MarshalFastJSON(&w)
expect := `{"values":[0.05,0.1,0.5,1,5],"counts":[1,1,5,10,5],"type":"histogram"}`

assert.Equal(t, expect, string(w.Bytes()))

m := &model.Metric{Value: 1}

w.Reset()
m.MarshalFastJSON(&w)
expect = `{"value":1}`

assert.Equal(t, expect, string(w.Bytes()))
}

func fakeTransaction() model.Transaction {
return model.Transaction{
TraceID: model.TraceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
Expand Down
5 changes: 5 additions & 0 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,11 @@ type MetricsSpan struct {

// Metric holds metric values.
type Metric struct {
Type string `json:"type,omitempty"`
// Value holds the metric value.
Value float64 `json:"value"`
// Buckets holds the metric bucket values.
Values []float64 `json:"values,omitempty"`
// Count holds the metric observation count for the bucket.
Counts []uint64 `json:"counts,omitempty"`
}
79 changes: 78 additions & 1 deletion module/apmprometheus/gatherer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,85 @@ func (g gatherer) GatherMetrics(ctx context.Context, out *apm.Metrics) error {
out.Add(name+".percentile."+strconv.Itoa(p), labels, q.GetValue())
}
}
case dto.MetricType_HISTOGRAM:
// For the bucket values, we follow the approach described by Prometheus's
// histogram_quantile function (https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile)
// to achieve consistent percentile aggregation results:
//
// "The histogram_quantile() function interpolates quantile values by assuming a linear
// distribution within a bucket. (...) If a quantile is located in the highest bucket,
// the upper bound of the second highest bucket is returned. A lower limit of the lowest
// bucket is assumed to be 0 if the upper bound of that bucket is greater than 0. In that
// case, the usual linear interpolation is applied within that bucket. Otherwise, the upper
// bound of the lowest bucket is returned for quantiles located in the lowest bucket."
for _, m := range mf.GetMetric() {
h := m.GetHistogram()
// Total count for all values in this
// histogram. We want the per value count.
totalCount := h.GetSampleCount()
if totalCount == 0 {
continue
}
labels := makeLabels(m.GetLabel())
values := h.GetBucket()
// The +Inf bucket isn't encoded into the
// protobuf representation, but observations
// that fall within it are reflected in the
// histogram's SampleCount.
// We compare the totalCount to the bucketCount
// (sum of all CumulativeCount()s per bucket)
// to infer if an additional midpoint + count
// need to be added to their respective slices.
var bucketCount uint64
valuesLen := len(values)
midpoints := make([]float64, 0, valuesLen)
counts := make([]uint64, 0, valuesLen)
for i, b := range values {
count := b.GetCumulativeCount()
le := b.GetUpperBound()
if i == 0 {
if le > 0 {
le /= 2
}
} else {
// apm-server expects non-cumulative
// counts. prometheus counts each
// bucket cumulatively, ie. bucketN
// contains all counts for bucketN and
// all counts in preceding values. To
// get the current bucket's count we
// subtract bucketN-1 from bucketN,
// when N>0.
count = count - values[i-1].GetCumulativeCount()
le = values[i-1].GetUpperBound() + (le-values[i-1].GetUpperBound())/2.0
}
// we are excluding zero-count
// prometheus buckets.
// the cumulative count may have
// initially been non-zero, but when we
// subtract the preceding bucket, it
// may end up having a zero count.
if count == 0 {
continue
}
bucketCount += count
counts = append(counts, count)
midpoints = append(midpoints, le)
}
// Check if there were observations that fell
// outside of the defined histogram buckets, so
// we need to modify the current final bucket,
// and add an additional bucket with these
// observations.
if infBucketCount := totalCount - bucketCount; infBucketCount > 0 && valuesLen > 0 {
// Set the midpoint for the +Inf bucket
// to be the final defined bucket value.
midpoints = append(midpoints, values[valuesLen-1].GetUpperBound())
counts = append(counts, infBucketCount)
}
out.AddHistogram(name, labels, midpoints, counts)
}
default:
// TODO(axw) MetricType_HISTOGRAM
}
}
return nil
Expand Down
126 changes: 126 additions & 0 deletions module/apmprometheus/gatherer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,132 @@ func TestLabels(t *testing.T) {
}}, metrics)
}

func TestHistogram(t *testing.T) {
r := prometheus.NewRegistry()
h := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "histogram",
Help: ".",
Buckets: []float64{1, 3, 5, 10, 15},
},
[]string{"code", "method"},
)
r.MustRegister(h)

h.WithLabelValues("200", "GET").Observe(3.4)
h.WithLabelValues("200", "GET").Observe(3.4)
h.WithLabelValues("200", "GET").Observe(3.4)

h.WithLabelValues("302", "GET").Observe(5.5)
h.WithLabelValues("302", "GET").Observe(5.5)
h.WithLabelValues("302", "GET").Observe(5.5)

h.WithLabelValues("302", "GET").Observe(11.2)
h.WithLabelValues("302", "GET").Observe(11.2)
h.WithLabelValues("302", "GET").Observe(11.2)

g := apmprometheus.Wrap(r)
metrics := gatherMetrics(g)[1:]

assert.Equal(t, []model.Metrics{{
Labels: model.StringMap{
{Key: "code", Value: "200"},
{Key: "method", Value: "GET"},
},
Samples: map[string]model.Metric{
"histogram": {
Type: "histogram",
Values: []float64{4},
Counts: []uint64{3},
},
},
}, {
Labels: model.StringMap{
{Key: "code", Value: "302"},
{Key: "method", Value: "GET"},
},
Samples: map[string]model.Metric{
"histogram": {
Type: "histogram",
Values: []float64{7.5, 12.5},
Counts: []uint64{3, 3},
},
},
}}, metrics)
}

func TestHistogramInfBucket(t *testing.T) {
r := prometheus.NewRegistry()
h := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "histogram",
Help: ".",
Buckets: []float64{1, 3, 5, 10, 15},
},
[]string{"code", "method"},
)
r.MustRegister(h)

h.WithLabelValues("302", "GET").Observe(11.2)
h.WithLabelValues("302", "GET").Observe(11.2)
h.WithLabelValues("302", "GET").Observe(11.2)

// These observations fall outside the defined bucket range. They
// should be recorded as being in the final bucket.
h.WithLabelValues("302", "GET").Observe(17.2)
h.WithLabelValues("302", "GET").Observe(17.2)

g := apmprometheus.Wrap(r)
metrics := gatherMetrics(g)[1:]

assert.Equal(t, []model.Metrics{{
Labels: model.StringMap{
{Key: "code", Value: "302"},
{Key: "method", Value: "GET"},
},
Samples: map[string]model.Metric{
"histogram": {
Type: "histogram",
Values: []float64{12.5, 15},
Counts: []uint64{3, 2},
},
},
}}, metrics)
}

func TestHistogramNegativeValues(t *testing.T) {
r := prometheus.NewRegistry()
h := prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "histogram",
Help: ".",
Buckets: []float64{-1, 0, 1},
},
)
r.MustRegister(h)

h.Observe(-1.4)
h.Observe(-0.4)

g := apmprometheus.Wrap(r)
metrics := gatherMetrics(g)
for name := range metrics[0].Samples {
if !strings.HasPrefix(name, "histogram") {
delete(metrics[0].Samples, name)
}
}

assert.Equal(t, []model.Metrics{{
Samples: map[string]model.Metric{
"histogram": {
Type: "histogram",
Values: []float64{-1, -0.5},
Counts: []uint64{1, 1},
},
},
}}, metrics)
}

func gatherMetrics(g apm.MetricsGatherer) []model.Metrics {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()
Expand Down

0 comments on commit ee25156

Please sign in to comment.