From 2dee67652aec812903a6db53945f5c8a54d6cc47 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 18 May 2020 09:44:33 -0700 Subject: [PATCH] Histogram aggregator initial state (fix #735) (#736) * Add a test * Add comments and description options * Another test * Undo buffer re-use * Mod tidy * Precommit * Again * Copyright * Undo rename --- exporters/metric/prometheus/example_test.go | 97 +++++++++++++++++++ sdk/metric/aggregator/histogram/histogram.go | 43 ++++---- .../aggregator/histogram/histogram_test.go | 29 ++++-- 3 files changed, 139 insertions(+), 30 deletions(-) create mode 100644 exporters/metric/prometheus/example_test.go diff --git a/exporters/metric/prometheus/example_test.go b/exporters/metric/prometheus/example_test.go new file mode 100644 index 00000000000..81e741a3815 --- /dev/null +++ b/exporters/metric/prometheus/example_test.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus_test + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + + "go.opentelemetry.io/otel/api/kv" + "go.opentelemetry.io/otel/api/metric" + "go.opentelemetry.io/otel/exporters/metric/prometheus" + sdk "go.opentelemetry.io/otel/sdk/metric" + integrator "go.opentelemetry.io/otel/sdk/metric/integrator/simple" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" +) + +// This test demonstrates that it is relatively difficult to setup a +// Prometheus export pipeline: +// +// 1. The default boundaries are difficult to pass, should be []float instead of []metric.Number +// 2. The push controller doesn't make sense b/c Prometheus is pull-bsaed +// +// TODO: Address these issues; add Resources to the test. + +func ExampleNewExportPipeline() { + // Create a meter + selector := simple.NewWithHistogramDistribution(nil) + exporter, err := prometheus.NewRawExporter(prometheus.Config{}) + if err != nil { + panic(err) + } + integrator := integrator.New(selector, true) + meterImpl := sdk.NewAccumulator(integrator) + meter := metric.WrapMeterImpl(meterImpl, "example") + + ctx := context.Background() + + // Use two instruments + counter := metric.Must(meter).NewInt64Counter( + "a.counter", + metric.WithDescription("Counts things"), + ) + recorder := metric.Must(meter).NewInt64ValueRecorder( + "a.valuerecorder", + metric.WithDescription("Records values"), + ) + + counter.Add(ctx, 100, kv.String("key", "value")) + recorder.Record(ctx, 100, kv.String("key", "value")) + + // Simulate a push + meterImpl.Collect(ctx) + err = exporter.Export(ctx, nil, integrator.CheckpointSet()) + if err != nil { + panic(err) + } + + // GET the HTTP endpoint + var input bytes.Buffer + resp := httptest.NewRecorder() + req, err := http.NewRequest("GET", "/", &input) + if err != nil { + panic(err) + } + exporter.ServeHTTP(resp, req) + data, err := ioutil.ReadAll(resp.Result().Body) + if err != nil { + panic(err) + } + fmt.Print(string(data)) + + // Output: + // # HELP a_counter Counts things + // # TYPE a_counter counter + // a_counter{key="value"} 100 + // # HELP a_valuerecorder Records values + // # TYPE a_valuerecorder histogram + // a_valuerecorder_bucket{key="value",le="+Inf"} 1 + // a_valuerecorder_sum{key="value"} 100 + // a_valuerecorder_count{key="value"} 1 +} diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 6566dab91ce..ccb0c2d3c30 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -24,6 +24,11 @@ import ( "go.opentelemetry.io/otel/sdk/export/metric/aggregator" ) +// Note: This code uses a Mutex to govern access to the exclusive +// aggregator state. This is in contrast to a lock-free approach +// (as in the Go prometheus client) that was reverted here: +// https://github.com/open-telemetry/opentelemetry-go/pull/669 + type ( // Aggregator observe events and counts them in pre-determined buckets. // It also calculates the sum and count of all events. @@ -39,10 +44,9 @@ type ( // the sum and counts for all observed values and // the less than equal bucket count for the pre-determined boundaries. state struct { - // all fields have to be aligned for 64-bit atomic operations. - buckets aggregator.Buckets - count metric.Number - sum metric.Number + bucketCounts []metric.Number + count metric.Number + sum metric.Number } ) @@ -71,17 +75,12 @@ func New(desc *metric.Descriptor, boundaries []metric.Number) *Aggregator { sort.Sort(&sortedBoundaries) boundaries = sortedBoundaries.numbers - agg := Aggregator{ + return &Aggregator{ kind: desc.NumberKind(), boundaries: boundaries, - current: state{ - buckets: aggregator.Buckets{ - Boundaries: boundaries, - Counts: make([]metric.Number, len(boundaries)+1), - }, - }, + current: emptyState(boundaries), + checkpoint: emptyState(boundaries), } - return &agg } // Sum returns the sum of all values in the checkpoint. @@ -102,7 +101,10 @@ func (c *Aggregator) Count() (int64, error) { func (c *Aggregator) Histogram() (aggregator.Buckets, error) { c.lock.Lock() defer c.lock.Unlock() - return c.checkpoint.buckets, nil + return aggregator.Buckets{ + Boundaries: c.boundaries, + Counts: c.checkpoint.bucketCounts, + }, nil } // Checkpoint saves the current state and resets the current state to @@ -111,16 +113,13 @@ func (c *Aggregator) Histogram() (aggregator.Buckets, error) { // other. func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) { c.lock.Lock() - c.checkpoint, c.current = c.current, c.emptyState() + c.checkpoint, c.current = c.current, emptyState(c.boundaries) c.lock.Unlock() } -func (c *Aggregator) emptyState() state { +func emptyState(boundaries []metric.Number) state { return state{ - buckets: aggregator.Buckets{ - Boundaries: c.boundaries, - Counts: make([]metric.Number, len(c.boundaries)+1), - }, + bucketCounts: make([]metric.Number, len(boundaries)+1), } } @@ -141,7 +140,7 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri c.current.count.AddInt64(1) c.current.sum.AddNumber(kind, number) - c.current.buckets.Counts[bucketID].AddUint64(1) + c.current.bucketCounts[bucketID].AddUint64(1) return nil } @@ -156,8 +155,8 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum) c.checkpoint.count.AddNumber(metric.Uint64NumberKind, o.checkpoint.count) - for i := 0; i < len(c.checkpoint.buckets.Counts); i++ { - c.checkpoint.buckets.Counts[i].AddNumber(metric.Uint64NumberKind, o.checkpoint.buckets.Counts[i]) + for i := 0; i < len(c.checkpoint.bucketCounts); i++ { + c.checkpoint.bucketCounts[i].AddNumber(metric.Uint64NumberKind, o.checkpoint.bucketCounts[i]) } return nil } diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index 6a559cec319..c1541ee2b7a 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -113,15 +113,28 @@ func histogram(t *testing.T, profile test.Profile, policy policy) { require.Equal(t, all.Count(), count, "Same count -"+policy.name) require.Nil(t, err) - require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") + require.Equal(t, len(agg.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") counts := calcBuckets(all.Points(), profile) for i, v := range counts { - bCount := agg.checkpoint.buckets.Counts[i].AsUint64() - require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.buckets.Counts) + bCount := agg.checkpoint.bucketCounts[i].AsUint64() + require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.bucketCounts) } } +func TestHistogramInitial(t *testing.T) { + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind) + + agg := New(descriptor, boundaries[profile.NumberKind]) + buckets, err := agg.Histogram() + + require.NoError(t, err) + require.Equal(t, len(buckets.Counts), len(boundaries[profile.NumberKind])+1) + require.Equal(t, len(buckets.Boundaries), len(boundaries[profile.NumberKind])) + }) +} + func TestHistogramMerge(t *testing.T) { ctx := context.Background() @@ -164,12 +177,12 @@ func TestHistogramMerge(t *testing.T) { require.Equal(t, all.Count(), count, "Same count - absolute") require.Nil(t, err) - require.Equal(t, len(agg1.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") + require.Equal(t, len(agg1.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") counts := calcBuckets(all.Points(), profile) for i, v := range counts { - bCount := agg1.checkpoint.buckets.Counts[i].AsUint64() - require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.buckets.Counts) + bCount := agg1.checkpoint.bucketCounts[i].AsUint64() + require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.bucketCounts) } }) } @@ -191,8 +204,8 @@ func TestHistogramNotSet(t *testing.T) { require.Equal(t, int64(0), count, "Empty checkpoint count = 0") require.Nil(t, err) - require.Equal(t, len(agg.checkpoint.buckets.Counts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") - for i, bCount := range agg.checkpoint.buckets.Counts { + require.Equal(t, len(agg.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries") + for i, bCount := range agg.checkpoint.bucketCounts { require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i) } })