From 4747604a7ef9f1a1167817423f5424a538339f61 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 23 Sep 2022 14:10:35 -0700 Subject: [PATCH 01/22] PoC of caching on top of #3233 --- sdk/metric/meter.go | 63 +++++++++++++++++++++++++++++++++++++++--- sdk/metric/pipeline.go | 23 ++++++++++++--- 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 82d5a5269be..371e1829946 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "errors" "sync" "go.opentelemetry.io/otel/metric" @@ -25,6 +26,7 @@ import ( "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "go.opentelemetry.io/otel/metric/instrument/syncint64" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/internal" ) // meterRegistry keeps a record of initialized meters for instrumentation @@ -76,6 +78,54 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { return m } +type cache map[instrumentID]any + +type cacheResult[N int64 | float64] struct { + aggregators []internal.Aggregator[N] + err error +} + +type querier[N int64 | float64] struct { + sync.Mutex + + c cache +} + +func newQuerier[N int64 | float64](c cache) *querier[N] { + return &querier[N]{c: c} +} + +var ( + errCacheMiss = errors.New("cache miss") + errExists = errors.New("instrument already exists for different number type") +) + +func (q *querier[N]) Get(key instrumentID) (r *cacheResult[N], err error) { + q.Lock() + defer q.Unlock() + + vIface, ok := q.c[key] + if !ok { + err = errCacheMiss + return r, err + } + + switch v := vIface.(type) { + case *cacheResult[N]: + r = v + default: + err = errExists + } + return r, err +} + +func (q *querier[N]) Set(key instrumentID, val *cacheResult[N]) { + q.Lock() + defer q.Unlock() + + q.c[key] = val +} + // meter handles the creation and coordination of all metric instruments. A // meter represents a single instrumentation scope; all metric telemetry // produced by an instrumentation scope will use metric instruments from a @@ -84,6 +134,7 @@ type meter struct { instrumentation.Scope pipes pipelines + cache *cache } // Compile-time check meter implements metric.Meter. @@ -91,12 +142,14 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} + q := newQuerier[int64](*m.cache) + return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} + q := newQuerier[float64](*m.cache) + return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } // RegisterCallback registers the function f to be called when any of the @@ -108,10 +161,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)} + q := newQuerier[int64](*m.cache) + return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)} + q := newQuerier[float64](*m.cache) + return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 0bd52d63023..789cc980039 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -345,22 +345,31 @@ func (p pipelines) registerCallback(fn func(context.Context)) { // measurements with while updating all pipelines that need to pull from those // aggregations. type resolver[N int64 | float64] struct { + cache *querier[N] inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, q *querier[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { in[i] = newInserter[N](p[i]) } - return &resolver[N]{in} + return &resolver[N]{cache: q, inserters: in} } // Aggregators returns the Aggregators instrument inst needs to update when it // makes a measurement. func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { - var aggs []internal.Aggregator[N] + id := instrumentID{scope: inst.Scope, name: inst.Name, description: inst.Description} + resp, err := r.cache.Get(id) + if err == nil { + return resp.aggregators, resp.err + } + if !errors.Is(err, errCacheMiss) { + return nil, err + } + var aggs []internal.Aggregator[N] errs := &multierror{} for _, i := range r.inserters { a, err := i.Instrument(inst, instUnit) @@ -369,7 +378,13 @@ func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]i } aggs = append(aggs, a...) } - return aggs, errs.errorOrNil() + + err = errs.errorOrNil() + r.cache.Set(id, &cacheResult[N]{ + aggregators: aggs, + err: err, + }) + return aggs, err } type multierror struct { From 47886b6d2117977eb468d2975fdf10be51e94c56 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 27 Sep 2022 17:09:50 -0700 Subject: [PATCH 02/22] Refactor into a unified cache --- sdk/metric/cache.go | 96 ++++++++++++++++++++++++++++++++++++++++++ sdk/metric/meter.go | 91 ++++----------------------------------- sdk/metric/pipeline.go | 39 +++++++---------- 3 files changed, 121 insertions(+), 105 deletions(-) create mode 100644 sdk/metric/cache.go diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go new file mode 100644 index 00000000000..3b3a82e5b86 --- /dev/null +++ b/sdk/metric/cache.go @@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "errors" + "sync" + + "go.opentelemetry.io/otel/sdk/metric/internal" +) + +// cache is a locking storage used to quickly return already computed values. A +// registry type should be used with a cache for get and set operations of +// certain types. +// +// The zero value of a cache is empty and ready to use. +// +// A cache must not be copied after first use. +// +// All methods of a cache are safe to call concurrently. +type cache[K comparable, V any] struct { + sync.Mutex + data map[K]V +} + +// GetOrSet returns the value stored in the cache for key if it exists. +// Otherwise, f is called and the returned value is set in the cache for key +// and returned. +// +// GetOrSet is safe to call concurrently. It will hold the cache lock, so f +// should not block excessively. +func (c *cache[K, V]) GetOrSet(key K, f func() V) V { + c.Lock() + defer c.Unlock() + + if c.data == nil { + val := f() + c.data = map[K]V{key: val} + return val + } + if v, ok := c.data[key]; ok { + return v + } + val := f() + c.data[key] = val + return val +} + +// resolvedAggregators is the result of resolving aggregators for an instrument. +type resolvedAggregators[N int64 | float64] struct { + aggregators []internal.Aggregator[N] + err error +} + +type instrumentRegistry[N int64 | float64] struct { + c *cache[instrumentID, any] +} + +func newInstrumentRegistry[N int64 | float64](c *cache[instrumentID, any]) instrumentRegistry[N] { + if c == nil { + c = &cache[instrumentID, any]{} + } + return instrumentRegistry[N]{c: c} +} + +var errExists = errors.New("instrument already exists for different number type") + +func (q instrumentRegistry[N]) GetOrSet(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { + vAny := q.c.GetOrSet(key, func() any { + a, err := f() + return &resolvedAggregators[N]{ + aggregators: a, + err: err, + } + }) + + switch v := vAny.(type) { + case *resolvedAggregators[N]: + aggs = v.aggregators + default: + err = errExists + } + return aggs, err +} diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 371e1829946..8c7284ff441 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -16,8 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" - "errors" - "sync" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" @@ -26,7 +24,6 @@ import ( "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "go.opentelemetry.io/otel/metric/instrument/syncint64" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/internal" ) // meterRegistry keeps a record of initialized meters for instrumentation @@ -40,9 +37,7 @@ import ( // // All methods of a meterRegistry are safe to call concurrently. type meterRegistry struct { - sync.Mutex - - meters map[instrumentation.Scope]*meter + meters cache[instrumentation.Scope, *meter] pipes pipelines } @@ -53,77 +48,9 @@ type meterRegistry struct { // // Get is safe to call concurrently. func (r *meterRegistry) Get(s instrumentation.Scope) *meter { - r.Lock() - defer r.Unlock() - - if r.meters == nil { - m := &meter{ - Scope: s, - pipes: r.pipes, - } - r.meters = map[instrumentation.Scope]*meter{s: m} - return m - } - - m, ok := r.meters[s] - if ok { - return m - } - - m = &meter{ - Scope: s, - pipes: r.pipes, - } - r.meters[s] = m - return m -} - -type cache map[instrumentID]any - -type cacheResult[N int64 | float64] struct { - aggregators []internal.Aggregator[N] - err error -} - -type querier[N int64 | float64] struct { - sync.Mutex - - c cache -} - -func newQuerier[N int64 | float64](c cache) *querier[N] { - return &querier[N]{c: c} -} - -var ( - errCacheMiss = errors.New("cache miss") - errExists = errors.New("instrument already exists for different number type") -) - -func (q *querier[N]) Get(key instrumentID) (r *cacheResult[N], err error) { - q.Lock() - defer q.Unlock() - - vIface, ok := q.c[key] - if !ok { - err = errCacheMiss - return r, err - } - - switch v := vIface.(type) { - case *cacheResult[N]: - r = v - default: - err = errExists - } - return r, err -} - -func (q *querier[N]) Set(key instrumentID, val *cacheResult[N]) { - q.Lock() - defer q.Unlock() - - q.c[key] = val + return r.meters.GetOrSet(s, func() *meter { + return &meter{Scope: s, pipes: r.pipes} + }) } // meter handles the creation and coordination of all metric instruments. A @@ -134,7 +61,7 @@ type meter struct { instrumentation.Scope pipes pipelines - cache *cache + cache cache[instrumentID, any] } // Compile-time check meter implements metric.Meter. @@ -142,13 +69,13 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - q := newQuerier[int64](*m.cache) + q := newInstrumentRegistry[int64](&m.cache) return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - q := newQuerier[float64](*m.cache) + q := newInstrumentRegistry[float64](&m.cache) return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } @@ -161,12 +88,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - q := newQuerier[int64](*m.cache) + q := newInstrumentRegistry[int64](&m.cache) return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - q := newQuerier[float64](*m.cache) + q := newInstrumentRegistry[float64](&m.cache) return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 789cc980039..fd874768af1 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -345,11 +345,11 @@ func (p pipelines) registerCallback(fn func(context.Context)) { // measurements with while updating all pipelines that need to pull from those // aggregations. type resolver[N int64 | float64] struct { - cache *querier[N] + cache instrumentRegistry[N] inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, q *querier[N]) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, q instrumentRegistry[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { in[i] = newInserter[N](p[i]) @@ -360,31 +360,24 @@ func newResolver[N int64 | float64](p pipelines, q *querier[N]) *resolver[N] { // Aggregators returns the Aggregators instrument inst needs to update when it // makes a measurement. func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { - id := instrumentID{scope: inst.Scope, name: inst.Name, description: inst.Description} - resp, err := r.cache.Get(id) - if err == nil { - return resp.aggregators, resp.err - } - if !errors.Is(err, errCacheMiss) { - return nil, err + id := instrumentID{ + scope: inst.Scope, + name: inst.Name, + description: inst.Description, } - var aggs []internal.Aggregator[N] - errs := &multierror{} - for _, i := range r.inserters { - a, err := i.Instrument(inst, instUnit) - if err != nil { - errs.append(err) + return r.cache.GetOrSet(id, func() ([]internal.Aggregator[N], error) { + var aggs []internal.Aggregator[N] + errs := &multierror{} + for _, i := range r.inserters { + a, err := i.Instrument(inst, instUnit) + if err != nil { + errs.append(err) + } + aggs = append(aggs, a...) } - aggs = append(aggs, a...) - } - - err = errs.errorOrNil() - r.cache.Set(id, &cacheResult[N]{ - aggregators: aggs, - err: err, + return aggs, errs.errorOrNil() }) - return aggs, err } type multierror struct { From 10169ab09e521a17d13150da19c83af06d277928 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 07:27:21 -0700 Subject: [PATCH 03/22] Rename GetOrSet to Lookup --- sdk/metric/cache.go | 12 ++++++------ sdk/metric/meter.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go index 3b3a82e5b86..1e69f9423b8 100644 --- a/sdk/metric/cache.go +++ b/sdk/metric/cache.go @@ -35,13 +35,13 @@ type cache[K comparable, V any] struct { data map[K]V } -// GetOrSet returns the value stored in the cache for key if it exists. -// Otherwise, f is called and the returned value is set in the cache for key -// and returned. +// Lookup returns the value stored in the cache with the accociated key if it +// exists. Otherwise, f is called and its returned value is set in the cache +// for key and returned. // -// GetOrSet is safe to call concurrently. It will hold the cache lock, so f +// Lookup is safe to call concurrently. It will hold the cache lock, so f // should not block excessively. -func (c *cache[K, V]) GetOrSet(key K, f func() V) V { +func (c *cache[K, V]) Lookup(key K, f func() V) V { c.Lock() defer c.Unlock() @@ -78,7 +78,7 @@ func newInstrumentRegistry[N int64 | float64](c *cache[instrumentID, any]) instr var errExists = errors.New("instrument already exists for different number type") func (q instrumentRegistry[N]) GetOrSet(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { - vAny := q.c.GetOrSet(key, func() any { + vAny := q.c.Lookup(key, func() any { a, err := f() return &resolvedAggregators[N]{ aggregators: a, diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 8c7284ff441..500a8d9fc2a 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -48,7 +48,7 @@ type meterRegistry struct { // // Get is safe to call concurrently. func (r *meterRegistry) Get(s instrumentation.Scope) *meter { - return r.meters.GetOrSet(s, func() *meter { + return r.meters.Lookup(s, func() *meter { return &meter{Scope: s, pipes: r.pipes} }) } From 95d4f09c388295ede2075360f5f13aac6bd3e046 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 08:01:00 -0700 Subject: [PATCH 04/22] Rename instrument cache --- sdk/metric/cache.go | 16 ++++++++++++---- sdk/metric/meter.go | 8 ++++---- sdk/metric/pipeline.go | 6 +++--- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go index 1e69f9423b8..babc22446d5 100644 --- a/sdk/metric/cache.go +++ b/sdk/metric/cache.go @@ -64,20 +64,28 @@ type resolvedAggregators[N int64 | float64] struct { err error } -type instrumentRegistry[N int64 | float64] struct { +type instrumentCache[N int64 | float64] struct { c *cache[instrumentID, any] } -func newInstrumentRegistry[N int64 | float64](c *cache[instrumentID, any]) instrumentRegistry[N] { +func newInstrumentCache[N int64 | float64](c *cache[instrumentID, any]) instrumentCache[N] { if c == nil { c = &cache[instrumentID, any]{} } - return instrumentRegistry[N]{c: c} + return instrumentCache[N]{c: c} } var errExists = errors.New("instrument already exists for different number type") -func (q instrumentRegistry[N]) GetOrSet(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { +// Lookup returns the Aggregators and error for a cached instrumentID if they +// exist in the cache. Otherwise, f is called and its returned values are set +// in the cache and returned. +// +// If an instrumentID has been stored in the cache for a different N, an error +// is returned describing the conflict. +// +// Lookup is safe to call concurrently. +func (q instrumentCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { vAny := q.c.Lookup(key, func() any { a, err := f() return &resolvedAggregators[N]{ diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 500a8d9fc2a..1a81f7d0280 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -69,13 +69,13 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - q := newInstrumentRegistry[int64](&m.cache) + q := newInstrumentCache[int64](&m.cache) return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - q := newInstrumentRegistry[float64](&m.cache) + q := newInstrumentCache[float64](&m.cache) return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } @@ -88,12 +88,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - q := newInstrumentRegistry[int64](&m.cache) + q := newInstrumentCache[int64](&m.cache) return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - q := newInstrumentRegistry[float64](&m.cache) + q := newInstrumentCache[float64](&m.cache) return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index fd874768af1..0320863d3c6 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -345,11 +345,11 @@ func (p pipelines) registerCallback(fn func(context.Context)) { // measurements with while updating all pipelines that need to pull from those // aggregations. type resolver[N int64 | float64] struct { - cache instrumentRegistry[N] + cache instrumentCache[N] inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, q instrumentRegistry[N]) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, q instrumentCache[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { in[i] = newInserter[N](p[i]) @@ -366,7 +366,7 @@ func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]i description: inst.Description, } - return r.cache.GetOrSet(id, func() ([]internal.Aggregator[N], error) { + return r.cache.Lookup(id, func() ([]internal.Aggregator[N], error) { var aggs []internal.Aggregator[N] errs := &multierror{} for _, i := range r.inserters { From 587a43af7f2fba2dacf565d43354dc2b864adb7a Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 08:02:54 -0700 Subject: [PATCH 05/22] Move instrumentCache to pipeline.go --- sdk/metric/cache.go | 48 ------------------------------------------ sdk/metric/pipeline.go | 45 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 48 deletions(-) diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go index babc22446d5..b349cf330a9 100644 --- a/sdk/metric/cache.go +++ b/sdk/metric/cache.go @@ -15,10 +15,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( - "errors" "sync" - - "go.opentelemetry.io/otel/sdk/metric/internal" ) // cache is a locking storage used to quickly return already computed values. A @@ -57,48 +54,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V { c.data[key] = val return val } - -// resolvedAggregators is the result of resolving aggregators for an instrument. -type resolvedAggregators[N int64 | float64] struct { - aggregators []internal.Aggregator[N] - err error -} - -type instrumentCache[N int64 | float64] struct { - c *cache[instrumentID, any] -} - -func newInstrumentCache[N int64 | float64](c *cache[instrumentID, any]) instrumentCache[N] { - if c == nil { - c = &cache[instrumentID, any]{} - } - return instrumentCache[N]{c: c} -} - -var errExists = errors.New("instrument already exists for different number type") - -// Lookup returns the Aggregators and error for a cached instrumentID if they -// exist in the cache. Otherwise, f is called and its returned values are set -// in the cache and returned. -// -// If an instrumentID has been stored in the cache for a different N, an error -// is returned describing the conflict. -// -// Lookup is safe to call concurrently. -func (q instrumentCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { - vAny := q.c.Lookup(key, func() any { - a, err := f() - return &resolvedAggregators[N]{ - aggregators: a, - err: err, - } - }) - - switch v := vAny.(type) { - case *resolvedAggregators[N]: - aggs = v.aggregators - default: - err = errExists - } - return aggs, err -} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 0320863d3c6..b307ad54260 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -380,6 +380,51 @@ func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]i }) } +// resolvedAggregators is the result of resolving aggregators for an instrument. +type resolvedAggregators[N int64 | float64] struct { + aggregators []internal.Aggregator[N] + err error +} + +type instrumentCache[N int64 | float64] struct { + c *cache[instrumentID, any] +} + +func newInstrumentCache[N int64 | float64](c *cache[instrumentID, any]) instrumentCache[N] { + if c == nil { + c = &cache[instrumentID, any]{} + } + return instrumentCache[N]{c: c} +} + +var errExists = errors.New("instrument already exists for different number type") + +// Lookup returns the Aggregators and error for a cached instrumentID if they +// exist in the cache. Otherwise, f is called and its returned values are set +// in the cache and returned. +// +// If an instrumentID has been stored in the cache for a different N, an error +// is returned describing the conflict. +// +// Lookup is safe to call concurrently. +func (q instrumentCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { + vAny := q.c.Lookup(key, func() any { + a, err := f() + return &resolvedAggregators[N]{ + aggregators: a, + err: err, + } + }) + + switch v := vAny.(type) { + case *resolvedAggregators[N]: + aggs = v.aggregators + default: + err = errExists + } + return aggs, err +} + type multierror struct { wrapped error errors []string From e3b4fcbdc40f44dbfdec62b58ceffaa2e1e99522 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 08:08:20 -0700 Subject: [PATCH 06/22] Revert meterRegistry changes --- sdk/metric/meter.go | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 1a81f7d0280..7ae6beec4b0 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "sync" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" @@ -37,7 +38,9 @@ import ( // // All methods of a meterRegistry are safe to call concurrently. type meterRegistry struct { - meters cache[instrumentation.Scope, *meter] + sync.Mutex + + meters map[instrumentation.Scope]*meter pipes pipelines } @@ -48,9 +51,29 @@ type meterRegistry struct { // // Get is safe to call concurrently. func (r *meterRegistry) Get(s instrumentation.Scope) *meter { - return r.meters.Lookup(s, func() *meter { - return &meter{Scope: s, pipes: r.pipes} - }) + r.Lock() + defer r.Unlock() + + if r.meters == nil { + m := &meter{ + Scope: s, + pipes: r.pipes, + } + r.meters = map[instrumentation.Scope]*meter{s: m} + return m + } + + m, ok := r.meters[s] + if ok { + return m + } + + m = &meter{ + Scope: s, + pipes: r.pipes, + } + r.meters[s] = m + return m } // meter handles the creation and coordination of all metric instruments. A From ddafddc505445dab968fae69d92cf0c960cbeb95 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 08:11:22 -0700 Subject: [PATCH 07/22] Rename vars --- sdk/metric/meter.go | 16 ++++++++-------- sdk/metric/pipeline.go | 12 ++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 7ae6beec4b0..78991218509 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -92,14 +92,14 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - q := newInstrumentCache[int64](&m.cache) - return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} + c := newInstrumentCache[int64](&m.cache) + return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - q := newInstrumentCache[float64](&m.cache) - return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} + c := newInstrumentCache[float64](&m.cache) + return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // RegisterCallback registers the function f to be called when any of the @@ -111,12 +111,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - q := newInstrumentCache[int64](&m.cache) - return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} + c := newInstrumentCache[int64](&m.cache) + return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - q := newInstrumentCache[float64](&m.cache) - return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, q)} + c := newInstrumentCache[float64](&m.cache) + return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index b307ad54260..33caf98db77 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -349,12 +349,12 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, q instrumentCache[N]) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { in[i] = newInserter[N](p[i]) } - return &resolver[N]{cache: q, inserters: in} + return &resolver[N]{cache: c, inserters: in} } // Aggregators returns the Aggregators instrument inst needs to update when it @@ -387,14 +387,14 @@ type resolvedAggregators[N int64 | float64] struct { } type instrumentCache[N int64 | float64] struct { - c *cache[instrumentID, any] + cache *cache[instrumentID, any] } func newInstrumentCache[N int64 | float64](c *cache[instrumentID, any]) instrumentCache[N] { if c == nil { c = &cache[instrumentID, any]{} } - return instrumentCache[N]{c: c} + return instrumentCache[N]{cache: c} } var errExists = errors.New("instrument already exists for different number type") @@ -407,8 +407,8 @@ var errExists = errors.New("instrument already exists for different number type" // is returned describing the conflict. // // Lookup is safe to call concurrently. -func (q instrumentCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { - vAny := q.c.Lookup(key, func() any { +func (c instrumentCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { + vAny := c.cache.Lookup(key, func() any { a, err := f() return &resolvedAggregators[N]{ aggregators: a, From 35342cfa921923026aae08250bfabe6ab8c65039 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 08:14:42 -0700 Subject: [PATCH 08/22] Rename errExists --- sdk/metric/pipeline.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 33caf98db77..1484c195e44 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -34,6 +34,8 @@ var ( errCreatingAggregators = errors.New("could not create all aggregators") errIncompatibleAggregation = errors.New("incompatible aggregation") errUnknownAggregation = errors.New("unrecognized aggregation") + + errCacheNumberConflict = errors.New("instrument already exists: conflicting number type") ) type aggregator interface { @@ -397,8 +399,6 @@ func newInstrumentCache[N int64 | float64](c *cache[instrumentID, any]) instrume return instrumentCache[N]{cache: c} } -var errExists = errors.New("instrument already exists for different number type") - // Lookup returns the Aggregators and error for a cached instrumentID if they // exist in the cache. Otherwise, f is called and its returned values are set // in the cache and returned. @@ -420,7 +420,7 @@ func (c instrumentCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggreg case *resolvedAggregators[N]: aggs = v.aggregators default: - err = errExists + err = errCacheNumberConflict } return aggs, err } From a322038bc505f322402ca7a3d587e3b7587fd6e2 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 08:36:04 -0700 Subject: [PATCH 09/22] Fix cache hit return value --- sdk/metric/pipeline.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 1484c195e44..7900c3ccc2e 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -418,7 +418,7 @@ func (c instrumentCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggreg switch v := vAny.(type) { case *resolvedAggregators[N]: - aggs = v.aggregators + aggs, err = v.aggregators, v.err default: err = errCacheNumberConflict } From f4530e55c9c949409c7bff599ff7e4e3b9096f55 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 08:36:21 -0700 Subject: [PATCH 10/22] Fix tests --- sdk/metric/pipeline_registry_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index f89a09360ba..ce58f5f8bdf 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -334,7 +334,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver[int64](p) + r := newResolver(p, newInstrumentCache[int64](nil)) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -344,7 +344,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver[float64](p) + r := newResolver(p, newInstrumentCache[float64](nil)) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -375,14 +375,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} - ri := newResolver[int64](p) + ri := newResolver(p, newInstrumentCache[int64](nil)) intAggs, err := ri.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) p = newPipelines(resource.Empty(), views) - rf := newResolver[float64](p) + rf := newResolver(p, newInstrumentCache[float64](nil)) floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 0) @@ -405,7 +405,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { p := newPipelines(resource.Empty(), views) - ri := newResolver[int64](p) + ri := newResolver(p, newInstrumentCache[int64](nil)) intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) assert.Len(t, intAggs, 1) @@ -416,7 +416,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { assert.Len(t, intAggs, 2) // Creating a float foo instrument should error because there is an int foo instrument. - rf := newResolver[float64](p) + rf := newResolver(p, newInstrumentCache[float64](nil)) floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 1) From c7b9f2dcf072e0306cea200fd07be48e949535f4 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 09:25:01 -0700 Subject: [PATCH 11/22] Test instrument cache number conflict --- sdk/metric/pipeline_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index ca83c9c3a9e..89a9f13bdda 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" ) @@ -211,3 +212,30 @@ func TestPipelineConcurrency(t *testing.T) { } wg.Wait() } + +func TestInstrumentCacheNumberConflict(t *testing.T) { + c := cache[instrumentID, any]{} + + key := instrumentID{ + scope: instrumentation.Scope{Name: "scope name"}, + name: "name", + description: "description", + } + aggs := []internal.Aggregator[int64]{internal.NewCumulativeSum[int64](true)} + + instCachI := newInstrumentCache[int64](&c) + gotI, err := instCachI.Lookup(key, func() ([]internal.Aggregator[int64], error) { + return aggs, nil + }) + require.NoError(t, err) + require.Equal(t, aggs, gotI) + + instCachF := newInstrumentCache[float64](&c) + gotF, err := instCachF.Lookup(key, func() ([]internal.Aggregator[float64], error) { + return []internal.Aggregator[float64]{ + internal.NewCumulativeSum[float64](true), + }, nil + }) + assert.ErrorIs(t, err, errCacheNumberConflict) + assert.Nil(t, gotF, "cache conflict should not return a value") +} From f525a178e6a4a4e96263544c42e609b9b6e263ab Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 09:34:59 -0700 Subject: [PATCH 12/22] Add cache test --- sdk/metric/cache.go | 4 +--- sdk/metric/cache_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) create mode 100644 sdk/metric/cache_test.go diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go index b349cf330a9..ce7db499263 100644 --- a/sdk/metric/cache.go +++ b/sdk/metric/cache.go @@ -14,9 +14,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" -import ( - "sync" -) +import "sync" // cache is a locking storage used to quickly return already computed values. A // registry type should be used with a cache for get and set operations of diff --git a/sdk/metric/cache_test.go b/sdk/metric/cache_test.go new file mode 100644 index 00000000000..4a27b4256d1 --- /dev/null +++ b/sdk/metric/cache_test.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCache(t *testing.T) { + k0, k1 := "one", "two" + v0, v1 := 1, 2 + + c := cache[string, int]{} + + var got int + require.NotPanics(t, func() { + got = c.Lookup(k0, func() int { return v0 }) + }, "zero-value cache panics on Lookup") + assert.Equal(t, v0, got, "zero-value cache did not return fallback") + + assert.Equal(t, v0, c.Lookup(k0, func() int { return v1 }), "existing key") + + assert.Equal(t, v1, c.Lookup(k1, func() int { return v1 }), "non-existing key") +} From 6bf37ec3b1faf169b7ae51a12d1772b56a6869ba Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 09:36:21 -0700 Subject: [PATCH 13/22] Edit cache documentation --- sdk/metric/cache.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go index ce7db499263..729e626b64a 100644 --- a/sdk/metric/cache.go +++ b/sdk/metric/cache.go @@ -16,9 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import "sync" -// cache is a locking storage used to quickly return already computed values. A -// registry type should be used with a cache for get and set operations of -// certain types. +// cache is a locking storage used to quickly return already computed values. // // The zero value of a cache is empty and ready to use. // From 1d4f1d2a142a0115fe3de675488f2b2034da139f Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 09:44:54 -0700 Subject: [PATCH 14/22] Add changes to changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cf26a4ff104..10a97ded172 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Upgrade `golang.org/x/sys/unix` from `v0.0.0-20210423185535-09eb48e85fd7` to `v0.0.0-20220919091848-fb04ddd9f9c8`. This addresses [GO-2022-0493](https://pkg.go.dev/vuln/GO-2022-0493). (#3235) +### Fixed + +- Return the same instrument for equivalent creation calls. (#3229, #3238) + ## [0.32.1] Metric SDK (Alpha) - 2022-09-22 ### Changed From fb5a426fc05e3fbfeeb0762306516631ac64b37c Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 14:04:47 -0700 Subject: [PATCH 15/22] Rename instrumentCache --- sdk/metric/meter.go | 8 ++++---- sdk/metric/pipeline.go | 12 ++++++------ sdk/metric/pipeline_registry_test.go | 12 ++++++------ sdk/metric/pipeline_test.go | 4 ++-- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 78991218509..f40c05ac3a5 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -92,13 +92,13 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - c := newInstrumentCache[int64](&m.cache) + c := newResolverCache[int64](&m.cache) return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - c := newInstrumentCache[float64](&m.cache) + c := newResolverCache[float64](&m.cache) return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } @@ -111,12 +111,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - c := newInstrumentCache[int64](&m.cache) + c := newResolverCache[int64](&m.cache) return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - c := newInstrumentCache[float64](&m.cache) + c := newResolverCache[float64](&m.cache) return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 7900c3ccc2e..34caee62e46 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -347,11 +347,11 @@ func (p pipelines) registerCallback(fn func(context.Context)) { // measurements with while updating all pipelines that need to pull from those // aggregations. type resolver[N int64 | float64] struct { - cache instrumentCache[N] + cache resolverCache[N] inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, c resolverCache[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { in[i] = newInserter[N](p[i]) @@ -388,15 +388,15 @@ type resolvedAggregators[N int64 | float64] struct { err error } -type instrumentCache[N int64 | float64] struct { +type resolverCache[N int64 | float64] struct { cache *cache[instrumentID, any] } -func newInstrumentCache[N int64 | float64](c *cache[instrumentID, any]) instrumentCache[N] { +func newResolverCache[N int64 | float64](c *cache[instrumentID, any]) resolverCache[N] { if c == nil { c = &cache[instrumentID, any]{} } - return instrumentCache[N]{cache: c} + return resolverCache[N]{cache: c} } // Lookup returns the Aggregators and error for a cached instrumentID if they @@ -407,7 +407,7 @@ func newInstrumentCache[N int64 | float64](c *cache[instrumentID, any]) instrume // is returned describing the conflict. // // Lookup is safe to call concurrently. -func (c instrumentCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { +func (c resolverCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { vAny := c.cache.Lookup(key, func() any { a, err := f() return &resolvedAggregators[N]{ diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index ce58f5f8bdf..221a6279e21 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -334,7 +334,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver(p, newInstrumentCache[int64](nil)) + r := newResolver(p, newResolverCache[int64](nil)) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -344,7 +344,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver(p, newInstrumentCache[float64](nil)) + r := newResolver(p, newResolverCache[float64](nil)) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -375,14 +375,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} - ri := newResolver(p, newInstrumentCache[int64](nil)) + ri := newResolver(p, newResolverCache[int64](nil)) intAggs, err := ri.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) p = newPipelines(resource.Empty(), views) - rf := newResolver(p, newInstrumentCache[float64](nil)) + rf := newResolver(p, newResolverCache[float64](nil)) floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 0) @@ -405,7 +405,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { p := newPipelines(resource.Empty(), views) - ri := newResolver(p, newInstrumentCache[int64](nil)) + ri := newResolver(p, newResolverCache[int64](nil)) intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) assert.Len(t, intAggs, 1) @@ -416,7 +416,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { assert.Len(t, intAggs, 2) // Creating a float foo instrument should error because there is an int foo instrument. - rf := newResolver(p, newInstrumentCache[float64](nil)) + rf := newResolver(p, newResolverCache[float64](nil)) floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 1) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 89a9f13bdda..582ea91fce3 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -223,14 +223,14 @@ func TestInstrumentCacheNumberConflict(t *testing.T) { } aggs := []internal.Aggregator[int64]{internal.NewCumulativeSum[int64](true)} - instCachI := newInstrumentCache[int64](&c) + instCachI := newResolverCache[int64](&c) gotI, err := instCachI.Lookup(key, func() ([]internal.Aggregator[int64], error) { return aggs, nil }) require.NoError(t, err) require.Equal(t, aggs, gotI) - instCachF := newInstrumentCache[float64](&c) + instCachF := newResolverCache[float64](&c) gotF, err := instCachF.Lookup(key, func() ([]internal.Aggregator[float64], error) { return []internal.Aggregator[float64]{ internal.NewCumulativeSum[float64](true), From e72e508264cd29357cd75c7f6cbb6267387e10ee Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 14:19:36 -0700 Subject: [PATCH 16/22] Add insterterCache --- sdk/metric/pipeline.go | 48 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 34caee62e46..da5d118c92c 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -178,6 +178,54 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err }, nil } +type insterterCache[N int64 | float64] struct { + cache *cache[string, any] +} + +type instrumentAggregators[N int64 | float64] struct { + inst view.Instrument + unit unit.Unit + aggregators []internal.Aggregator[N] + err error +} + +var errConflict = errors.New("instrument already exists") + +func (i instrumentAggregators[N]) conflict(inst view.Instrument, u unit.Unit) error { + if i.inst.Name != inst.Name || + i.inst.Description != inst.Description || + i.inst.Scope != inst.Scope || + //i.inst.Aggregation != inst.Aggregation || // FIXME: make this work. + i.unit != u { + return errConflict + } + return nil +} + +func (c insterterCache[N]) Lookup(inst view.Instrument, u unit.Unit, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { + vAny := c.cache.Lookup(inst.Name, func() any { + a, err := f() + return instrumentAggregators[N]{ + inst: inst, + unit: u, + aggregators: a, + err: err, + } + }) + + switch v := vAny.(type) { + case instrumentAggregators[N]: + aggs = v.aggregators + err = v.conflict(inst, u) + if err == nil { + err = v.err + } + default: + err = errCacheNumberConflict + } + return aggs, err +} + // inserter facilitates inserting of new instruments into a pipeline. type inserter[N int64 | float64] struct { pipeline *pipeline From 66107dd8672cfc486a7a89d53d90afa0aa7d1d55 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 14:24:56 -0700 Subject: [PATCH 17/22] Use the insterterCache --- sdk/metric/meter.go | 25 +++++++++++++++---------- sdk/metric/pipeline.go | 18 +++++++++++++----- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index f40c05ac3a5..ef99d392323 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -83,8 +83,9 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { type meter struct { instrumentation.Scope - pipes pipelines - cache cache[instrumentID, any] + pipes pipelines + resolverCache cache[instrumentID, any] + insterterCache cache[string, any] } // Compile-time check meter implements metric.Meter. @@ -92,14 +93,16 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - c := newResolverCache[int64](&m.cache) - return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} + ic := newInserterCache[int64](&m.insterterCache) + rc := newResolverCache[int64](&m.resolverCache) + return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rc, ic)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - c := newResolverCache[float64](&m.cache) - return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} + ic := newInserterCache[float64](&m.insterterCache) + rc := newResolverCache[float64](&m.resolverCache) + return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rc, ic)} } // RegisterCallback registers the function f to be called when any of the @@ -111,12 +114,14 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - c := newResolverCache[int64](&m.cache) - return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} + ic := newInserterCache[int64](&m.insterterCache) + rn := newResolverCache[int64](&m.resolverCache) + return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rn, ic)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - c := newResolverCache[float64](&m.cache) - return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} + ic := newInserterCache[float64](&m.insterterCache) + rn := newResolverCache[float64](&m.resolverCache) + return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rn, ic)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index da5d118c92c..27fd3b1325d 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -182,6 +182,13 @@ type insterterCache[N int64 | float64] struct { cache *cache[string, any] } +func newInserterCache[N int64 | float64](c *cache[string, any]) insterterCache[N] { + if c == nil { + c = &cache[string, any]{} + } + return insterterCache[N]{cache: c} +} + type instrumentAggregators[N int64 | float64] struct { inst view.Instrument unit unit.Unit @@ -228,11 +235,12 @@ func (c insterterCache[N]) Lookup(inst view.Instrument, u unit.Unit, f func() ([ // inserter facilitates inserting of new instruments into a pipeline. type inserter[N int64 | float64] struct { + cache insterterCache[N] pipeline *pipeline } -func newInserter[N int64 | float64](p *pipeline) *inserter[N] { - return &inserter[N]{p} +func newInserter[N int64 | float64](p *pipeline, c insterterCache[N]) *inserter[N] { + return &inserter[N]{cache: c, pipeline: p} } // Instrument inserts instrument inst with instUnit returning the Aggregators @@ -399,12 +407,12 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, c resolverCache[N]) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, rc resolverCache[N], ic insterterCache[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { - in[i] = newInserter[N](p[i]) + in[i] = newInserter[N](p[i], ic) } - return &resolver[N]{cache: c, inserters: in} + return &resolver[N]{cache: rc, inserters: in} } // Aggregators returns the Aggregators instrument inst needs to update when it From b2b42795ef1364e9218da339efc35e60aea3f459 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 28 Sep 2022 14:27:35 -0700 Subject: [PATCH 18/22] Fix tests --- sdk/metric/pipeline_registry_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 221a6279e21..fd76916101f 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -211,7 +211,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - i := newInserter[N](newPipeline(nil, tt.reader, tt.views)) + i := newInserter[N](newPipeline(nil, tt.reader, tt.views), newInserterCache[N](nil)) got, err := i.Instrument(tt.inst, unit.Dimensionless) assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) @@ -223,7 +223,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } func testInvalidInstrumentShouldPanic[N int64 | float64]() { - i := newInserter[N](newPipeline(nil, NewManualReader(), []view.View{{}})) + i := newInserter[N](newPipeline(nil, NewManualReader(), []view.View{{}}), newInserterCache[N](nil)) inst := view.Instrument{ Name: "foo", Kind: view.InstrumentKind(255), @@ -334,7 +334,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver(p, newResolverCache[int64](nil)) + r := newResolver(p, newResolverCache[int64](nil), newInserterCache[int64](nil)) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -344,7 +344,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver(p, newResolverCache[float64](nil)) + r := newResolver(p, newResolverCache[float64](nil), newInserterCache[float64](nil)) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -375,14 +375,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} - ri := newResolver(p, newResolverCache[int64](nil)) + ri := newResolver(p, newResolverCache[int64](nil), newInserterCache[int64](nil)) intAggs, err := ri.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) p = newPipelines(resource.Empty(), views) - rf := newResolver(p, newResolverCache[float64](nil)) + rf := newResolver(p, newResolverCache[float64](nil), newInserterCache[float64](nil)) floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 0) @@ -405,7 +405,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { p := newPipelines(resource.Empty(), views) - ri := newResolver(p, newResolverCache[int64](nil)) + ri := newResolver(p, newResolverCache[int64](nil), newInserterCache[int64](nil)) intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) assert.Len(t, intAggs, 1) @@ -416,7 +416,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { assert.Len(t, intAggs, 2) // Creating a float foo instrument should error because there is an int foo instrument. - rf := newResolver(p, newResolverCache[float64](nil)) + rf := newResolver(p, newResolverCache[float64](nil), newInserterCache[float64](nil)) floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 1) From 1e226716051cb90a6a235fa69a00e074d3d55c68 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 29 Sep 2022 08:10:39 -0700 Subject: [PATCH 19/22] Update insterterCache to manage a single instrument --- sdk/metric/pipeline.go | 94 +++++++++++++++------------- sdk/metric/pipeline_registry_test.go | 2 +- 2 files changed, 50 insertions(+), 46 deletions(-) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 27fd3b1325d..4ffdae069ed 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -189,16 +189,16 @@ func newInserterCache[N int64 | float64](c *cache[string, any]) insterterCache[N return insterterCache[N]{cache: c} } -type instrumentAggregators[N int64 | float64] struct { - inst view.Instrument - unit unit.Unit - aggregators []internal.Aggregator[N] - err error +type instrumentAggregator[N int64 | float64] struct { + inst view.Instrument + unit unit.Unit + aggregator internal.Aggregator[N] + err error } var errConflict = errors.New("instrument already exists") -func (i instrumentAggregators[N]) conflict(inst view.Instrument, u unit.Unit) error { +func (i instrumentAggregator[N]) conflict(inst view.Instrument, u unit.Unit) error { if i.inst.Name != inst.Name || i.inst.Description != inst.Description || i.inst.Scope != inst.Scope || @@ -209,20 +209,20 @@ func (i instrumentAggregators[N]) conflict(inst view.Instrument, u unit.Unit) er return nil } -func (c insterterCache[N]) Lookup(inst view.Instrument, u unit.Unit, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { +func (c insterterCache[N]) Lookup(inst view.Instrument, u unit.Unit, f func() (internal.Aggregator[N], error)) (aggs internal.Aggregator[N], err error) { vAny := c.cache.Lookup(inst.Name, func() any { a, err := f() - return instrumentAggregators[N]{ - inst: inst, - unit: u, - aggregators: a, - err: err, + return instrumentAggregator[N]{ + inst: inst, + unit: u, + aggregator: a, + err: err, } }) switch v := vAny.(type) { - case instrumentAggregators[N]: - aggs = v.aggregators + case instrumentAggregator[N]: + aggs = v.aggregator err = v.conflict(inst, u) if err == nil { err = v.err @@ -246,59 +246,63 @@ func newInserter[N int64 | float64](p *pipeline, c insterterCache[N]) *inserter[ // Instrument inserts instrument inst with instUnit returning the Aggregators // that need to be updated with measurments for that instrument. func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { - seen := map[instrumentID]struct{}{} var aggs []internal.Aggregator[N] errs := &multierror{wrapped: errCreatingAggregators} + // The cache will return the same Aggregator instance. Use this fact to + // compare pointer addresses to deduplicate Aggregators. + seen := make(map[internal.Aggregator[N]]struct{}) for _, v := range i.pipeline.views { inst, match := v.TransformInstrument(inst) - - id := instrumentID{ - scope: inst.Scope, - name: inst.Name, - description: inst.Description, - } - - if _, ok := seen[id]; ok || !match { + if !match { continue } - - if inst.Aggregation == nil { - inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) - } else if _, ok := inst.Aggregation.(aggregation.Default); ok { - inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) - } - - if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { - err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err) - errs.append(err) - continue - } - - agg, err := i.aggregator(inst) + agg, err := i.cachedAggregator(inst, instUnit) if err != nil { errs.append(err) - continue } if agg == nil { // Drop aggregator. continue } - // TODO (#3011): If filtering is done at the instrument level add here. - // This is where the aggregator and the view are both in scope. - aggs = append(aggs, agg) - seen[id] = struct{}{} - err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, agg) - if err != nil { - errs.append(err) + if _, ok := seen[agg]; ok { + // This aggregator has aleady been added. + continue } + seen[agg] = struct{}{} + aggs = append(aggs, agg) } // TODO(#3224): handle when no views match. Default should be reader // aggregation returned. return aggs, errs.errorOrNil() } +func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (internal.Aggregator[N], error) { + return i.cache.Lookup(inst, u, func() (internal.Aggregator[N], error) { + agg, err := i.aggregator(inst) + if err != nil { + return nil, err + } + if agg == nil { // Drop aggregator. + return nil, nil + } + err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, u, agg) + return agg, err + }) +} + // aggregator returns the Aggregator for an instrument configuration. If the // instrument defines an unknown aggregation, an error is returned. func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { + if inst.Aggregation == nil { + inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) + } else if _, ok := inst.Aggregation.(aggregation.Default); ok { + inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) + } + + if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { + err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err) + return nil, err + } + // TODO (#3011): If filtering is done by the Aggregator it should be passed // here. var ( diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index fd76916101f..27869545c05 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -412,7 +412,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { // The Rename view should error, because it creates a foo instrument. intAggs, err = ri.Aggregators(barInst, unit.Dimensionless) - assert.Error(t, err) + assert.NoError(t, err) assert.Len(t, intAggs, 2) // Creating a float foo instrument should error because there is an int foo instrument. From ab9984b6124262bb94ef32481d11d08ff702bf83 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 29 Sep 2022 09:13:06 -0700 Subject: [PATCH 20/22] Move insterterCache into cache.go renaming to aggCache --- sdk/metric/cache.go | 97 +++++++++++++++++++++++++++- sdk/metric/meter.go | 14 ++-- sdk/metric/pipeline.go | 65 ++----------------- sdk/metric/pipeline_registry_test.go | 16 ++--- sdk/metric/pipeline_test.go | 2 +- 5 files changed, 116 insertions(+), 78 deletions(-) diff --git a/sdk/metric/cache.go b/sdk/metric/cache.go index 729e626b64a..76f55a48b78 100644 --- a/sdk/metric/cache.go +++ b/sdk/metric/cache.go @@ -14,7 +14,24 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" -import "sync" +import ( + "errors" + "fmt" + "sync" + + "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/metric/internal" + "go.opentelemetry.io/otel/sdk/metric/view" +) + +var ( + errInstConflict = errors.New("instrument already exists") + errInstConflictScope = fmt.Errorf("%w: scope conflict", errInstConflict) + errInstConflictDesc = fmt.Errorf("%w: description conflict", errInstConflict) + errInstConflictAgg = fmt.Errorf("%w: data type conflict", errInstConflict) + errInstConflictUnit = fmt.Errorf("%w: unit conflict", errInstConflict) + errInstConflictNumber = fmt.Errorf("%w: number type conflict", errInstConflict) +) // cache is a locking storage used to quickly return already computed values. // @@ -50,3 +67,81 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V { c.data[key] = val return val } + +// aggCache is a cache for instrument Aggregators. +type aggCache[N int64 | float64] struct { + cache *cache[string, any] +} + +// newAggCache returns a new aggCache that uses c as the underlying cache. If c +// is nil, a new empty cache will be used. +func newAggCache[N int64 | float64](c *cache[string, any]) aggCache[N] { + if c == nil { + c = &cache[string, any]{} + } + return aggCache[N]{cache: c} +} + +// Lookup returns the Aggregator and error for a cached instrument if it exist +// in the cache. Otherwise, f is called and its returned value is set in the +// cache and returned. +// +// If an instrument has been stored in the cache for a different N, an error is +// returned describing the conflict with a nil Aggregator. +// +// If an instrument has been stored in the cache with a different description, +// scope, aggregation data type, or unit, an error is returned describing the +// conflict along with the originally stored Aggregator. +// +// Lookup is safe to call concurrently. +func (c aggCache[N]) Lookup(inst view.Instrument, u unit.Unit, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) { + vAny := c.cache.Lookup(inst.Name, func() any { + a, err := f() + return aggVal[N]{ + Instrument: inst, + Unit: u, + Aggregator: a, + Err: err, + } + }) + + switch v := vAny.(type) { + case aggVal[N]: + agg = v.Aggregator + err = v.conflict(inst, u) + if err == nil { + err = v.Err + } + default: + err = errInstConflictNumber + } + return agg, err +} + +// aggVal is the cached value of an aggCache. +type aggVal[N int64 | float64] struct { + view.Instrument + Unit unit.Unit + Aggregator internal.Aggregator[N] + Err error +} + +// conflict returns an error describing any conflict the inst and u have with +// v. If both describe the same instrument, and are compatible, nil is +// returned. +func (v aggVal[N]) conflict(inst view.Instrument, u unit.Unit) error { + // Assume name is already equal based on the cache lookup. + switch false { + case v.Scope == inst.Scope: + return errInstConflictScope + case v.Description == inst.Description: + return errInstConflictDesc + case v.Unit == u: + return errInstConflictUnit + // TODO: Enable Aggregation comparison according to the identifying + // properties of the metric data-model. + //case i.Aggregation == inst.Aggregation: + // return errInstConflictAgg + } + return nil +} diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index ef99d392323..66dbcce465f 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -83,9 +83,9 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { type meter struct { instrumentation.Scope - pipes pipelines - resolverCache cache[instrumentID, any] - insterterCache cache[string, any] + pipes pipelines + resolverCache cache[instrumentID, any] + aggCache cache[string, any] } // Compile-time check meter implements metric.Meter. @@ -93,14 +93,14 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - ic := newInserterCache[int64](&m.insterterCache) + ic := newAggCache[int64](&m.aggCache) rc := newResolverCache[int64](&m.resolverCache) return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rc, ic)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - ic := newInserterCache[float64](&m.insterterCache) + ic := newAggCache[float64](&m.aggCache) rc := newResolverCache[float64](&m.resolverCache) return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rc, ic)} } @@ -114,14 +114,14 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - ic := newInserterCache[int64](&m.insterterCache) + ic := newAggCache[int64](&m.aggCache) rn := newResolverCache[int64](&m.resolverCache) return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rn, ic)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - ic := newInserterCache[float64](&m.insterterCache) + ic := newAggCache[float64](&m.aggCache) rn := newResolverCache[float64](&m.resolverCache) return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rn, ic)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 4ffdae069ed..8cc825839e1 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -34,8 +34,6 @@ var ( errCreatingAggregators = errors.New("could not create all aggregators") errIncompatibleAggregation = errors.New("incompatible aggregation") errUnknownAggregation = errors.New("unrecognized aggregation") - - errCacheNumberConflict = errors.New("instrument already exists: conflicting number type") ) type aggregator interface { @@ -178,68 +176,13 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err }, nil } -type insterterCache[N int64 | float64] struct { - cache *cache[string, any] -} - -func newInserterCache[N int64 | float64](c *cache[string, any]) insterterCache[N] { - if c == nil { - c = &cache[string, any]{} - } - return insterterCache[N]{cache: c} -} - -type instrumentAggregator[N int64 | float64] struct { - inst view.Instrument - unit unit.Unit - aggregator internal.Aggregator[N] - err error -} - -var errConflict = errors.New("instrument already exists") - -func (i instrumentAggregator[N]) conflict(inst view.Instrument, u unit.Unit) error { - if i.inst.Name != inst.Name || - i.inst.Description != inst.Description || - i.inst.Scope != inst.Scope || - //i.inst.Aggregation != inst.Aggregation || // FIXME: make this work. - i.unit != u { - return errConflict - } - return nil -} - -func (c insterterCache[N]) Lookup(inst view.Instrument, u unit.Unit, f func() (internal.Aggregator[N], error)) (aggs internal.Aggregator[N], err error) { - vAny := c.cache.Lookup(inst.Name, func() any { - a, err := f() - return instrumentAggregator[N]{ - inst: inst, - unit: u, - aggregator: a, - err: err, - } - }) - - switch v := vAny.(type) { - case instrumentAggregator[N]: - aggs = v.aggregator - err = v.conflict(inst, u) - if err == nil { - err = v.err - } - default: - err = errCacheNumberConflict - } - return aggs, err -} - // inserter facilitates inserting of new instruments into a pipeline. type inserter[N int64 | float64] struct { - cache insterterCache[N] + cache aggCache[N] pipeline *pipeline } -func newInserter[N int64 | float64](p *pipeline, c insterterCache[N]) *inserter[N] { +func newInserter[N int64 | float64](p *pipeline, c aggCache[N]) *inserter[N] { return &inserter[N]{cache: c, pipeline: p} } @@ -411,7 +354,7 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, rc resolverCache[N], ic insterterCache[N]) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, rc resolverCache[N], ic aggCache[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { in[i] = newInserter[N](p[i], ic) @@ -480,7 +423,7 @@ func (c resolverCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggregat case *resolvedAggregators[N]: aggs, err = v.aggregators, v.err default: - err = errCacheNumberConflict + err = errInstConflictNumber } return aggs, err } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 27869545c05..dc7bd55026c 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -211,7 +211,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - i := newInserter[N](newPipeline(nil, tt.reader, tt.views), newInserterCache[N](nil)) + i := newInserter[N](newPipeline(nil, tt.reader, tt.views), newAggCache[N](nil)) got, err := i.Instrument(tt.inst, unit.Dimensionless) assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) @@ -223,7 +223,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } func testInvalidInstrumentShouldPanic[N int64 | float64]() { - i := newInserter[N](newPipeline(nil, NewManualReader(), []view.View{{}}), newInserterCache[N](nil)) + i := newInserter[N](newPipeline(nil, NewManualReader(), []view.View{{}}), newAggCache[N](nil)) inst := view.Instrument{ Name: "foo", Kind: view.InstrumentKind(255), @@ -334,7 +334,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver(p, newResolverCache[int64](nil), newInserterCache[int64](nil)) + r := newResolver(p, newResolverCache[int64](nil), newAggCache[int64](nil)) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -344,7 +344,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver(p, newResolverCache[float64](nil), newInserterCache[float64](nil)) + r := newResolver(p, newResolverCache[float64](nil), newAggCache[float64](nil)) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -375,14 +375,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} - ri := newResolver(p, newResolverCache[int64](nil), newInserterCache[int64](nil)) + ri := newResolver(p, newResolverCache[int64](nil), newAggCache[int64](nil)) intAggs, err := ri.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) p = newPipelines(resource.Empty(), views) - rf := newResolver(p, newResolverCache[float64](nil), newInserterCache[float64](nil)) + rf := newResolver(p, newResolverCache[float64](nil), newAggCache[float64](nil)) floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 0) @@ -405,7 +405,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { p := newPipelines(resource.Empty(), views) - ri := newResolver(p, newResolverCache[int64](nil), newInserterCache[int64](nil)) + ri := newResolver(p, newResolverCache[int64](nil), newAggCache[int64](nil)) intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) assert.Len(t, intAggs, 1) @@ -416,7 +416,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { assert.Len(t, intAggs, 2) // Creating a float foo instrument should error because there is an int foo instrument. - rf := newResolver(p, newResolverCache[float64](nil), newInserterCache[float64](nil)) + rf := newResolver(p, newResolverCache[float64](nil), newAggCache[float64](nil)) floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 1) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 582ea91fce3..d626c1fcd15 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -236,6 +236,6 @@ func TestInstrumentCacheNumberConflict(t *testing.T) { internal.NewCumulativeSum[float64](true), }, nil }) - assert.ErrorIs(t, err, errCacheNumberConflict) + assert.ErrorIs(t, err, errInstConflictNumber) assert.Nil(t, gotF, "cache conflict should not return a value") } From 16e8c0af58dcdcd2e07fb77d8d265c444058c4a6 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 29 Sep 2022 09:40:33 -0700 Subject: [PATCH 21/22] Remove resolverCache --- sdk/metric/cache_test.go | 30 +++++++++++ sdk/metric/meter.go | 25 ++++------ sdk/metric/pipeline.go | 74 +++++----------------------- sdk/metric/pipeline_registry_test.go | 12 ++--- sdk/metric/pipeline_test.go | 28 ----------- 5 files changed, 57 insertions(+), 112 deletions(-) diff --git a/sdk/metric/cache_test.go b/sdk/metric/cache_test.go index 4a27b4256d1..f44e86e2ad7 100644 --- a/sdk/metric/cache_test.go +++ b/sdk/metric/cache_test.go @@ -19,6 +19,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/internal" + "go.opentelemetry.io/otel/sdk/metric/view" ) func TestCache(t *testing.T) { @@ -37,3 +41,29 @@ func TestCache(t *testing.T) { assert.Equal(t, v1, c.Lookup(k1, func() int { return v1 }), "non-existing key") } + +func TestAggCacheNumberConflict(t *testing.T) { + c := cache[string, any]{} + + inst := view.Instrument{ + Scope: instrumentation.Scope{Name: "scope name"}, + Name: "name", + Description: "description", + } + u := unit.Dimensionless + aggs := internal.NewCumulativeSum[int64](true) + + instCachI := newAggCache[int64](&c) + gotI, err := instCachI.Lookup(inst, u, func() (internal.Aggregator[int64], error) { + return aggs, nil + }) + require.NoError(t, err) + require.Equal(t, aggs, gotI) + + instCachF := newAggCache[float64](&c) + gotF, err := instCachF.Lookup(inst, u, func() (internal.Aggregator[float64], error) { + return internal.NewCumulativeSum[float64](true), nil + }) + assert.ErrorIs(t, err, errInstConflictNumber) + assert.Nil(t, gotF, "cache conflict should not return a value") +} diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 66dbcce465f..b1b072f9d8b 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -83,9 +83,8 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter { type meter struct { instrumentation.Scope - pipes pipelines - resolverCache cache[instrumentID, any] - aggCache cache[string, any] + pipes pipelines + aggCache cache[string, any] } // Compile-time check meter implements metric.Meter. @@ -93,16 +92,14 @@ var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - ic := newAggCache[int64](&m.aggCache) - rc := newResolverCache[int64](&m.resolverCache) - return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rc, ic)} + c := newAggCache[int64](&m.aggCache) + return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // AsyncFloat64 returns the asynchronous floating-point instrument provider. func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - ic := newAggCache[float64](&m.aggCache) - rc := newResolverCache[float64](&m.resolverCache) - return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rc, ic)} + c := newAggCache[float64](&m.aggCache) + return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // RegisterCallback registers the function f to be called when any of the @@ -114,14 +111,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { - ic := newAggCache[int64](&m.aggCache) - rn := newResolverCache[int64](&m.resolverCache) - return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rn, ic)} + c := newAggCache[int64](&m.aggCache) + return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } // SyncFloat64 returns the synchronous floating-point instrument provider. func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - ic := newAggCache[float64](&m.aggCache) - rn := newResolverCache[float64](&m.resolverCache) - return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, rn, ic)} + c := newAggCache[float64](&m.aggCache) + return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 8cc825839e1..e4215bb0427 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -350,82 +350,30 @@ func (p pipelines) registerCallback(fn func(context.Context)) { // measurements with while updating all pipelines that need to pull from those // aggregations. type resolver[N int64 | float64] struct { - cache resolverCache[N] inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, rc resolverCache[N], ic aggCache[N]) *resolver[N] { +func newResolver[N int64 | float64](p pipelines, c aggCache[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { - in[i] = newInserter[N](p[i], ic) + in[i] = newInserter[N](p[i], c) } - return &resolver[N]{cache: rc, inserters: in} + return &resolver[N]{inserters: in} } // Aggregators returns the Aggregators instrument inst needs to update when it // makes a measurement. func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { - id := instrumentID{ - scope: inst.Scope, - name: inst.Name, - description: inst.Description, - } - - return r.cache.Lookup(id, func() ([]internal.Aggregator[N], error) { - var aggs []internal.Aggregator[N] - errs := &multierror{} - for _, i := range r.inserters { - a, err := i.Instrument(inst, instUnit) - if err != nil { - errs.append(err) - } - aggs = append(aggs, a...) - } - return aggs, errs.errorOrNil() - }) -} - -// resolvedAggregators is the result of resolving aggregators for an instrument. -type resolvedAggregators[N int64 | float64] struct { - aggregators []internal.Aggregator[N] - err error -} - -type resolverCache[N int64 | float64] struct { - cache *cache[instrumentID, any] -} - -func newResolverCache[N int64 | float64](c *cache[instrumentID, any]) resolverCache[N] { - if c == nil { - c = &cache[instrumentID, any]{} - } - return resolverCache[N]{cache: c} -} - -// Lookup returns the Aggregators and error for a cached instrumentID if they -// exist in the cache. Otherwise, f is called and its returned values are set -// in the cache and returned. -// -// If an instrumentID has been stored in the cache for a different N, an error -// is returned describing the conflict. -// -// Lookup is safe to call concurrently. -func (c resolverCache[N]) Lookup(key instrumentID, f func() ([]internal.Aggregator[N], error)) (aggs []internal.Aggregator[N], err error) { - vAny := c.cache.Lookup(key, func() any { - a, err := f() - return &resolvedAggregators[N]{ - aggregators: a, - err: err, + var aggs []internal.Aggregator[N] + errs := &multierror{} + for _, i := range r.inserters { + a, err := i.Instrument(inst, instUnit) + if err != nil { + errs.append(err) } - }) - - switch v := vAny.(type) { - case *resolvedAggregators[N]: - aggs, err = v.aggregators, v.err - default: - err = errInstConflictNumber + aggs = append(aggs, a...) } - return aggs, err + return aggs, errs.errorOrNil() } type multierror struct { diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index dc7bd55026c..cec26a247b3 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -334,7 +334,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver(p, newResolverCache[int64](nil), newAggCache[int64](nil)) + r := newResolver(p, newAggCache[int64](nil)) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -344,7 +344,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - r := newResolver(p, newResolverCache[float64](nil), newAggCache[float64](nil)) + r := newResolver(p, newAggCache[float64](nil)) aggs, err := r.Aggregators(inst, unit.Dimensionless) assert.NoError(t, err) @@ -375,14 +375,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { p := newPipelines(resource.Empty(), views) inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} - ri := newResolver(p, newResolverCache[int64](nil), newAggCache[int64](nil)) + ri := newResolver(p, newAggCache[int64](nil)) intAggs, err := ri.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, intAggs, 0) p = newPipelines(resource.Empty(), views) - rf := newResolver(p, newResolverCache[float64](nil), newAggCache[float64](nil)) + rf := newResolver(p, newAggCache[float64](nil)) floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 0) @@ -405,7 +405,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { p := newPipelines(resource.Empty(), views) - ri := newResolver(p, newResolverCache[int64](nil), newAggCache[int64](nil)) + ri := newResolver(p, newAggCache[int64](nil)) intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) assert.NoError(t, err) assert.Len(t, intAggs, 1) @@ -416,7 +416,7 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { assert.Len(t, intAggs, 2) // Creating a float foo instrument should error because there is an int foo instrument. - rf := newResolver(p, newResolverCache[float64](nil), newAggCache[float64](nil)) + rf := newResolver(p, newAggCache[float64](nil)) floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) assert.Error(t, err) assert.Len(t, floatAggs, 1) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index d626c1fcd15..ca83c9c3a9e 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" ) @@ -212,30 +211,3 @@ func TestPipelineConcurrency(t *testing.T) { } wg.Wait() } - -func TestInstrumentCacheNumberConflict(t *testing.T) { - c := cache[instrumentID, any]{} - - key := instrumentID{ - scope: instrumentation.Scope{Name: "scope name"}, - name: "name", - description: "description", - } - aggs := []internal.Aggregator[int64]{internal.NewCumulativeSum[int64](true)} - - instCachI := newResolverCache[int64](&c) - gotI, err := instCachI.Lookup(key, func() ([]internal.Aggregator[int64], error) { - return aggs, nil - }) - require.NoError(t, err) - require.Equal(t, aggs, gotI) - - instCachF := newResolverCache[float64](&c) - gotF, err := instCachF.Lookup(key, func() ([]internal.Aggregator[float64], error) { - return []internal.Aggregator[float64]{ - internal.NewCumulativeSum[float64](true), - }, nil - }) - assert.ErrorIs(t, err, errInstConflictNumber) - assert.Nil(t, gotF, "cache conflict should not return a value") -} From 0d925dcfc651811318155a16bcc59c615ff119cf Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 29 Sep 2022 14:16:12 -0700 Subject: [PATCH 22/22] Cache meters and instruments --- sdk/metric/instrument_provider.go | 267 +++++++++------------------ sdk/metric/meter.go | 109 ++++------- sdk/metric/meter_test.go | 22 --- sdk/metric/pipeline.go | 19 +- sdk/metric/pipeline_registry_test.go | 66 ++++--- sdk/metric/provider.go | 22 +-- 6 files changed, 178 insertions(+), 327 deletions(-) diff --git a/sdk/metric/instrument_provider.go b/sdk/metric/instrument_provider.go index 8640b58e52e..ff8fc49f893 100644 --- a/sdk/metric/instrument_provider.go +++ b/sdk/metric/instrument_provider.go @@ -22,251 +22,164 @@ import ( "go.opentelemetry.io/otel/metric/instrument/asyncint64" "go.opentelemetry.io/otel/metric/instrument/syncfloat64" "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/view" ) +// instProviderKey uniquely describe a instrument from a providers perspective. +type instProviderKey struct { + // Name is the name of the instrument. + Name string + // Description is the description of the instrument. + Description string + // Unit is the unit of the instrument. + Unit unit.Unit + // Kind is the instrument Kind provided. + Kind view.InstrumentKind +} + +func (k instProviderKey) viewInstrument(s instrumentation.Scope) view.Instrument { + return view.Instrument{ + Scope: s, + Name: k.Name, + Description: k.Description, + Kind: k.Kind, + } +} + +type instProvider[N int64 | float64] struct { + // cache ensures duplicate requests for the same instrument from this + // provider are returned quickly, without re-resolving Aggregators. + cache cache[instProviderKey, instVal[N]] + resolve *resolver[N] +} + +func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c *cache[string, any]) *instProvider[N] { + ac := newAggCache[N](c) + return &instProvider[N]{resolve: newResolver[N](s, p, ac)} +} + +// lookup returns the instrumentImpl stored in the instProvider's cache for the +// provided arguments if it exists. Otherwise, it resolves a new instrument and +// sets it in the cache before returning it. +func (p *instProvider[N]) lookup(kind view.InstrumentKind, name string, opts []instrument.Option) (*instrumentImpl[N], error) { + cfg := instrument.NewConfig(opts...) + key := instProviderKey{Name: name, Description: cfg.Description(), Unit: cfg.Unit(), Kind: kind} + v := p.cache.Lookup(key, func() instVal[N] { + aggs, err := p.resolve.Aggregators(key) + if len(aggs) == 0 && err != nil { + err = fmt.Errorf("instrument does not match any view: %w", err) + } + return instVal[N]{ + impl: &instrumentImpl[N]{aggregators: aggs}, + err: err, + } + }) + return v.impl, v.err +} + +type instVal[N int64 | float64] struct { + impl *instrumentImpl[N] + err error +} + type asyncInt64Provider struct { - scope instrumentation.Scope - resolve *resolver[int64] + *instProvider[int64] +} + +func newAsyncInt64Provider(s instrumentation.Scope, p pipelines, c *cache[string, any]) asyncInt64Provider { + return asyncInt64Provider{newInstProvider[int64](s, p, c)} } var _ asyncint64.InstrumentProvider = asyncInt64Provider{} // Counter creates an instrument for recording increasing values. func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.AsyncCounter, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return p.lookup(view.AsyncCounter, name, opts) } // UpDownCounter creates an instrument for recording changes of a value. func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.AsyncUpDownCounter, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return p.lookup(view.AsyncUpDownCounter, name, opts) } // Gauge creates an instrument for recording the current value. func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.AsyncGauge, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return p.lookup(view.AsyncGauge, name, opts) } type asyncFloat64Provider struct { - scope instrumentation.Scope - resolve *resolver[float64] + *instProvider[float64] +} + +func newAsyncFloat64Provider(s instrumentation.Scope, p pipelines, c *cache[string, any]) asyncFloat64Provider { + return asyncFloat64Provider{newInstProvider[float64](s, p, c)} } var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{} // Counter creates an instrument for recording increasing values. func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.AsyncCounter, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return p.lookup(view.AsyncCounter, name, opts) } // UpDownCounter creates an instrument for recording changes of a value. func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.AsyncUpDownCounter, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return p.lookup(view.AsyncUpDownCounter, name, opts) } // Gauge creates an instrument for recording the current value. func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.AsyncGauge, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return p.lookup(view.AsyncGauge, name, opts) } type syncInt64Provider struct { - scope instrumentation.Scope - resolve *resolver[int64] + *instProvider[int64] +} + +func newSyncInt64Provider(s instrumentation.Scope, p pipelines, c *cache[string, any]) syncInt64Provider { + return syncInt64Provider{newInstProvider[int64](s, p, c)} } var _ syncint64.InstrumentProvider = syncInt64Provider{} // Counter creates an instrument for recording increasing values. func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (syncint64.Counter, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.SyncCounter, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return p.lookup(view.SyncCounter, name, opts) } // UpDownCounter creates an instrument for recording changes of a value. func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncint64.UpDownCounter, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.SyncUpDownCounter, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return p.lookup(view.SyncUpDownCounter, name, opts) } // Histogram creates an instrument for recording the current value. func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (syncint64.Histogram, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.SyncHistogram, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[int64]{ - aggregators: aggs, - }, err + return p.lookup(view.SyncHistogram, name, opts) } type syncFloat64Provider struct { - scope instrumentation.Scope - resolve *resolver[float64] + *instProvider[float64] +} + +func newSyncFloat64Provider(s instrumentation.Scope, p pipelines, c *cache[string, any]) syncFloat64Provider { + return syncFloat64Provider{newInstProvider[float64](s, p, c)} } var _ syncfloat64.InstrumentProvider = syncFloat64Provider{} // Counter creates an instrument for recording increasing values. func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (syncfloat64.Counter, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.SyncCounter, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return p.lookup(view.SyncCounter, name, opts) } // UpDownCounter creates an instrument for recording changes of a value. func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncfloat64.UpDownCounter, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.SyncUpDownCounter, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return p.lookup(view.SyncUpDownCounter, name, opts) } // Histogram creates an instrument for recording the current value. func (p syncFloat64Provider) Histogram(name string, opts ...instrument.Option) (syncfloat64.Histogram, error) { - cfg := instrument.NewConfig(opts...) - - aggs, err := p.resolve.Aggregators(view.Instrument{ - Scope: p.scope, - Name: name, - Description: cfg.Description(), - Kind: view.SyncHistogram, - }, cfg.Unit()) - if len(aggs) == 0 && err != nil { - err = fmt.Errorf("instrument does not match any view: %w", err) - } - return &instrumentImpl[float64]{ - aggregators: aggs, - }, err + return p.lookup(view.SyncHistogram, name, opts) } diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index b1b072f9d8b..71a52787805 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -16,7 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" - "sync" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" @@ -27,96 +26,56 @@ import ( "go.opentelemetry.io/otel/sdk/instrumentation" ) -// meterRegistry keeps a record of initialized meters for instrumentation -// scopes. A meter is unique to an instrumentation scope and if multiple -// requests for that meter are made a meterRegistry ensure the same instance -// is used. -// -// The zero meterRegistry is empty and ready for use. -// -// A meterRegistry must not be copied after first use. -// -// All methods of a meterRegistry are safe to call concurrently. -type meterRegistry struct { - sync.Mutex - - meters map[instrumentation.Scope]*meter - - pipes pipelines -} - -// Get returns a registered meter matching the instrumentation scope if it -// exists in the meterRegistry. Otherwise, a new meter configured for the -// instrumentation scope is registered and then returned. -// -// Get is safe to call concurrently. -func (r *meterRegistry) Get(s instrumentation.Scope) *meter { - r.Lock() - defer r.Unlock() - - if r.meters == nil { - m := &meter{ - Scope: s, - pipes: r.pipes, - } - r.meters = map[instrumentation.Scope]*meter{s: m} - return m - } - - m, ok := r.meters[s] - if ok { - return m - } - - m = &meter{ - Scope: s, - pipes: r.pipes, - } - r.meters[s] = m - return m -} - // meter handles the creation and coordination of all metric instruments. A // meter represents a single instrumentation scope; all metric telemetry // produced by an instrumentation scope will use metric instruments from a // single meter. type meter struct { - instrumentation.Scope + // cache ensures no duplicate Aggregators are created from the same + // instrument within the scope of all instruments this meter owns. + // + // If a user requests the exact same instrument twice, a providers cache + // should handle the request with its stored value. However, if a new + // instrument is create that has a view transform it into an instrument + // that was already created, that previous instrument's Aggregator needs to + // be returned if it is compatible. + cache cache[string, any] + + pipes pipelines + + asyncInt64Provider asyncInt64Provider + asyncFloat64Provider asyncFloat64Provider + syncInt64Provider syncInt64Provider + syncFloat64Provider syncFloat64Provider +} - pipes pipelines - aggCache cache[string, any] +func newMeter(s instrumentation.Scope, p pipelines) *meter { + m := &meter{pipes: p} + m.asyncInt64Provider = newAsyncInt64Provider(s, p, &m.cache) + m.asyncFloat64Provider = newAsyncFloat64Provider(s, p, &m.cache) + m.syncInt64Provider = newSyncInt64Provider(s, p, &m.cache) + m.syncFloat64Provider = newSyncFloat64Provider(s, p, &m.cache) + return m } // Compile-time check meter implements metric.Meter. var _ metric.Meter = (*meter)(nil) // AsyncInt64 returns the asynchronous integer instrument provider. -func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { - c := newAggCache[int64](&m.aggCache) - return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} -} +func (m *meter) AsyncInt64() asyncint64.InstrumentProvider { return m.asyncInt64Provider } // AsyncFloat64 returns the asynchronous floating-point instrument provider. -func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { - c := newAggCache[float64](&m.aggCache) - return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} -} - -// RegisterCallback registers the function f to be called when any of the -// insts Collect method is called. -func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { - m.pipes.registerCallback(f) - return nil -} +func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { return m.asyncFloat64Provider } // SyncInt64 returns the synchronous integer instrument provider. -func (m *meter) SyncInt64() syncint64.InstrumentProvider { - c := newAggCache[int64](&m.aggCache) - return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} -} +func (m *meter) SyncInt64() syncint64.InstrumentProvider { return m.syncInt64Provider } // SyncFloat64 returns the synchronous floating-point instrument provider. -func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { - c := newAggCache[float64](&m.aggCache) - return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)} +func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { return m.syncFloat64Provider } + +// RegisterCallback registers the function f to be called when any of the insts +// Collect method is called. +func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { + m.pipes.registerCallback(f) + return nil } diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 7d6923fdd58..6e6af8f5102 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -31,28 +31,6 @@ import ( "go.opentelemetry.io/otel/sdk/resource" ) -func TestMeterRegistry(t *testing.T) { - is0 := instrumentation.Scope{Name: "zero"} - is1 := instrumentation.Scope{Name: "one"} - - r := meterRegistry{} - var m0 *meter - t.Run("ZeroValueGetDoesNotPanic", func(t *testing.T) { - assert.NotPanics(t, func() { m0 = r.Get(is0) }) - assert.Equal(t, is0, m0.Scope, "uninitialized meter returned") - }) - - m01 := r.Get(is0) - t.Run("GetSameMeter", func(t *testing.T) { - assert.Samef(t, m0, m01, "returned different meters: %v", is0) - }) - - m1 := r.Get(is1) - t.Run("GetDifferentMeter", func(t *testing.T) { - assert.NotSamef(t, m0, m1, "returned same meters: %v", is1) - }) -} - // A meter should be able to make instruments concurrently. func TestMeterInstrumentConcurrency(t *testing.T) { wg := &sync.WaitGroup{} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index e4215bb0427..24b703c5cb2 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -178,28 +178,29 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err // inserter facilitates inserting of new instruments into a pipeline. type inserter[N int64 | float64] struct { + scope instrumentation.Scope cache aggCache[N] pipeline *pipeline } -func newInserter[N int64 | float64](p *pipeline, c aggCache[N]) *inserter[N] { - return &inserter[N]{cache: c, pipeline: p} +func newInserter[N int64 | float64](s instrumentation.Scope, p *pipeline, c aggCache[N]) *inserter[N] { + return &inserter[N]{scope: s, cache: c, pipeline: p} } // Instrument inserts instrument inst with instUnit returning the Aggregators // that need to be updated with measurments for that instrument. -func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { +func (i *inserter[N]) Instrument(key instProviderKey) ([]internal.Aggregator[N], error) { var aggs []internal.Aggregator[N] errs := &multierror{wrapped: errCreatingAggregators} // The cache will return the same Aggregator instance. Use this fact to // compare pointer addresses to deduplicate Aggregators. seen := make(map[internal.Aggregator[N]]struct{}) for _, v := range i.pipeline.views { - inst, match := v.TransformInstrument(inst) + inst, match := v.TransformInstrument(key.viewInstrument(i.scope)) if !match { continue } - agg, err := i.cachedAggregator(inst, instUnit) + agg, err := i.cachedAggregator(inst, key.Unit) if err != nil { errs.append(err) } @@ -353,21 +354,21 @@ type resolver[N int64 | float64] struct { inserters []*inserter[N] } -func newResolver[N int64 | float64](p pipelines, c aggCache[N]) *resolver[N] { +func newResolver[N int64 | float64](s instrumentation.Scope, p pipelines, c aggCache[N]) *resolver[N] { in := make([]*inserter[N], len(p)) for i := range in { - in[i] = newInserter[N](p[i], c) + in[i] = newInserter[N](s, p[i], c) } return &resolver[N]{inserters: in} } // Aggregators returns the Aggregators instrument inst needs to update when it // makes a measurement. -func (r *resolver[N]) Aggregators(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { +func (r *resolver[N]) Aggregators(key instProviderKey) ([]internal.Aggregator[N], error) { var aggs []internal.Aggregator[N] errs := &multierror{} for _, i := range r.inserters { - a, err := i.Instrument(inst, instUnit) + a, err := i.Instrument(key) if err != nil { errs.append(err) } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index cec26a247b3..e40f3c57f74 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/view" @@ -57,7 +57,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { view.WithSetAggregation(invalidAggregation{}), ) - instruments := []view.Instrument{ + instruments := []instProviderKey{ {Name: "foo", Kind: view.InstrumentKind(0)}, //Unknown kind {Name: "foo", Kind: view.SyncCounter}, {Name: "foo", Kind: view.SyncUpDownCounter}, @@ -71,7 +71,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { name string reader Reader views []view.View - inst view.Instrument + inst instProviderKey wantKind internal.Aggregator[N] //Aggregators should match len and types wantLen int wantErr error @@ -209,10 +209,12 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { wantErr: errCreatingAggregators, }, } + s := instrumentation.Scope{Name: "testCreateAggregators"} for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - i := newInserter[N](newPipeline(nil, tt.reader, tt.views), newAggCache[N](nil)) - got, err := i.Instrument(tt.inst, unit.Dimensionless) + p := newPipeline(nil, tt.reader, tt.views) + i := newInserter[N](s, p, newAggCache[N](nil)) + got, err := i.Instrument(tt.inst) assert.ErrorIs(t, err, tt.wantErr) require.Len(t, got, tt.wantLen) for _, agg := range got { @@ -223,12 +225,14 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { } func testInvalidInstrumentShouldPanic[N int64 | float64]() { - i := newInserter[N](newPipeline(nil, NewManualReader(), []view.View{{}}), newAggCache[N](nil)) - inst := view.Instrument{ + s := instrumentation.Scope{Name: "testInvalidInstrumentShouldPanic"} + p := newPipeline(nil, NewManualReader(), []view.View{{}}) + i := newInserter[N](s, p, newAggCache[N](nil)) + inst := instProviderKey{ Name: "foo", Kind: view.InstrumentKind(255), } - _, _ = i.Instrument(inst, unit.Dimensionless) + _, _ = i.Instrument(inst) } func TestInvalidInstrumentShouldPanic(t *testing.T) { @@ -332,20 +336,22 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { } func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) { - inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} + inst := instProviderKey{Name: "foo", Kind: view.SyncCounter} - r := newResolver(p, newAggCache[int64](nil)) - aggs, err := r.Aggregators(inst, unit.Dimensionless) + s := instrumentation.Scope{Name: "testPipelineRegistryResolveIntAggregators"} + r := newResolver(s, p, newAggCache[int64](nil)) + aggs, err := r.Aggregators(inst) assert.NoError(t, err) require.Len(t, aggs, wantCount) } func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) { - inst := view.Instrument{Name: "foo", Kind: view.SyncCounter} + inst := instProviderKey{Name: "foo", Kind: view.SyncCounter} - r := newResolver(p, newAggCache[float64](nil)) - aggs, err := r.Aggregators(inst, unit.Dimensionless) + s := instrumentation.Scope{Name: "testPipelineRegistryResolveFloatAggregators"} + r := newResolver(s, p, newAggCache[float64](nil)) + aggs, err := r.Aggregators(inst) assert.NoError(t, err) require.Len(t, aggs, wantCount) @@ -373,17 +379,18 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { }, } p := newPipelines(resource.Empty(), views) - inst := view.Instrument{Name: "foo", Kind: view.AsyncGauge} + inst := instProviderKey{Name: "foo", Kind: view.AsyncGauge} - ri := newResolver(p, newAggCache[int64](nil)) - intAggs, err := ri.Aggregators(inst, unit.Dimensionless) + s := instrumentation.Scope{Name: "TestPipelineRegistryCreateAggregatorsIncompatibleInstrument"} + ri := newResolver(s, p, newAggCache[int64](nil)) + intAggs, err := ri.Aggregators(inst) assert.Error(t, err) assert.Len(t, intAggs, 0) p = newPipelines(resource.Empty(), views) - rf := newResolver(p, newAggCache[float64](nil)) - floatAggs, err := rf.Aggregators(inst, unit.Dimensionless) + rf := newResolver(s, p, newAggCache[float64](nil)) + floatAggs, err := rf.Aggregators(inst) assert.Error(t, err) assert.Len(t, floatAggs, 0) } @@ -400,33 +407,34 @@ func TestPipelineRegistryCreateAggregatorsDuplicateErrors(t *testing.T) { }, } - fooInst := view.Instrument{Name: "foo", Kind: view.SyncCounter} - barInst := view.Instrument{Name: "bar", Kind: view.SyncCounter} + fooInst := instProviderKey{Name: "foo", Kind: view.SyncCounter} + barInst := instProviderKey{Name: "bar", Kind: view.SyncCounter} p := newPipelines(resource.Empty(), views) - ri := newResolver(p, newAggCache[int64](nil)) - intAggs, err := ri.Aggregators(fooInst, unit.Dimensionless) + s := instrumentation.Scope{Name: "TestPipelineRegistryCreateAggregatorsDuplicateErrors"} + ri := newResolver(s, p, newAggCache[int64](nil)) + intAggs, err := ri.Aggregators(fooInst) assert.NoError(t, err) assert.Len(t, intAggs, 1) // The Rename view should error, because it creates a foo instrument. - intAggs, err = ri.Aggregators(barInst, unit.Dimensionless) + intAggs, err = ri.Aggregators(barInst) assert.NoError(t, err) assert.Len(t, intAggs, 2) // Creating a float foo instrument should error because there is an int foo instrument. - rf := newResolver(p, newAggCache[float64](nil)) - floatAggs, err := rf.Aggregators(fooInst, unit.Dimensionless) + rf := newResolver(s, p, newAggCache[float64](nil)) + floatAggs, err := rf.Aggregators(fooInst) assert.Error(t, err) assert.Len(t, floatAggs, 1) - fooInst = view.Instrument{Name: "foo-float", Kind: view.SyncCounter} + fooInst = instProviderKey{Name: "foo-float", Kind: view.SyncCounter} - _, err = rf.Aggregators(fooInst, unit.Dimensionless) + _, err = rf.Aggregators(fooInst) assert.NoError(t, err) - floatAggs, err = rf.Aggregators(barInst, unit.Dimensionless) + floatAggs, err = rf.Aggregators(barInst) assert.Error(t, err) assert.Len(t, floatAggs, 2) } diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index 0b13e67d92b..ce2e5524398 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -19,7 +19,6 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/resource" ) // MeterProvider handles the creation and coordination of Meters. All Meters @@ -27,9 +26,8 @@ import ( // the same Views applied to them, and have their produced metric telemetry // passed to the configured Readers. type MeterProvider struct { - res *resource.Resource - - meters meterRegistry + pipes pipelines + meters cache[instrumentation.Scope, *meter] forceFlush, shutdown func(context.Context) error } @@ -45,18 +43,9 @@ var _ metric.MeterProvider = (*MeterProvider)(nil) // Readers, will perform no operations. func NewMeterProvider(options ...Option) *MeterProvider { conf := newConfig(options) - flush, sdown := conf.readerSignals() - - registry := newPipelines(conf.res, conf.readers) - return &MeterProvider{ - res: conf.res, - - meters: meterRegistry{ - pipes: registry, - }, - + pipes: newPipelines(conf.res, conf.readers), forceFlush: flush, shutdown: sdown, } @@ -77,10 +66,13 @@ func NewMeterProvider(options ...Option) *MeterProvider { // This method is safe to call concurrently. func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metric.Meter { c := metric.NewMeterConfig(options...) - return mp.meters.Get(instrumentation.Scope{ + s := instrumentation.Scope{ Name: name, Version: c.InstrumentationVersion(), SchemaURL: c.SchemaURL(), + } + return mp.meters.Lookup(s, func() *meter { + return newMeter(s, mp.pipes) }) }