Skip to content

Commit

Permalink
Add implementation of last-value aggregator (#3008)
Browse files Browse the repository at this point in the history
* Add last-value aggregator

* Add test of last-value reset of unseen attrs

* Add benchmark

* Use generic DataPoint value

* Fix assertion_fail_test.go

* Fix tests

* Remove unused test increment values
  • Loading branch information
MrAlias authored Jul 27, 2022
1 parent dc4899c commit 590fff9
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 5 deletions.
154 changes: 154 additions & 0 deletions sdk/metric/internal/aggregator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"strconv"
"sync"
"testing"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

const (
defaultGoroutines = 5
defaultMeasurements = 30
defaultCycles = 3
)

var (
alice = attribute.NewSet(attribute.String("user", "alice"), attribute.Bool("admin", true))
bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))

monoIncr = setMap{alice: 1, bob: 10, carol: 2}

// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)
staticNowFunc = func() time.Time { return staticTime }
// Pass to t.Cleanup to override the now function with staticNowFunc and
// revert once the test completes. E.g. t.Cleanup(mockTime(now)).
mockTime = func(orig func() time.Time) (cleanup func()) {
now = staticNowFunc
return func() { now = orig }
}
)

// setMap maps attribute sets to a number.
type setMap map[attribute.Set]int

// expectFunc is a function that returns an Aggregation of expected values for
// a cycle that contains m measurements (total across all goroutines). Each
// call advances the cycle.
type expectFunc func(m int) metricdata.Aggregation

// aggregatorTester runs an acceptance test on an Aggregator. It will ask an
// Aggregator to aggregate a set of values as if they were real measurements
// made MeasurementN number of times. This will be done in GoroutineN number
// of different goroutines. After the Aggregator has been asked to aggregate
// all these measurements, it is validated using a passed expecterFunc. This
// set of operation is a signle cycle, and the the aggregatorTester will run
// CycleN number of cycles.
type aggregatorTester[N int64 | float64] struct {
// GoroutineN is the number of goroutines aggregatorTester will use to run
// the test with.
GoroutineN int
// MeasurementN is the number of measurements that are made each cycle a
// goroutine runs the test.
MeasurementN int
// CycleN is the number of times a goroutine will make a set of
// measurements.
CycleN int
}

func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFunc) func(*testing.T) {
m := at.MeasurementN * at.GoroutineN
return func(t *testing.T) {
for i := 0; i < at.CycleN; i++ {
var wg sync.WaitGroup
wg.Add(at.GoroutineN)
for i := 0; i < at.GoroutineN; i++ {
go func() {
defer wg.Done()
for j := 0; j < at.MeasurementN; j++ {
for attrs, n := range incr {
a.Aggregate(N(n), attrs)
}
}
}()
}
wg.Wait()

metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation())
}
}
}

var bmarkResults metricdata.Aggregation

func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggregator[N], count int) {
attrs := make([]attribute.Set, count)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.Int("value", i))
}

b.Run("Aggregate", func(b *testing.B) {
agg := factory()
b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
for _, attr := range attrs {
agg.Aggregate(1, attr)
}
}
bmarkResults = agg.Aggregation()
})

b.Run("Aggregations", func(b *testing.B) {
aggs := make([]Aggregator[N], b.N)
for n := range aggs {
a := factory()
for _, attr := range attrs {
a.Aggregate(1, attr)
}
aggs[n] = a
}

b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
bmarkResults = aggs[n].Aggregation()
}
})
}

func benchmarkAggregator[N int64 | float64](factory func() Aggregator[N]) func(*testing.B) {
counts := []int{1, 10, 100}
return func(b *testing.B) {
for _, n := range counts {
b.Run(strconv.Itoa(n), func(b *testing.B) {
benchmarkAggregatorN(b, factory, n)
})
}
}
}
48 changes: 43 additions & 5 deletions sdk/metric/internal/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,64 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// now is used to return the current local time while allowing tests to
// override the the default time.Now function.
var now = time.Now

// datapoint is timestamped measurement data.
type datapoint[N int64 | float64] struct {
timestamp time.Time
value N
}

// lastValue summarizes a set of measurements as the last one made.
type lastValue[N int64 | float64] struct {
// TODO(#2971): implement.
sync.Mutex

values map[attribute.Set]datapoint[N]
}

// NewLastValue returns an Aggregator that summarizes a set of measurements as
// the last one made.
func NewLastValue[N int64 | float64]() Aggregator[N] {
return &lastValue[N]{}
return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
}

func (s *lastValue[N]) Aggregate(value N, attr attribute.Set) {
// TODO(#2971): implement.
d := datapoint[N]{timestamp: now(), value: value}
s.Lock()
s.values[attr] = d
s.Unlock()
}

func (s *lastValue[N]) Aggregation() metricdata.Aggregation {
// TODO(#2971): implement.
return nil
gauge := metricdata.Gauge[N]{}

s.Lock()
defer s.Unlock()

if len(s.values) == 0 {
return gauge
}

gauge.DataPoints = make([]metricdata.DataPoint[N], 0, len(s.values))
for a, v := range s.values {
gauge.DataPoints = append(gauge.DataPoints, metricdata.DataPoint[N]{
Attributes: a,
// The event time is the only meaningful timestamp, StartTime is
// ignored.
Time: v.timestamp,
Value: v.value,
})
// Do not report stale values.
delete(s.values, a)
}
return gauge
}
91 changes: 91 additions & 0 deletions sdk/metric/internal/lastvalue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"testing"

"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)

func TestLastValue(t *testing.T) {
t.Cleanup(mockTime(now))

t.Run("Int64", testLastValue[int64]())
t.Run("Float64", testLastValue[float64]())
}

func testLastValue[N int64 | float64]() func(*testing.T) {
tester := &aggregatorTester[N]{
GoroutineN: defaultGoroutines,
MeasurementN: defaultMeasurements,
CycleN: defaultCycles,
}

eFunc := func(increments setMap) expectFunc {
data := make([]metricdata.DataPoint[N], 0, len(increments))
for a, v := range increments {
point := metricdata.DataPoint[N]{Attributes: a, Time: now(), Value: N(v)}
data = append(data, point)
}
gauge := metricdata.Gauge[N]{DataPoints: data}
return func(int) metricdata.Aggregation { return gauge }
}
incr := monoIncr
return tester.Run(NewLastValue[N](), incr, eFunc(incr))
}

func testLastValueReset[N int64 | float64](t *testing.T) {
t.Cleanup(mockTime(now))

a := NewLastValue[N]()
expect := metricdata.Gauge[N]{}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())

a.Aggregate(1, alice)
expect.DataPoints = []metricdata.DataPoint[N]{{
Attributes: alice,
Time: now(),
Value: 1,
}}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())

// The attr set should be forgotten once Aggregations is called.
expect.DataPoints = nil
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())

// Aggregating another set should not affect the original (alice).
a.Aggregate(1, bob)
expect.DataPoints = []metricdata.DataPoint[N]{{
Attributes: bob,
Time: now(),
Value: 1,
}}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
}

func TestLastValueReset(t *testing.T) {
t.Run("Int64", testLastValueReset[int64])
t.Run("Float64", testLastValueReset[float64])
}

func BenchmarkLastValue(b *testing.B) {
b.Run("Int64", benchmarkAggregator(NewLastValue[int64]))
b.Run("Float64", benchmarkAggregator(NewLastValue[float64]))
}

0 comments on commit 590fff9

Please sign in to comment.