Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add selector of exemplar reservoir providers to metric.Stream configuration #5861

Merged
merged 8 commits into from
Oct 18, 2024
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -12,6 +12,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850)
- Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850)
- Add `ExemplarReservoirProviderSelector` and `DefaultExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric`, which defines the exemplar reservoir to use based on the aggregation of the metric. (#5861)
- Add `ExemplarReservoirProviderSelector` to `go.opentelemetry.io/otel/sdk/metric.Stream` to allow using views to configure the exemplar reservoir to use for a metric. (#5861)
- Add `ReservoirProvider`, `HistogramReservoirProvider` and `FixedSizeReservoirProvider` to `go.opentelemetry.io/otel/sdk/metric/exemplar` to make it convenient to use providers of Reservoirs. (#5861)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
25 changes: 25 additions & 0 deletions sdk/metric/example_test.go
Original file line number Diff line number Diff line change
@@ -242,6 +242,31 @@ func ExampleNewView_exponentialHistogram() {
)
}

func ExampleNewView_exemplarreservoirproviderselector() {
// Create a view that makes all metrics use a different exemplar reservoir.
view := metric.NewView(
metric.Instrument{Name: "*"},
metric.Stream{
ExemplarReservoirProviderSelector: func(agg metric.Aggregation) exemplar.ReservoirProvider {
// This example uses a fixed-size reservoir with a size of 10
// for explicit bucket histograms instead of the default
// bucket-aligned reservoir.
if _, ok := agg.(metric.AggregationExplicitBucketHistogram); ok {
dashpole marked this conversation as resolved.
Show resolved Hide resolved
return exemplar.FixedSizeReservoirProvider(10)
}
// Fall back to the default reservoir otherwise.
return metric.DefaultExemplarReservoirProviderSelector(agg)
},
},
)

// The created view can then be registered with the OpenTelemetry metric
// SDK using the WithView option.
_ = metric.NewMeterProvider(
metric.WithView(view),
)
}

func ExampleWithExemplarFilter_disabled() {
// Use exemplar.AlwaysOffFilter to disable exemplar collection.
_ = metric.NewMeterProvider(
41 changes: 31 additions & 10 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
@@ -5,25 +5,48 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"runtime"
"slices"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

// ExemplarReservoirProviderSelector selects the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] to use
dashpole marked this conversation as resolved.
Show resolved Hide resolved
// based on the [Aggregation] of the metric.
type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and filter configuration.
func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] {
func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
}
}

// DefaultExemplarReservoirProviderSelector returns the default
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] for the
// provided [Aggregation].
//
// For explicit bucket histograms with more than 1 bucket, it uses the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.HistogramReservoirProvider].
// For exponential histograms, it uses the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.FixedSizeReservoirProvider]
// with a size of min(20, max_buckets).
// For all other aggregations, it uses the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.FixedSizeReservoirProvider]
// with a size equal to the number of CPUs.
dashpole marked this conversation as resolved.
Show resolved Hide resolved
//
// Exemplar default reservoirs MAY change in a minor version bump. No
// guarantees are made on the shape or statistical properties of returned
// exemplars.
func DefaultExemplarReservoirProviderSelector(agg Aggregation) exemplar.ReservoirProvider {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() aggregate.FilteredExemplarReservoir[N] {
bounds := cp
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds))
}
return exemplar.HistogramReservoirProvider(a.Boundaries)
}

var n int
@@ -50,7 +73,5 @@ func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) f
}
}

return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n))
}
return exemplar.FixedSizeReservoirProvider(n)
}
7 changes: 7 additions & 0 deletions sdk/metric/exemplar/fixed_size_reservoir.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,13 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
func FixedSizeReservoirProvider(k int) ReservoirProvider {
return func(_ attribute.Set) Reservoir {
return NewFixedSizeReservoir(k)
}
}

// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
// k exemplars. If there are k or less measurements made, the Reservoir will
// sample each one. If there are more than k, the Reservoir will then randomly
8 changes: 4 additions & 4 deletions sdk/metric/exemplar/fixed_size_reservoir_test.go
Original file line number Diff line number Diff line change
@@ -15,12 +15,12 @@ import (
)

func TestNewFixedSizeReservoir(t *testing.T) {
t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir, int) {
return NewFixedSizeReservoir(n), n
t.Run("Int64", ReservoirTest[int64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
}))

t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir, int) {
return NewFixedSizeReservoir(n), n
t.Run("Float64", ReservoirTest[float64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
}))
}

12 changes: 10 additions & 2 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
@@ -12,13 +12,21 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// HistogramReservoirProvider is a provider of [HistogramReservoir].
func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
cp := slices.Clone(bounds)
slices.Sort(cp)
return func(_ attribute.Set) Reservoir {
return NewHistogramReservoir(cp)
}
}

// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
// measurement that falls within a histogram bucket. The histogram bucket
// upper-boundaries are define by bounds.
//
// The passed bounds will be sorted by this function.
// The passed bounds must be sorted before calling this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
slices.Sort(bounds)
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
8 changes: 4 additions & 4 deletions sdk/metric/exemplar/histogram_reservoir_test.go
Original file line number Diff line number Diff line change
@@ -7,11 +7,11 @@ import "testing"

func TestHist(t *testing.T) {
bounds := []float64{0, 100}
t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) {
return NewHistogramReservoir(bounds), len(bounds)
t.Run("Int64", ReservoirTest[int64](func(int) (ReservoirProvider, int) {
return HistogramReservoirProvider(bounds), len(bounds)
}))

t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) {
return NewHistogramReservoir(bounds), len(bounds)
t.Run("Float64", ReservoirTest[float64](func(int) (ReservoirProvider, int) {
return HistogramReservoirProvider(bounds), len(bounds)
}))
}
8 changes: 8 additions & 0 deletions sdk/metric/exemplar/reservoir.go
Original file line number Diff line number Diff line change
@@ -30,3 +30,11 @@ type Reservoir interface {
// The Reservoir state is preserved after this call.
Collect(dest *[]Exemplar)
}

// ReservoirProvider creates new [Reservoir]s.
//
// The attributes provided are attributes which are kept by the aggregation, and
// are exclusive with attributes passed to Offer. The combination of these
// attributes and the attributes passed to Offer is the complete set of
// attributes a measurement was made with.
type ReservoirProvider func(attr attribute.Set) Reservoir
17 changes: 11 additions & 6 deletions sdk/metric/exemplar/reservoir_test.go
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ import (
// Sat Jan 01 2000 00:00:00 GMT+0000.
var staticTime = time.Unix(946684800, 0)

type factory func(requestedCap int) (r Reservoir, actualCap int)
type factory func(requestedCap int) (r ReservoirProvider, actualCap int)

func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
return func(t *testing.T) {
@@ -29,10 +29,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("CaptureSpanContext", func(t *testing.T) {
t.Helper()

r, n := f(1)
rp, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

tID, sID := trace.TraceID{0x01}, trace.SpanID{0x01}
sc := trace.NewSpanContext(trace.SpanContextConfig{
@@ -60,10 +61,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("FilterAttributes", func(t *testing.T) {
t.Helper()

r, n := f(1)
rp, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

adminTrue := attribute.Bool("admin", true)
r.Offer(ctx, staticTime, NewValue(N(10)), []attribute.KeyValue{adminTrue})
@@ -83,10 +85,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("CollectLessThanN", func(t *testing.T) {
t.Helper()

r, n := f(2)
rp, n := f(2)
if n < 2 {
t.Skip("skipping, reservoir capacity less than 2:", n)
}
r := rp(*attribute.EmptySet())

r.Offer(ctx, staticTime, NewValue(N(10)), nil)

@@ -99,10 +102,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("MultipleOffers", func(t *testing.T) {
t.Helper()

r, n := f(3)
rp, n := f(3)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

for i := 0; i < n+1; i++ {
v := NewValue(N(i))
@@ -127,10 +131,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
t.Run("DropAll", func(t *testing.T) {
t.Helper()

r, n := f(0)
rp, n := f(0)
if n > 0 {
t.Skip("skipping, reservoir capacity greater than 0:", n)
}
r := rp(*attribute.EmptySet())

r.Offer(context.Background(), staticTime, NewValue(N(10)), nil)

6 changes: 6 additions & 0 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
@@ -144,6 +144,12 @@ type Stream struct {
// Use NewAllowKeysFilter from "go.opentelemetry.io/otel/attribute" to
// provide an allow-list of attribute keys here.
AttributeFilter attribute.Filter
// ExemplarReservoirProvider selects the
// [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] based
// on the [Aggregation].
//
// If unspecified, [DefaultExemplarReservoirProviderSelector] is used.
ExemplarReservoirProviderSelector ExemplarReservoirProviderSelector
}

// instID are the identifying properties of a instrument.
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ type Builder[N int64 | float64] struct {
//
// If this is not provided a default factory function that returns an
// dropReservoir reservoir will be used.
ReservoirFunc func() FilteredExemplarReservoir[N]
ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
@@ -49,7 +49,7 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] {
func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
@@ -72,8 +72,8 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig }
}

func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] {
return dropReservoir[N]()
func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
return dropReservoir[N](attr)
}

func TestBuilderFilter(t *testing.T) {
4 changes: 3 additions & 1 deletion sdk/metric/internal/aggregate/drop.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,9 @@ import (
)

// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func dropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }
func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
return &dropRes[N]{}
}

type dropRes[N int64 | float64] struct{}

3 changes: 2 additions & 1 deletion sdk/metric/internal/aggregate/drop_test.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

@@ -17,7 +18,7 @@ func TestDrop(t *testing.T) {
}

func testDropFiltered[N int64 | float64](t *testing.T) {
r := dropReservoir[N]()
r := dropReservoir[N](*attribute.EmptySet())

var dest []exemplar.Exemplar
r.Collect(&dest)
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
@@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
@@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int32

newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
@@ -327,7 +327,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
v, ok := e.values[attr.Equivalent()]
if !ok {
v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
v.res = e.newRes()
v.res = e.newRes(attr)

e.values[attr.Equivalent()] = v
}
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
@@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
@@ -93,7 +93,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets[N](attr, len(s.bounds)+1)
b.res = s.newRes()
b.res = s.newRes(attr)

// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = value, value
@@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute

// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ type datapoint[N int64 | float64] struct {
res FilteredExemplarReservoir[N]
}

func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *lastValue[N] {
func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
@@ -32,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservo
type lastValue[N int64 | float64] struct {
sync.Mutex

newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
@@ -45,7 +45,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
attr := s.limit.Attributes(fltrAttr, s.values)
d, ok := s.values[attr.Equivalent()]
if !ok {
d.res = s.newRes()
d.res = s.newRes(attr)
}

d.attrs = attr
@@ -114,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in

// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}

10 changes: 5 additions & 5 deletions sdk/metric/internal/aggregate/sum.go
Original file line number Diff line number Diff line change
@@ -21,12 +21,12 @@ type sumValue[N int64 | float64] struct {
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
newRes func() FilteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[sumValue[N]]
values map[attribute.Distinct]sumValue[N]
}

func newValueMap[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *valueMap[N] {
func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] {
return &valueMap[N]{
newRes: r,
limit: newLimiter[sumValue[N]](limit),
@@ -41,7 +41,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
attr := s.limit.Attributes(fltrAttr, s.values)
v, ok := s.values[attr.Equivalent()]
if !ok {
v.res = s.newRes()
v.res = s.newRes(attr)
}

v.attrs = attr
@@ -54,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
// newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in.
func newSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *sum[N] {
func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] {
return &sum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
@@ -143,7 +143,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
// newPrecomputedSum returns an aggregator that summarizes a set of
// observations as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *precomputedSum[N] {
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedSum[N] {
return &precomputedSum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
5 changes: 4 additions & 1 deletion sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
@@ -332,6 +332,9 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
// The view explicitly requested the default aggregation.
stream.Aggregation = DefaultAggregationSelector(kind)
}
if stream.ExemplarReservoirProviderSelector == nil {
stream.ExemplarReservoirProviderSelector = DefaultExemplarReservoirProviderSelector
}

if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
return nil, 0, fmt.Errorf(
@@ -352,7 +355,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind),
ReservoirFunc: reservoirFunc[N](stream.Aggregation, i.pipeline.exemplarFilter),
ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter),
}
b.Filter = stream.AttributeFilter
// A value less than or equal to zero will disable the aggregation
26 changes: 26 additions & 0 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -491,4 +491,30 @@ func TestExemplars(t *testing.T) {
measure(sampled, m)
check(t, r, nCPU, 1, 20)
})

t.Run("Custom reservoir", func(t *testing.T) {
r := NewManualReader()
reservoirProviderSelector := func(agg Aggregation) exemplar.ReservoirProvider {
return exemplar.FixedSizeReservoirProvider(2)
}
v1 := NewView(Instrument{Name: "int64-expo-histogram"}, Stream{
Aggregation: AggregationBase2ExponentialHistogram{
MaxSize: 160, // > 20, reservoir size should default to 20.
MaxScale: 20,
},
ExemplarReservoirProviderSelector: reservoirProviderSelector,
})
v2 := NewView(Instrument{Name: "int64-counter"}, Stream{
ExemplarReservoirProviderSelector: reservoirProviderSelector,
})
v3 := NewView(Instrument{Name: "int64-histogram"}, Stream{
ExemplarReservoirProviderSelector: reservoirProviderSelector,
})
m := NewMeterProvider(WithReader(r), WithView(v1, v2, v3)).Meter("custom-reservoir")
measure(ctx, m)
check(t, r, 0, 0, 0)

measure(sampled, m)
check(t, r, 2, 2, 2)
})
}
11 changes: 6 additions & 5 deletions sdk/metric/view.go
Original file line number Diff line number Diff line change
@@ -96,11 +96,12 @@ func NewView(criteria Instrument, mask Stream) View {
return func(i Instrument) (Stream, bool) {
if matchFunc(i) {
return Stream{
Name: nonZero(mask.Name, i.Name),
Description: nonZero(mask.Description, i.Description),
Unit: nonZero(mask.Unit, i.Unit),
Aggregation: agg,
AttributeFilter: mask.AttributeFilter,
Name: nonZero(mask.Name, i.Name),
Description: nonZero(mask.Description, i.Description),
Unit: nonZero(mask.Unit, i.Unit),
Aggregation: agg,
AttributeFilter: mask.AttributeFilter,
ExemplarReservoirProviderSelector: mask.ExemplarReservoirProviderSelector,
}, true
}
return Stream{}, false