From 590fff901ba031de0e34a1da0ddd716ff96b0cf1 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 27 Jul 2022 07:29:26 -0700 Subject: [PATCH] Add implementation of last-value aggregator (#3008) * Add last-value aggregator * Add test of last-value reset of unseen attrs * Add benchmark * Use generic DataPoint value * Fix assertion_fail_test.go * Fix tests * Remove unused test increment values --- sdk/metric/internal/aggregator_test.go | 154 +++++++++++++++++++++++++ sdk/metric/internal/lastvalue.go | 48 +++++++- sdk/metric/internal/lastvalue_test.go | 91 +++++++++++++++ 3 files changed, 288 insertions(+), 5 deletions(-) create mode 100644 sdk/metric/internal/aggregator_test.go create mode 100644 sdk/metric/internal/lastvalue_test.go diff --git a/sdk/metric/internal/aggregator_test.go b/sdk/metric/internal/aggregator_test.go new file mode 100644 index 00000000000..645352153c1 --- /dev/null +++ b/sdk/metric/internal/aggregator_test.go @@ -0,0 +1,154 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import ( + "strconv" + "sync" + "testing" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +const ( + defaultGoroutines = 5 + defaultMeasurements = 30 + defaultCycles = 3 +) + +var ( + alice = attribute.NewSet(attribute.String("user", "alice"), attribute.Bool("admin", true)) + bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false)) + carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false)) + + monoIncr = setMap{alice: 1, bob: 10, carol: 2} + + // Sat Jan 01 2000 00:00:00 GMT+0000. + staticTime = time.Unix(946684800, 0) + staticNowFunc = func() time.Time { return staticTime } + // Pass to t.Cleanup to override the now function with staticNowFunc and + // revert once the test completes. E.g. t.Cleanup(mockTime(now)). + mockTime = func(orig func() time.Time) (cleanup func()) { + now = staticNowFunc + return func() { now = orig } + } +) + +// setMap maps attribute sets to a number. +type setMap map[attribute.Set]int + +// expectFunc is a function that returns an Aggregation of expected values for +// a cycle that contains m measurements (total across all goroutines). Each +// call advances the cycle. +type expectFunc func(m int) metricdata.Aggregation + +// aggregatorTester runs an acceptance test on an Aggregator. It will ask an +// Aggregator to aggregate a set of values as if they were real measurements +// made MeasurementN number of times. This will be done in GoroutineN number +// of different goroutines. After the Aggregator has been asked to aggregate +// all these measurements, it is validated using a passed expecterFunc. This +// set of operation is a signle cycle, and the the aggregatorTester will run +// CycleN number of cycles. +type aggregatorTester[N int64 | float64] struct { + // GoroutineN is the number of goroutines aggregatorTester will use to run + // the test with. + GoroutineN int + // MeasurementN is the number of measurements that are made each cycle a + // goroutine runs the test. + MeasurementN int + // CycleN is the number of times a goroutine will make a set of + // measurements. + CycleN int +} + +func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFunc) func(*testing.T) { + m := at.MeasurementN * at.GoroutineN + return func(t *testing.T) { + for i := 0; i < at.CycleN; i++ { + var wg sync.WaitGroup + wg.Add(at.GoroutineN) + for i := 0; i < at.GoroutineN; i++ { + go func() { + defer wg.Done() + for j := 0; j < at.MeasurementN; j++ { + for attrs, n := range incr { + a.Aggregate(N(n), attrs) + } + } + }() + } + wg.Wait() + + metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation()) + } + } +} + +var bmarkResults metricdata.Aggregation + +func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggregator[N], count int) { + attrs := make([]attribute.Set, count) + for i := range attrs { + attrs[i] = attribute.NewSet(attribute.Int("value", i)) + } + + b.Run("Aggregate", func(b *testing.B) { + agg := factory() + b.ReportAllocs() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + for _, attr := range attrs { + agg.Aggregate(1, attr) + } + } + bmarkResults = agg.Aggregation() + }) + + b.Run("Aggregations", func(b *testing.B) { + aggs := make([]Aggregator[N], b.N) + for n := range aggs { + a := factory() + for _, attr := range attrs { + a.Aggregate(1, attr) + } + aggs[n] = a + } + + b.ReportAllocs() + b.ResetTimer() + + for n := 0; n < b.N; n++ { + bmarkResults = aggs[n].Aggregation() + } + }) +} + +func benchmarkAggregator[N int64 | float64](factory func() Aggregator[N]) func(*testing.B) { + counts := []int{1, 10, 100} + return func(b *testing.B) { + for _, n := range counts { + b.Run(strconv.Itoa(n), func(b *testing.B) { + benchmarkAggregatorN(b, factory, n) + }) + } + } +} diff --git a/sdk/metric/internal/lastvalue.go b/sdk/metric/internal/lastvalue.go index 74291711b14..1cfeeb59a91 100644 --- a/sdk/metric/internal/lastvalue.go +++ b/sdk/metric/internal/lastvalue.go @@ -18,26 +18,64 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( + "sync" + "time" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +// now is used to return the current local time while allowing tests to +// override the the default time.Now function. +var now = time.Now + +// datapoint is timestamped measurement data. +type datapoint[N int64 | float64] struct { + timestamp time.Time + value N +} + // lastValue summarizes a set of measurements as the last one made. type lastValue[N int64 | float64] struct { - // TODO(#2971): implement. + sync.Mutex + + values map[attribute.Set]datapoint[N] } // NewLastValue returns an Aggregator that summarizes a set of measurements as // the last one made. func NewLastValue[N int64 | float64]() Aggregator[N] { - return &lastValue[N]{} + return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])} } func (s *lastValue[N]) Aggregate(value N, attr attribute.Set) { - // TODO(#2971): implement. + d := datapoint[N]{timestamp: now(), value: value} + s.Lock() + s.values[attr] = d + s.Unlock() } func (s *lastValue[N]) Aggregation() metricdata.Aggregation { - // TODO(#2971): implement. - return nil + gauge := metricdata.Gauge[N]{} + + s.Lock() + defer s.Unlock() + + if len(s.values) == 0 { + return gauge + } + + gauge.DataPoints = make([]metricdata.DataPoint[N], 0, len(s.values)) + for a, v := range s.values { + gauge.DataPoints = append(gauge.DataPoints, metricdata.DataPoint[N]{ + Attributes: a, + // The event time is the only meaningful timestamp, StartTime is + // ignored. + Time: v.timestamp, + Value: v.value, + }) + // Do not report stale values. + delete(s.values, a) + } + return gauge } diff --git a/sdk/metric/internal/lastvalue_test.go b/sdk/metric/internal/lastvalue_test.go new file mode 100644 index 00000000000..41b75877fe3 --- /dev/null +++ b/sdk/metric/internal/lastvalue_test.go @@ -0,0 +1,91 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +import ( + "testing" + + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +func TestLastValue(t *testing.T) { + t.Cleanup(mockTime(now)) + + t.Run("Int64", testLastValue[int64]()) + t.Run("Float64", testLastValue[float64]()) +} + +func testLastValue[N int64 | float64]() func(*testing.T) { + tester := &aggregatorTester[N]{ + GoroutineN: defaultGoroutines, + MeasurementN: defaultMeasurements, + CycleN: defaultCycles, + } + + eFunc := func(increments setMap) expectFunc { + data := make([]metricdata.DataPoint[N], 0, len(increments)) + for a, v := range increments { + point := metricdata.DataPoint[N]{Attributes: a, Time: now(), Value: N(v)} + data = append(data, point) + } + gauge := metricdata.Gauge[N]{DataPoints: data} + return func(int) metricdata.Aggregation { return gauge } + } + incr := monoIncr + return tester.Run(NewLastValue[N](), incr, eFunc(incr)) +} + +func testLastValueReset[N int64 | float64](t *testing.T) { + t.Cleanup(mockTime(now)) + + a := NewLastValue[N]() + expect := metricdata.Gauge[N]{} + metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) + + a.Aggregate(1, alice) + expect.DataPoints = []metricdata.DataPoint[N]{{ + Attributes: alice, + Time: now(), + Value: 1, + }} + metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) + + // The attr set should be forgotten once Aggregations is called. + expect.DataPoints = nil + metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) + + // Aggregating another set should not affect the original (alice). + a.Aggregate(1, bob) + expect.DataPoints = []metricdata.DataPoint[N]{{ + Attributes: bob, + Time: now(), + Value: 1, + }} + metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) +} + +func TestLastValueReset(t *testing.T) { + t.Run("Int64", testLastValueReset[int64]) + t.Run("Float64", testLastValueReset[float64]) +} + +func BenchmarkLastValue(b *testing.B) { + b.Run("Int64", benchmarkAggregator(NewLastValue[int64])) + b.Run("Float64", benchmarkAggregator(NewLastValue[float64])) +}