Skip to content

Commit

Permalink
Histogram aggregator initial state (fix #735) (#736)
Browse files Browse the repository at this point in the history
* Add a test

* Add comments and description options

* Another test

* Undo buffer re-use

* Mod tidy

* Precommit

* Again

* Copyright

* Undo rename
  • Loading branch information
jmacd authored May 18, 2020
1 parent 6bc14ff commit 2dee676
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 30 deletions.
97 changes: 97 additions & 0 deletions exporters/metric/prometheus/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package prometheus_test

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"

"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
sdk "go.opentelemetry.io/otel/sdk/metric"
integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

// This test demonstrates that it is relatively difficult to setup a
// Prometheus export pipeline:
//
// 1. The default boundaries are difficult to pass, should be []float instead of []metric.Number
// 2. The push controller doesn't make sense b/c Prometheus is pull-bsaed
//
// TODO: Address these issues; add Resources to the test.

func ExampleNewExportPipeline() {
// Create a meter
selector := simple.NewWithHistogramDistribution(nil)
exporter, err := prometheus.NewRawExporter(prometheus.Config{})
if err != nil {
panic(err)
}
integrator := integrator.New(selector, true)
meterImpl := sdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(meterImpl, "example")

ctx := context.Background()

// Use two instruments
counter := metric.Must(meter).NewInt64Counter(
"a.counter",
metric.WithDescription("Counts things"),
)
recorder := metric.Must(meter).NewInt64ValueRecorder(
"a.valuerecorder",
metric.WithDescription("Records values"),
)

counter.Add(ctx, 100, kv.String("key", "value"))
recorder.Record(ctx, 100, kv.String("key", "value"))

// Simulate a push
meterImpl.Collect(ctx)
err = exporter.Export(ctx, nil, integrator.CheckpointSet())
if err != nil {
panic(err)
}

// GET the HTTP endpoint
var input bytes.Buffer
resp := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/", &input)
if err != nil {
panic(err)
}
exporter.ServeHTTP(resp, req)
data, err := ioutil.ReadAll(resp.Result().Body)
if err != nil {
panic(err)
}
fmt.Print(string(data))

// Output:
// # HELP a_counter Counts things
// # TYPE a_counter counter
// a_counter{key="value"} 100
// # HELP a_valuerecorder Records values
// # TYPE a_valuerecorder histogram
// a_valuerecorder_bucket{key="value",le="+Inf"} 1
// a_valuerecorder_sum{key="value"} 100
// a_valuerecorder_count{key="value"} 1
}
43 changes: 21 additions & 22 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)

// Note: This code uses a Mutex to govern access to the exclusive
// aggregator state. This is in contrast to a lock-free approach
// (as in the Go prometheus client) that was reverted here:
// https://github.com/open-telemetry/opentelemetry-go/pull/669

type (
// Aggregator observe events and counts them in pre-determined buckets.
// It also calculates the sum and count of all events.
Expand All @@ -39,10 +44,9 @@ type (
// the sum and counts for all observed values and
// the less than equal bucket count for the pre-determined boundaries.
state struct {
// all fields have to be aligned for 64-bit atomic operations.
buckets aggregator.Buckets
count metric.Number
sum metric.Number
bucketCounts []metric.Number
count metric.Number
sum metric.Number
}
)

Expand Down Expand Up @@ -71,17 +75,12 @@ func New(desc *metric.Descriptor, boundaries []metric.Number) *Aggregator {
sort.Sort(&sortedBoundaries)
boundaries = sortedBoundaries.numbers

agg := Aggregator{
return &Aggregator{
kind: desc.NumberKind(),
boundaries: boundaries,
current: state{
buckets: aggregator.Buckets{
Boundaries: boundaries,
Counts: make([]metric.Number, len(boundaries)+1),
},
},
current: emptyState(boundaries),
checkpoint: emptyState(boundaries),
}
return &agg
}

// Sum returns the sum of all values in the checkpoint.
Expand All @@ -102,7 +101,10 @@ func (c *Aggregator) Count() (int64, error) {
func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint.buckets, nil
return aggregator.Buckets{
Boundaries: c.boundaries,
Counts: c.checkpoint.bucketCounts,
}, nil
}

// Checkpoint saves the current state and resets the current state to
Expand All @@ -111,16 +113,13 @@ func (c *Aggregator) Histogram() (aggregator.Buckets, error) {
// other.
func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
c.lock.Lock()
c.checkpoint, c.current = c.current, c.emptyState()
c.checkpoint, c.current = c.current, emptyState(c.boundaries)
c.lock.Unlock()
}

func (c *Aggregator) emptyState() state {
func emptyState(boundaries []metric.Number) state {
return state{
buckets: aggregator.Buckets{
Boundaries: c.boundaries,
Counts: make([]metric.Number, len(c.boundaries)+1),
},
bucketCounts: make([]metric.Number, len(boundaries)+1),
}
}

Expand All @@ -141,7 +140,7 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri

c.current.count.AddInt64(1)
c.current.sum.AddNumber(kind, number)
c.current.buckets.Counts[bucketID].AddUint64(1)
c.current.bucketCounts[bucketID].AddUint64(1)

return nil
}
Expand All @@ -156,8 +155,8 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.checkpoint.count.AddNumber(metric.Uint64NumberKind, o.checkpoint.count)

for i := 0; i < len(c.checkpoint.buckets.Counts); i++ {
c.checkpoint.buckets.Counts[i].AddNumber(metric.Uint64NumberKind, o.checkpoint.buckets.Counts[i])
for i := 0; i < len(c.checkpoint.bucketCounts); i++ {
c.checkpoint.bucketCounts[i].AddNumber(metric.Uint64NumberKind, o.checkpoint.bucketCounts[i])
}
return nil
}
Expand Down
29 changes: 21 additions & 8 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,28 @@ func histogram(t *testing.T, profile test.Profile, policy policy) {
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
require.Nil(t, err)

require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
require.Equal(t, len(agg.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")

counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := agg.checkpoint.buckets.Counts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.buckets.Counts)
bCount := agg.checkpoint.bucketCounts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.bucketCounts)
}
}

func TestHistogramInitial(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)

agg := New(descriptor, boundaries[profile.NumberKind])
buckets, err := agg.Histogram()

require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries[profile.NumberKind])+1)
require.Equal(t, len(buckets.Boundaries), len(boundaries[profile.NumberKind]))
})
}

func TestHistogramMerge(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -164,12 +177,12 @@ func TestHistogramMerge(t *testing.T) {
require.Equal(t, all.Count(), count, "Same count - absolute")
require.Nil(t, err)

require.Equal(t, len(agg1.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
require.Equal(t, len(agg1.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")

counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := agg1.checkpoint.buckets.Counts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.buckets.Counts)
bCount := agg1.checkpoint.bucketCounts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.bucketCounts)
}
})
}
Expand All @@ -191,8 +204,8 @@ func TestHistogramNotSet(t *testing.T) {
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
require.Nil(t, err)

require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range agg.checkpoint.buckets.Counts {
require.Equal(t, len(agg.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range agg.checkpoint.bucketCounts {
require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i)
}
})
Expand Down

0 comments on commit 2dee676

Please sign in to comment.