Skip to content

Commit

Permalink
Introduce metric constructor errors, MeterMust wrapper (#529)
Browse files Browse the repository at this point in the history
* Update api for Must constructors, with SDK helpers

* Update for Must constructors, leaving TODOs about global errors

* Add tests

* Move Must methods into metric.Must

* Apply the feedback

* Remove interfaces

* Remove more interfaces

* Again...

* Remove a sentence about a dead inteface
  • Loading branch information
jmacd authored Mar 11, 2020
1 parent 288821c commit 4047c08
Show file tree
Hide file tree
Showing 20 changed files with 384 additions and 147 deletions.
6 changes: 4 additions & 2 deletions api/global/internal/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

var Must = metric.Must

// benchFixture is copied from sdk/metric/benchmark_test.go.
// TODO refactor to share this code.
type benchFixture struct {
Expand Down Expand Up @@ -72,7 +74,7 @@ func BenchmarkGlobalInt64CounterAddNoSDK(b *testing.B) {
ctx := context.Background()
sdk := global.Meter("test")
labs := sdk.Labels(key.String("A", "B"))
cnt := sdk.NewInt64Counter("int64.counter")
cnt := Must(sdk).NewInt64Counter("int64.counter")

b.ResetTimer()

Expand All @@ -91,7 +93,7 @@ func BenchmarkGlobalInt64CounterAddWithSDK(b *testing.B) {
global.SetMeterProvider(fix)

labs := sdk.Labels(key.String("A", "B"))
cnt := sdk.NewInt64Counter("int64.counter")
cnt := Must(sdk).NewInt64Counter("int64.counter")

b.ResetTimer()

Expand Down
85 changes: 66 additions & 19 deletions api/global/internal/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"context"
"errors"
"sync"
"sync/atomic"
"unsafe"
Expand Down Expand Up @@ -80,6 +81,10 @@ type obsImpl struct {
callback interface{}
}

type hasImpl interface {
Impl() metric.InstrumentImpl
}

type int64ObsImpl struct {
observer *obsImpl
}
Expand Down Expand Up @@ -122,6 +127,8 @@ var _ metric.Float64Observer = float64ObsImpl{}
var _ observerUnregister = (metric.Int64Observer)(nil)
var _ observerUnregister = (metric.Float64Observer)(nil)

var errInvalidMetricKind = errors.New("Invalid Metric kind")

// Provider interface and delegation

func (p *meterProvider) setDelegate(provider metric.Provider) {
Expand Down Expand Up @@ -174,7 +181,7 @@ func (m *meter) setDelegate(provider metric.Provider) {
m.orderedObservers = nil
}

func (m *meter) newInst(name string, mkind metricKind, nkind core.NumberKind, opts interface{}) metric.InstrumentImpl {
func (m *meter) newInst(name string, mkind metricKind, nkind core.NumberKind, opts interface{}) (metric.InstrumentImpl, error) {
m.lock.Lock()
defer m.lock.Unlock()

Expand All @@ -189,31 +196,50 @@ func (m *meter) newInst(name string, mkind metricKind, nkind core.NumberKind, op
opts: opts,
}
m.instruments = append(m.instruments, inst)
return inst
return inst, nil
}

func newInstDelegate(m metric.Meter, name string, mkind metricKind, nkind core.NumberKind, opts interface{}) metric.InstrumentImpl {
func delegateCheck(has hasImpl, err error) (metric.InstrumentImpl, error) {
if has != nil {
return has.Impl(), err
}
if err == nil {
err = metric.ErrSDKReturnedNilImpl
}
return nil, err
}

func newInstDelegate(m metric.Meter, name string, mkind metricKind, nkind core.NumberKind, opts interface{}) (metric.InstrumentImpl, error) {
switch mkind {
case counterKind:
if nkind == core.Int64NumberKind {
return m.NewInt64Counter(name, opts.([]metric.CounterOptionApplier)...).Impl()
return delegateCheck(m.NewInt64Counter(name, opts.([]metric.CounterOptionApplier)...))
}
return m.NewFloat64Counter(name, opts.([]metric.CounterOptionApplier)...).Impl()
return delegateCheck(m.NewFloat64Counter(name, opts.([]metric.CounterOptionApplier)...))
case measureKind:
if nkind == core.Int64NumberKind {
return m.NewInt64Measure(name, opts.([]metric.MeasureOptionApplier)...).Impl()
return delegateCheck(m.NewInt64Measure(name, opts.([]metric.MeasureOptionApplier)...))
}
return m.NewFloat64Measure(name, opts.([]metric.MeasureOptionApplier)...).Impl()
return delegateCheck(m.NewFloat64Measure(name, opts.([]metric.MeasureOptionApplier)...))
}
return nil
return nil, errInvalidMetricKind
}

// Instrument delegation

func (inst *instImpl) setDelegate(d metric.Meter) {
implPtr := new(metric.InstrumentImpl)

*implPtr = newInstDelegate(d, inst.name, inst.mkind, inst.nkind, inst.opts)
var err error
*implPtr, err = newInstDelegate(d, inst.name, inst.mkind, inst.nkind, inst.opts)

if err != nil {
// TODO: There is no standard way to deliver this error to the user.
// See https://github.com/open-telemetry/opentelemetry-go/issues/514
// Note that the default SDK will not generate any errors yet, this is
// only for added safety.
panic(err)
}

atomic.StorePointer(&inst.delegate, unsafe.Pointer(implPtr))
}
Expand Down Expand Up @@ -281,7 +307,18 @@ func (obs *obsImpl) getUnregister() observerUnregister {
func (obs *obsImpl) setInt64Delegate(d metric.Meter) {
obsPtr := new(metric.Int64Observer)
cb := obs.callback.(metric.Int64ObserverCallback)
*obsPtr = d.RegisterInt64Observer(obs.name, cb, obs.opts...)

var err error
*obsPtr, err = d.RegisterInt64Observer(obs.name, cb, obs.opts...)

if err != nil {
// TODO: There is no standard way to deliver this error to the user.
// See https://github.com/open-telemetry/opentelemetry-go/issues/514
// Note that the default SDK will not generate any errors yet, this is
// only for added safety.
panic(err)
}

atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr))
}

Expand All @@ -294,7 +331,17 @@ func (obs int64ObsImpl) Unregister() {
func (obs *obsImpl) setFloat64Delegate(d metric.Meter) {
obsPtr := new(metric.Float64Observer)
cb := obs.callback.(metric.Float64ObserverCallback)
*obsPtr = d.RegisterFloat64Observer(obs.name, cb, obs.opts...)

var err error
*obsPtr, err = d.RegisterFloat64Observer(obs.name, cb, obs.opts...)
if err != nil {
// TODO: There is no standard way to deliver this error to the user.
// See https://github.com/open-telemetry/opentelemetry-go/issues/514
// Note that the default SDK will not generate any errors yet, this is
// only for added safety.
panic(err)
}

atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr))
}

Expand Down Expand Up @@ -372,23 +419,23 @@ func (labels *labelSet) Delegate() metric.LabelSet {

// Constructors

func (m *meter) NewInt64Counter(name string, opts ...metric.CounterOptionApplier) metric.Int64Counter {
func (m *meter) NewInt64Counter(name string, opts ...metric.CounterOptionApplier) (metric.Int64Counter, error) {
return metric.WrapInt64CounterInstrument(m.newInst(name, counterKind, core.Int64NumberKind, opts))
}

func (m *meter) NewFloat64Counter(name string, opts ...metric.CounterOptionApplier) metric.Float64Counter {
func (m *meter) NewFloat64Counter(name string, opts ...metric.CounterOptionApplier) (metric.Float64Counter, error) {
return metric.WrapFloat64CounterInstrument(m.newInst(name, counterKind, core.Float64NumberKind, opts))
}

func (m *meter) NewInt64Measure(name string, opts ...metric.MeasureOptionApplier) metric.Int64Measure {
func (m *meter) NewInt64Measure(name string, opts ...metric.MeasureOptionApplier) (metric.Int64Measure, error) {
return metric.WrapInt64MeasureInstrument(m.newInst(name, measureKind, core.Int64NumberKind, opts))
}

func (m *meter) NewFloat64Measure(name string, opts ...metric.MeasureOptionApplier) metric.Float64Measure {
func (m *meter) NewFloat64Measure(name string, opts ...metric.MeasureOptionApplier) (metric.Float64Measure, error) {
return metric.WrapFloat64MeasureInstrument(m.newInst(name, measureKind, core.Float64NumberKind, opts))
}

func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Int64Observer {
func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, oos ...metric.ObserverOptionApplier) (metric.Int64Observer, error) {
m.lock.Lock()
defer m.lock.Unlock()

Expand All @@ -406,10 +453,10 @@ func (m *meter) RegisterInt64Observer(name string, callback metric.Int64Observer
m.addObserver(obs)
return int64ObsImpl{
observer: obs,
}
}, nil
}

func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Float64Observer {
func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, oos ...metric.ObserverOptionApplier) (metric.Float64Observer, error) {
m.lock.Lock()
defer m.lock.Unlock()

Expand All @@ -427,7 +474,7 @@ func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64Obse
m.addObserver(obs)
return float64ObsImpl{
observer: obs,
}
}, nil
}

func (m *meter) addObserver(obs *obsImpl) {
Expand Down
63 changes: 50 additions & 13 deletions api/global/internal/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal_test

import (
"context"
"errors"
"io"
"io/ioutil"
"testing"
Expand Down Expand Up @@ -30,25 +31,25 @@ func TestDirect(t *testing.T) {
lvals3 := key.String("E", "F")
labels3 := meter2.Labels(lvals3)

counter := meter1.NewInt64Counter("test.counter")
counter := Must(meter1).NewInt64Counter("test.counter")
counter.Add(ctx, 1, labels1)
counter.Add(ctx, 1, labels1)

measure := meter1.NewFloat64Measure("test.measure")
measure := Must(meter1).NewFloat64Measure("test.measure")
measure.Record(ctx, 1, labels1)
measure.Record(ctx, 2, labels1)

_ = meter1.RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) {
_ = Must(meter1).RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) {
result.Observe(1., labels1)
result.Observe(2., labels2)
})

_ = meter1.RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) {
_ = Must(meter1).RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) {
result.Observe(1, labels1)
result.Observe(2, labels2)
})

second := meter2.NewFloat64Measure("test.second")
second := Must(meter2).NewFloat64Measure("test.second")
second.Record(ctx, 1, labels3)
second.Record(ctx, 2, labels3)

Expand Down Expand Up @@ -145,12 +146,12 @@ func TestBound(t *testing.T) {
lvals1 := key.String("A", "B")
labels1 := glob.Labels(lvals1)

counter := glob.NewFloat64Counter("test.counter")
counter := Must(glob).NewFloat64Counter("test.counter")
boundC := counter.Bind(labels1)
boundC.Add(ctx, 1)
boundC.Add(ctx, 1)

measure := glob.NewInt64Measure("test.measure")
measure := Must(glob).NewInt64Measure("test.measure")
boundM := measure.Bind(labels1)
boundM.Record(ctx, 1)
boundM.Record(ctx, 2)
Expand Down Expand Up @@ -195,14 +196,14 @@ func TestUnbind(t *testing.T) {
lvals1 := key.New("A").String("B")
labels1 := glob.Labels(lvals1)

counter := glob.NewFloat64Counter("test.counter")
counter := Must(glob).NewFloat64Counter("test.counter")
boundC := counter.Bind(labels1)

measure := glob.NewInt64Measure("test.measure")
measure := Must(glob).NewInt64Measure("test.measure")
boundM := measure.Bind(labels1)

observerInt := glob.RegisterInt64Observer("test.observer.int", nil)
observerFloat := glob.RegisterFloat64Observer("test.observer.float", nil)
observerInt := Must(glob).RegisterInt64Observer("test.observer.int", nil)
observerFloat := Must(glob).RegisterFloat64Observer("test.observer.float", nil)

boundC.Unbind()
boundM.Unbind()
Expand All @@ -218,7 +219,7 @@ func TestDefaultSDK(t *testing.T) {
lvals1 := key.String("A", "B")
labels1 := meter1.Labels(lvals1)

counter := meter1.NewInt64Counter("test.builtin")
counter := Must(meter1).NewInt64Counter("test.builtin")
counter.Add(ctx, 1, labels1)
counter.Add(ctx, 1, labels1)

Expand Down Expand Up @@ -251,8 +252,9 @@ func TestUnbindThenRecordOne(t *testing.T) {

ctx := context.Background()
sdk := metrictest.NewProvider()

meter := global.Meter("test")
counter := meter.NewInt64Counter("test.counter")
counter := Must(meter).NewInt64Counter("test.counter")
boundC := counter.Bind(meter.Labels())
global.SetMeterProvider(sdk)
boundC.Unbind()
Expand All @@ -263,3 +265,38 @@ func TestUnbindThenRecordOne(t *testing.T) {
mock := global.Meter("test").(*metrictest.Meter)
require.Equal(t, 0, len(mock.MeasurementBatches))
}

type meterProviderWithConstructorError struct {
metric.Provider
}

type meterWithConstructorError struct {
metric.Meter
}

func (m *meterProviderWithConstructorError) Meter(name string) metric.Meter {
return &meterWithConstructorError{m.Provider.Meter(name)}
}

func (m *meterWithConstructorError) NewInt64Counter(name string, cos ...metric.CounterOptionApplier) (metric.Int64Counter, error) {
return metric.Int64Counter{}, errors.New("constructor error")
}

func TestErrorInDeferredConstructor(t *testing.T) {
internal.ResetForTest()

ctx := context.Background()
meter := global.MeterProvider().Meter("builtin")

c1 := Must(meter).NewInt64Counter("test")
c2 := Must(meter).NewInt64Counter("test")

sdk := &meterProviderWithConstructorError{metrictest.NewProvider()}

require.Panics(t, func() {
global.SetMeterProvider(sdk)
})

c1.Add(ctx, 1, meter.Labels())
c2.Add(ctx, 2, meter.Labels())
}
24 changes: 15 additions & 9 deletions api/metric/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,30 +108,36 @@ type Meter interface {
// be read by the application.
Labels(...core.KeyValue) LabelSet

// RecordBatch atomically records a batch of measurements.
RecordBatch(context.Context, LabelSet, ...Measurement)

// All instrument constructors may return an error for
// conditions such as:
// `name` is an empty string
// `name` was previously registered as a different kind of instrument
// for a given named `Meter`.

// NewInt64Counter creates a new integral counter with a given
// name and customized with passed options.
NewInt64Counter(name string, cos ...CounterOptionApplier) Int64Counter
NewInt64Counter(name string, cos ...CounterOptionApplier) (Int64Counter, error)
// NewFloat64Counter creates a new floating point counter with
// a given name and customized with passed options.
NewFloat64Counter(name string, cos ...CounterOptionApplier) Float64Counter
NewFloat64Counter(name string, cos ...CounterOptionApplier) (Float64Counter, error)
// NewInt64Measure creates a new integral measure with a given
// name and customized with passed options.
NewInt64Measure(name string, mos ...MeasureOptionApplier) Int64Measure
NewInt64Measure(name string, mos ...MeasureOptionApplier) (Int64Measure, error)
// NewFloat64Measure creates a new floating point measure with
// a given name and customized with passed options.
NewFloat64Measure(name string, mos ...MeasureOptionApplier) Float64Measure
NewFloat64Measure(name string, mos ...MeasureOptionApplier) (Float64Measure, error)

// RegisterInt64Observer creates a new integral observer with a
// given name, running a given callback, and customized with passed
// options. Callback can be nil.
RegisterInt64Observer(name string, callback Int64ObserverCallback, oos ...ObserverOptionApplier) Int64Observer
RegisterInt64Observer(name string, callback Int64ObserverCallback, oos ...ObserverOptionApplier) (Int64Observer, error)
// RegisterFloat64Observer creates a new floating point observer
// with a given name, running a given callback, and customized with
// passed options. Callback can be nil.
RegisterFloat64Observer(name string, callback Float64ObserverCallback, oos ...ObserverOptionApplier) Float64Observer

// RecordBatch atomically records a batch of measurements.
RecordBatch(context.Context, LabelSet, ...Measurement)
RegisterFloat64Observer(name string, callback Float64ObserverCallback, oos ...ObserverOptionApplier) (Float64Observer, error)
}

// Int64ObserverResult is an interface for reporting integral
Expand Down
Loading

0 comments on commit 4047c08

Please sign in to comment.