diff --git a/api/global/internal/internal_test.go b/api/global/internal/internal_test.go new file mode 100644 index 00000000000..8dfb78fd899 --- /dev/null +++ b/api/global/internal/internal_test.go @@ -0,0 +1,26 @@ +package internal_test + +import ( + "os" + "testing" + + "go.opentelemetry.io/otel/api/global/internal" + ottest "go.opentelemetry.io/otel/internal/testing" +) + +// Ensure struct alignment prior to running tests. +func TestMain(m *testing.M) { + fieldsMap := internal.AtomicFieldOffsets() + fields := make([]ottest.FieldOffset, 0, len(fieldsMap)) + for name, offset := range fieldsMap { + fields = append(fields, ottest.FieldOffset{ + Name: name, + Offset: offset, + }) + } + if !ottest.Aligned8Byte(fields, os.Stderr) { + os.Exit(1) + } + + os.Exit(m.Run()) +} diff --git a/api/global/internal/meter.go b/api/global/internal/meter.go index e15d75feda6..2c4f33140de 100644 --- a/api/global/internal/meter.go +++ b/api/global/internal/meter.go @@ -40,44 +40,76 @@ const ( ) type meterProvider struct { - lock sync.Mutex - meters []*meter delegate metric.Provider + + lock sync.Mutex + meters []*meter } type meter struct { + delegate unsafe.Pointer // (*metric.Meter) + provider *meterProvider name string - lock sync.Mutex - instruments []*instImpl - - delegate unsafe.Pointer // (*metric.Meter) + lock sync.Mutex + instruments []*instImpl + liveObservers map[*obsImpl]struct{} + // orderedObservers slice contains observers in their order of + // registration. It may also contain unregistered + // observers. The liveObservers map should be consulted to + // check if the observer is registered or not. + orderedObservers []*obsImpl } type instImpl struct { + delegate unsafe.Pointer // (*metric.InstrumentImpl) + name string mkind metricKind nkind core.NumberKind opts interface{} +} - delegate unsafe.Pointer // (*metric.InstrumentImpl) +type obsImpl struct { + delegate unsafe.Pointer // (*metric.Int64Observer or *metric.Float64Observer) + + name string + nkind core.NumberKind + opts []metric.ObserverOptionApplier + meter *meter + callback interface{} +} + +type int64ObsImpl struct { + observer *obsImpl +} + +type float64ObsImpl struct { + observer *obsImpl +} + +// this is a common subset of the metric observers interfaces +type observerUnregister interface { + Unregister() } type labelSet struct { + delegate unsafe.Pointer // (* metric.LabelSet) + meter *meter value []core.KeyValue initialize sync.Once - delegate unsafe.Pointer // (* metric.LabelSet) } type instHandle struct { + delegate unsafe.Pointer // (*metric.HandleImpl) + inst *instImpl labels metric.LabelSet initialize sync.Once - delegate unsafe.Pointer // (*metric.HandleImpl) } var _ metric.Provider = &meterProvider{} @@ -86,6 +118,10 @@ var _ metric.LabelSet = &labelSet{} var _ metric.LabelSetDelegate = &labelSet{} var _ metric.InstrumentImpl = &instImpl{} var _ metric.BoundInstrumentImpl = &instHandle{} +var _ metric.Int64Observer = int64ObsImpl{} +var _ metric.Float64Observer = float64ObsImpl{} +var _ observerUnregister = (metric.Int64Observer)(nil) +var _ observerUnregister = (metric.Float64Observer)(nil) // Provider interface and delegation @@ -130,6 +166,13 @@ func (m *meter) setDelegate(provider metric.Provider) { inst.setDelegate(*d) } m.instruments = nil + for _, obs := range m.orderedObservers { + if _, ok := m.liveObservers[obs]; ok { + obs.setDelegate(*d) + } + } + m.liveObservers = nil + m.orderedObservers = nil } func (m *meter) newInst(name string, mkind metricKind, nkind core.NumberKind, opts interface{}) metric.InstrumentImpl { @@ -203,6 +246,68 @@ func (bound *instHandle) Unbind() { (*implPtr).Unbind() } +// Any Observer delegation + +func (obs *obsImpl) setDelegate(d metric.Meter) { + if obs.nkind == core.Int64NumberKind { + obs.setInt64Delegate(d) + } else { + obs.setFloat64Delegate(d) + } +} + +func (obs *obsImpl) unregister() { + unreg := obs.getUnregister() + if unreg != nil { + unreg.Unregister() + return + } + obs.meter.lock.Lock() + defer obs.meter.lock.Unlock() + delete(obs.meter.liveObservers, obs) + if len(obs.meter.liveObservers) == 0 { + obs.meter.liveObservers = nil + obs.meter.orderedObservers = nil + } +} + +func (obs *obsImpl) getUnregister() observerUnregister { + ptr := atomic.LoadPointer(&obs.delegate) + if ptr == nil { + return nil + } + if obs.nkind == core.Int64NumberKind { + return *(*metric.Int64Observer)(ptr) + } + return *(*metric.Float64Observer)(ptr) +} + +// Int64Observer delegation + +func (obs *obsImpl) setInt64Delegate(d metric.Meter) { + obsPtr := new(metric.Int64Observer) + cb := obs.callback.(metric.Int64ObserverCallback) + *obsPtr = d.RegisterInt64Observer(obs.name, cb, obs.opts...) + atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr)) +} + +func (obs int64ObsImpl) Unregister() { + obs.observer.unregister() +} + +// Float64Observer delegation + +func (obs *obsImpl) setFloat64Delegate(d metric.Meter) { + obsPtr := new(metric.Float64Observer) + cb := obs.callback.(metric.Float64ObserverCallback) + *obsPtr = d.RegisterFloat64Observer(obs.name, cb, obs.opts...) + atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr)) +} + +func (obs float64ObsImpl) Unregister() { + obs.observer.unregister() +} + // Metric updates func (m *meter) RecordBatch(ctx context.Context, labels metric.LabelSet, measurements ...metric.Measurement) { @@ -296,3 +401,64 @@ func (m *meter) NewInt64Measure(name string, opts ...metric.MeasureOptionApplier func (m *meter) NewFloat64Measure(name string, opts ...metric.MeasureOptionApplier) metric.Float64Measure { return metric.WrapFloat64MeasureInstrument(m.newInst(name, measureKind, core.Float64NumberKind, opts)) } + +func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Int64Observer { + m.lock.Lock() + defer m.lock.Unlock() + + if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil { + return (*meterPtr).RegisterInt64Observer(name, callback, oos...) + } + + obs := &obsImpl{ + name: name, + nkind: core.Int64NumberKind, + opts: oos, + meter: m, + callback: callback, + } + m.addObserver(obs) + return int64ObsImpl{ + observer: obs, + } +} + +func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Float64Observer { + m.lock.Lock() + defer m.lock.Unlock() + + if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil { + return (*meterPtr).RegisterFloat64Observer(name, callback, oos...) + } + + obs := &obsImpl{ + name: name, + nkind: core.Float64NumberKind, + opts: oos, + meter: m, + callback: callback, + } + m.addObserver(obs) + return float64ObsImpl{ + observer: obs, + } +} + +func (m *meter) addObserver(obs *obsImpl) { + if m.liveObservers == nil { + m.liveObservers = make(map[*obsImpl]struct{}) + } + m.liveObservers[obs] = struct{}{} + m.orderedObservers = append(m.orderedObservers, obs) +} + +func AtomicFieldOffsets() map[string]uintptr { + return map[string]uintptr{ + "meterProvider.delegate": unsafe.Offsetof(meterProvider{}.delegate), + "meter.delegate": unsafe.Offsetof(meter{}.delegate), + "instImpl.delegate": unsafe.Offsetof(instImpl{}.delegate), + "obsImpl.delegate": unsafe.Offsetof(obsImpl{}.delegate), + "labelSet.delegate": unsafe.Offsetof(labelSet{}.delegate), + "instHandle.delegate": unsafe.Offsetof(instHandle{}.delegate), + } +} diff --git a/api/global/internal/meter_test.go b/api/global/internal/meter_test.go index c9d0ac7482e..7360a2bbc58 100644 --- a/api/global/internal/meter_test.go +++ b/api/global/internal/meter_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/global/internal" "go.opentelemetry.io/otel/api/key" + "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/metric/stdout" metrictest "go.opentelemetry.io/otel/internal/metric" ) @@ -41,6 +42,16 @@ func TestDirect(t *testing.T) { measure.Record(ctx, 1, labels1) measure.Record(ctx, 2, labels1) + _ = meter1.RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) { + result.Observe(1., labels1) + result.Observe(2., labels2) + }) + + _ = meter1.RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) { + result.Observe(1, labels1) + result.Observe(2, labels2) + }) + second := meter2.NewFloat64Measure("test.second") second.Record(ctx, 1, labels3) second.Record(ctx, 2, labels3) @@ -54,45 +65,86 @@ func TestDirect(t *testing.T) { second.Record(ctx, 3, labels3) mock := sdk.Meter("test1").(*metrictest.Meter) - require.Equal(t, 3, len(mock.MeasurementBatches)) + mock.RunObservers() + require.Len(t, mock.MeasurementBatches, 7) require.Equal(t, map[core.Key]core.Value{ lvals1.Key: lvals1.Value, }, mock.MeasurementBatches[0].LabelSet.Labels) - require.Equal(t, 1, len(mock.MeasurementBatches[0].Measurements)) - require.Equal(t, core.NewInt64Number(1), - mock.MeasurementBatches[0].Measurements[0].Number) + require.Len(t, mock.MeasurementBatches[0].Measurements, 1) + require.Equal(t, int64(1), + mock.MeasurementBatches[0].Measurements[0].Number.AsInt64()) require.Equal(t, "test.counter", mock.MeasurementBatches[0].Measurements[0].Instrument.Name) require.Equal(t, map[core.Key]core.Value{ lvals2.Key: lvals2.Value, }, mock.MeasurementBatches[1].LabelSet.Labels) - require.Equal(t, 1, len(mock.MeasurementBatches[1].Measurements)) - require.Equal(t, core.NewInt64Number(3), - mock.MeasurementBatches[1].Measurements[0].Number) + require.Len(t, mock.MeasurementBatches[1].Measurements, 1) + require.Equal(t, int64(3), + mock.MeasurementBatches[1].Measurements[0].Number.AsInt64()) require.Equal(t, "test.gauge", mock.MeasurementBatches[1].Measurements[0].Instrument.Name) require.Equal(t, map[core.Key]core.Value{ lvals1.Key: lvals1.Value, }, mock.MeasurementBatches[2].LabelSet.Labels) - require.Equal(t, 1, len(mock.MeasurementBatches[2].Measurements)) - require.Equal(t, core.NewFloat64Number(3), - mock.MeasurementBatches[2].Measurements[0].Number) + require.Len(t, mock.MeasurementBatches[2].Measurements, 1) + require.InDelta(t, float64(3), + mock.MeasurementBatches[2].Measurements[0].Number.AsFloat64(), + 0.01) require.Equal(t, "test.measure", mock.MeasurementBatches[2].Measurements[0].Instrument.Name) + require.Equal(t, map[core.Key]core.Value{ + lvals1.Key: lvals1.Value, + }, mock.MeasurementBatches[3].LabelSet.Labels) + require.Len(t, mock.MeasurementBatches[3].Measurements, 1) + require.InDelta(t, float64(1), + mock.MeasurementBatches[3].Measurements[0].Number.AsFloat64(), + 0.01) + require.Equal(t, "test.observer.float", + mock.MeasurementBatches[3].Measurements[0].Instrument.Name) + + require.Equal(t, map[core.Key]core.Value{ + lvals2.Key: lvals2.Value, + }, mock.MeasurementBatches[4].LabelSet.Labels) + require.Len(t, mock.MeasurementBatches[4].Measurements, 1) + require.InDelta(t, float64(2), + mock.MeasurementBatches[4].Measurements[0].Number.AsFloat64(), + 0.01) + require.Equal(t, "test.observer.float", + mock.MeasurementBatches[4].Measurements[0].Instrument.Name) + + require.Equal(t, map[core.Key]core.Value{ + lvals1.Key: lvals1.Value, + }, mock.MeasurementBatches[5].LabelSet.Labels) + require.Len(t, mock.MeasurementBatches[5].Measurements, 1) + require.Equal(t, int64(1), + mock.MeasurementBatches[5].Measurements[0].Number.AsInt64()) + require.Equal(t, "test.observer.int", + mock.MeasurementBatches[5].Measurements[0].Instrument.Name) + + require.Equal(t, map[core.Key]core.Value{ + lvals2.Key: lvals2.Value, + }, mock.MeasurementBatches[6].LabelSet.Labels) + require.Len(t, mock.MeasurementBatches[6].Measurements, 1) + require.Equal(t, int64(2), + mock.MeasurementBatches[6].Measurements[0].Number.AsInt64()) + require.Equal(t, "test.observer.int", + mock.MeasurementBatches[6].Measurements[0].Instrument.Name) + // This tests the second Meter instance mock = sdk.Meter("test2").(*metrictest.Meter) - require.Equal(t, 1, len(mock.MeasurementBatches)) + require.Len(t, mock.MeasurementBatches, 1) require.Equal(t, map[core.Key]core.Value{ lvals3.Key: lvals3.Value, }, mock.MeasurementBatches[0].LabelSet.Labels) - require.Equal(t, 1, len(mock.MeasurementBatches[0].Measurements)) - require.Equal(t, core.NewFloat64Number(3), - mock.MeasurementBatches[0].Measurements[0].Number) + require.Len(t, mock.MeasurementBatches[0].Measurements, 1) + require.InDelta(t, float64(3), + mock.MeasurementBatches[0].Measurements[0].Number.AsFloat64(), + 0.01) require.Equal(t, "test.second", mock.MeasurementBatches[0].Measurements[0].Instrument.Name) } @@ -138,8 +190,9 @@ func TestBound(t *testing.T) { lvals1.Key: lvals1.Value, }, mock.MeasurementBatches[0].LabelSet.Labels) require.Equal(t, 1, len(mock.MeasurementBatches[0].Measurements)) - require.Equal(t, core.NewFloat64Number(1), - mock.MeasurementBatches[0].Measurements[0].Number) + require.InDelta(t, float64(1), + mock.MeasurementBatches[0].Measurements[0].Number.AsFloat64(), + 0.01) require.Equal(t, "test.counter", mock.MeasurementBatches[0].Measurements[0].Instrument.Name) @@ -147,8 +200,9 @@ func TestBound(t *testing.T) { lvals2.Key: lvals2.Value, }, mock.MeasurementBatches[1].LabelSet.Labels) require.Equal(t, 1, len(mock.MeasurementBatches[1].Measurements)) - require.Equal(t, core.NewFloat64Number(3), - mock.MeasurementBatches[1].Measurements[0].Number) + require.InDelta(t, float64(3), + mock.MeasurementBatches[1].Measurements[0].Number.AsFloat64(), + 0.01) require.Equal(t, "test.gauge", mock.MeasurementBatches[1].Measurements[0].Instrument.Name) @@ -156,8 +210,8 @@ func TestBound(t *testing.T) { lvals1.Key: lvals1.Value, }, mock.MeasurementBatches[2].LabelSet.Labels) require.Equal(t, 1, len(mock.MeasurementBatches[2].Measurements)) - require.Equal(t, core.NewInt64Number(3), - mock.MeasurementBatches[2].Measurements[0].Number) + require.Equal(t, int64(3), + mock.MeasurementBatches[2].Measurements[0].Number.AsInt64()) require.Equal(t, "test.measure", mock.MeasurementBatches[2].Measurements[0].Instrument.Name) @@ -185,9 +239,14 @@ func TestUnbind(t *testing.T) { measure := glob.NewInt64Measure("test.measure") boundM := measure.Bind(labels1) + observerInt := glob.RegisterInt64Observer("test.observer.int", nil) + observerFloat := glob.RegisterFloat64Observer("test.observer.float", nil) + boundC.Unbind() boundG.Unbind() boundM.Unbind() + observerInt.Unregister() + observerFloat.Unregister() } func TestDefaultSDK(t *testing.T) { diff --git a/api/metric/api.go b/api/metric/api.go index 2e546c697a4..9ccec6b8e38 100644 --- a/api/metric/api.go +++ b/api/metric/api.go @@ -81,6 +81,14 @@ type MeasureOptionApplier interface { ApplyMeasureOption(*Options) } +// ObserverOptionApplier is an interface for applying metric options +// that are valid only for observer metrics. +type ObserverOptionApplier interface { + // ApplyObserverOption is used to make some general or + // observer-specific changes in the Options. + ApplyObserverOption(*Options) +} + // Measurement is used for reporting a batch of metric // values. Instances of this type should be created by instruments // (e.g., Int64Counter.Measurement()). @@ -127,10 +135,51 @@ type Meter interface { // a given name and customized with passed options. NewFloat64Measure(name string, mos ...MeasureOptionApplier) Float64Measure + // RegisterInt64Observer creates a new integral observer with a + // given name, running a given callback, and customized with passed + // options. Callback can be nil. + RegisterInt64Observer(name string, callback Int64ObserverCallback, oos ...ObserverOptionApplier) Int64Observer + // RegisterFloat64Observer creates a new floating point observer + // with a given name, running a given callback, and customized with + // passed options. Callback can be nil. + RegisterFloat64Observer(name string, callback Float64ObserverCallback, oos ...ObserverOptionApplier) Float64Observer + // RecordBatch atomically records a batch of measurements. RecordBatch(context.Context, LabelSet, ...Measurement) } +// Int64ObserverResult is an interface for reporting integral +// observations. +type Int64ObserverResult interface { + Observe(value int64, labels LabelSet) +} + +// Float64ObserverResult is an interface for reporting floating point +// observations. +type Float64ObserverResult interface { + Observe(value float64, labels LabelSet) +} + +// Int64ObserverCallback is a type of callback that integral +// observers run. +type Int64ObserverCallback func(result Int64ObserverResult) + +// Float64ObserverCallback is a type of callback that floating point +// observers run. +type Float64ObserverCallback func(result Float64ObserverResult) + +// Int64Observer is a metric that captures a set of int64 values at a +// point in time. +type Int64Observer interface { + Unregister() +} + +// Float64Observer is a metric that captures a set of float64 values +// at a point in time. +type Float64Observer interface { + Unregister() +} + // Option supports specifying the various metric options. type Option func(*Options) @@ -140,16 +189,19 @@ type OptionApplier interface { CounterOptionApplier GaugeOptionApplier MeasureOptionApplier + ObserverOptionApplier // ApplyOption is used to make some general changes in the // Options. ApplyOption(*Options) } -// CounterGaugeOptionApplier is an interface for applying metric -// options that are valid for counter or gauge metrics. -type CounterGaugeOptionApplier interface { +// CounterGaugeObserverOptionApplier is an interface for applying +// metric options that are valid for counter, gauge or observer +// metrics. +type CounterGaugeObserverOptionApplier interface { CounterOptionApplier GaugeOptionApplier + ObserverOptionApplier } type optionWrapper struct { @@ -168,16 +220,22 @@ type measureOptionWrapper struct { F Option } -type counterGaugeOptionWrapper struct { +type observerOptionWrapper struct { + F Option +} + +type counterGaugeObserverOptionWrapper struct { FC Option FG Option + FO Option } var ( - _ OptionApplier = optionWrapper{} - _ CounterOptionApplier = counterOptionWrapper{} - _ GaugeOptionApplier = gaugeOptionWrapper{} - _ MeasureOptionApplier = measureOptionWrapper{} + _ OptionApplier = optionWrapper{} + _ CounterOptionApplier = counterOptionWrapper{} + _ GaugeOptionApplier = gaugeOptionWrapper{} + _ MeasureOptionApplier = measureOptionWrapper{} + _ ObserverOptionApplier = observerOptionWrapper{} ) func (o optionWrapper) ApplyCounterOption(opts *Options) { @@ -192,6 +250,10 @@ func (o optionWrapper) ApplyMeasureOption(opts *Options) { o.ApplyOption(opts) } +func (o optionWrapper) ApplyObserverOption(opts *Options) { + o.ApplyOption(opts) +} + func (o optionWrapper) ApplyOption(opts *Options) { o.F(opts) } @@ -208,14 +270,22 @@ func (o measureOptionWrapper) ApplyMeasureOption(opts *Options) { o.F(opts) } -func (o counterGaugeOptionWrapper) ApplyCounterOption(opts *Options) { +func (o counterGaugeObserverOptionWrapper) ApplyCounterOption(opts *Options) { o.FC(opts) } -func (o counterGaugeOptionWrapper) ApplyGaugeOption(opts *Options) { +func (o counterGaugeObserverOptionWrapper) ApplyGaugeOption(opts *Options) { o.FG(opts) } +func (o counterGaugeObserverOptionWrapper) ApplyObserverOption(opts *Options) { + o.FO(opts) +} + +func (o observerOptionWrapper) ApplyObserverOption(opts *Options) { + o.F(opts) +} + // WithDescription applies provided description. func WithDescription(desc string) OptionApplier { return optionWrapper{ @@ -244,16 +314,19 @@ func WithKeys(keys ...core.Key) OptionApplier { } } -// WithMonotonic sets whether a counter or a gauge is not permitted to -// go down. -func WithMonotonic(monotonic bool) CounterGaugeOptionApplier { - return counterGaugeOptionWrapper{ +// WithMonotonic sets whether a counter, a gauge or an observer is not +// permitted to go down. +func WithMonotonic(monotonic bool) CounterGaugeObserverOptionApplier { + return counterGaugeObserverOptionWrapper{ FC: func(opts *Options) { opts.Alternate = !monotonic }, FG: func(opts *Options) { opts.Alternate = monotonic }, + FO: func(opts *Options) { + opts.Alternate = monotonic + }, } } diff --git a/api/metric/api_test.go b/api/metric/api_test.go index a45c96c6118..8698727b763 100644 --- a/api/metric/api_test.go +++ b/api/metric/api_test.go @@ -26,6 +26,7 @@ import ( mock "go.opentelemetry.io/otel/internal/metric" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" ) func TestCounterOptions(t *testing.T) { @@ -361,6 +362,117 @@ func TestMeasureOptions(t *testing.T) { } } +func TestObserverOptions(t *testing.T) { + type testcase struct { + name string + opts []metric.ObserverOptionApplier + keys []core.Key + desc string + unit unit.Unit + alt bool + } + testcases := []testcase{ + { + name: "no opts", + opts: nil, + keys: nil, + desc: "", + unit: "", + alt: false, + }, + { + name: "keys keys keys", + opts: []metric.ObserverOptionApplier{ + metric.WithKeys(key.New("foo"), key.New("foo2")), + metric.WithKeys(key.New("bar"), key.New("bar2")), + metric.WithKeys(key.New("baz"), key.New("baz2")), + }, + keys: []core.Key{ + key.New("foo"), key.New("foo2"), + key.New("bar"), key.New("bar2"), + key.New("baz"), key.New("baz2"), + }, + desc: "", + unit: "", + alt: false, + }, + { + name: "description", + opts: []metric.ObserverOptionApplier{ + metric.WithDescription("stuff"), + }, + keys: nil, + desc: "stuff", + unit: "", + alt: false, + }, + { + name: "description override", + opts: []metric.ObserverOptionApplier{ + metric.WithDescription("stuff"), + metric.WithDescription("things"), + }, + keys: nil, + desc: "things", + unit: "", + alt: false, + }, + { + name: "unit", + opts: []metric.ObserverOptionApplier{ + metric.WithUnit("s"), + }, + keys: nil, + desc: "", + unit: "s", + alt: false, + }, + { + name: "unit override", + opts: []metric.ObserverOptionApplier{ + metric.WithUnit("s"), + metric.WithUnit("h"), + }, + keys: nil, + desc: "", + unit: "h", + alt: false, + }, + { + name: "monotonic", + opts: []metric.ObserverOptionApplier{ + metric.WithMonotonic(true), + }, + keys: nil, + desc: "", + unit: "", + alt: true, + }, + { + name: "monotonic, but not really", + opts: []metric.ObserverOptionApplier{ + metric.WithMonotonic(true), + metric.WithMonotonic(false), + }, + keys: nil, + desc: "", + unit: "", + alt: false, + }, + } + for idx, tt := range testcases { + t.Logf("Testing observer case %s (%d)", tt.name, idx) + opts := &metric.Options{} + metric.ApplyObserverOptions(opts, tt.opts...) + checkOptions(t, opts, &metric.Options{ + Description: tt.desc, + Unit: tt.unit, + Keys: tt.keys, + Alternate: tt.alt, + }) + } +} + func checkOptions(t *testing.T, got *metric.Options, expected *metric.Options) { if diff := cmp.Diff(got, expected); diff != "" { t.Errorf("Compare options: -got +want %s", diff) @@ -448,6 +560,29 @@ func TestMeasure(t *testing.T) { } } +func TestObserver(t *testing.T) { + { + meter := mock.NewMeter() + labels := meter.Labels() + o := meter.RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) { + result.Observe(42, labels) + }) + t.Log("Testing float observer") + meter.RunObservers() + checkObserverBatch(t, labels, meter, core.Float64NumberKind, o) + } + { + meter := mock.NewMeter() + labels := meter.Labels() + o := meter.RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) { + result.Observe(42, labels) + }) + t.Log("Testing int observer") + meter.RunObservers() + checkObserverBatch(t, labels, meter, core.Int64NumberKind, o) + } +} + func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, meter *mock.Meter, kind core.NumberKind, instrument metric.InstrumentImpl) { t.Helper() if len(meter.MeasurementBatches) != 3 { @@ -496,7 +631,31 @@ func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, met } } +func checkObserverBatch(t *testing.T, labels metric.LabelSet, meter *mock.Meter, kind core.NumberKind, observer interface{}) { + t.Helper() + assert.Len(t, meter.MeasurementBatches, 1) + if len(meter.MeasurementBatches) < 1 { + return + } + o := observer.(*mock.Observer) + if !assert.NotNil(t, o) { + return + } + ourLabelSet := labels.(*mock.LabelSet) + got := meter.MeasurementBatches[0] + assert.Equal(t, ourLabelSet, got.LabelSet) + assert.Len(t, got.Measurements, 1) + if len(got.Measurements) < 1 { + return + } + measurement := got.Measurements[0] + assert.Equal(t, o.Instrument, measurement.Instrument) + ft := fortyTwo(t, kind) + assert.Equal(t, 0, measurement.Number.CompareNumber(kind, ft)) +} + func fortyTwo(t *testing.T, kind core.NumberKind) core.Number { + t.Helper() switch kind { case core.Int64NumberKind: return core.NewInt64Number(42) diff --git a/api/metric/doc.go b/api/metric/doc.go index 26e5884be07..27fc33aa5d7 100644 --- a/api/metric/doc.go +++ b/api/metric/doc.go @@ -13,21 +13,22 @@ // limitations under the License. // metric package provides an API for reporting diagnostic -// measurements using three basic kinds of instruments (or four, if -// calling one special case a separate one). +// measurements using four basic kinds of instruments. // -// The three basic kinds are: +// The four basic kinds are: // // - counters // - gauges // - measures +// - observers // // All instruments report either float64 or int64 values. // -// The primary object that handles metrics is Meter. The -// implementation of the Meter is provided by SDK. Normally, the Meter -// is used directly only for the LabelSet generation, batch recording -// and the bound instrument destruction. +// The primary object that handles metrics is Meter. Meter can be +// obtained from Provider. The implementations of the Meter and +// Provider are provided by SDK. Normally, the Meter is used directly +// only for the instrument creation, LabelSet generation and batch +// recording. // // LabelSet is a set of keys and values that are in a suitable, // optimized form to be used by Meter. @@ -60,11 +61,24 @@ // the New*Measure function - this allows reporting negative values // too. To report a new value, use the Record function. // -// All the basic kinds of instruments also support creating bound -// instruments for a potentially more efficient reporting. The bound -// instruments have the same function names as the instruments (so a -// Counter bound instrument has Add, a Gauge bound instrument has Set, -// and a Measure bound instrument has Record). Bound Instruments can -// be created with the Bind function of the respective -// instrument. When done with the bound instrument, call Unbind on it. +// Observers are instruments that are reporting a current state of a +// set of values. An example could be voltage or +// temperature. Observers can be created with either +// RegisterFloat64Observer or RegisterInt64Observer. Observers by +// default have no limitations about reported values - they can be +// less or greater than the last reported value. This can be changed +// with the WithMonotonic option passed to the Register*Observer +// function - this permits the reported values only to go +// up. Reporting of the new values happens asynchronously, with the +// use of a callback passed to the Register*Observer function. The +// callback can report multiple values. To unregister the observer, +// call Unregister on it. +// +// Counters, gauges and measures support creating bound instruments +// for a potentially more efficient reporting. The bound instruments +// have the same function names as the instruments (so a Counter bound +// instrument has Add, a Gauge bound instrument has Set, and a Measure +// bound instrument has Record). Bound Instruments can be created +// with the Bind function of the respective instrument. When done with +// the bound instrument, call Unbind on it. package metric // import "go.opentelemetry.io/otel/api/metric" diff --git a/api/metric/noop.go b/api/metric/noop.go index 66cdd6e1f5d..d569dbae6aa 100644 --- a/api/metric/noop.go +++ b/api/metric/noop.go @@ -11,12 +11,16 @@ type NoopMeter struct{} type noopBoundInstrument struct{} type noopLabelSet struct{} type noopInstrument struct{} +type noopInt64Observer struct{} +type noopFloat64Observer struct{} var _ Provider = NoopProvider{} var _ Meter = NoopMeter{} var _ InstrumentImpl = noopInstrument{} var _ BoundInstrumentImpl = noopBoundInstrument{} var _ LabelSet = noopLabelSet{} +var _ Int64Observer = noopInt64Observer{} +var _ Float64Observer = noopFloat64Observer{} func (NoopProvider) Meter(name string) Meter { return NoopMeter{} @@ -39,6 +43,12 @@ func (noopInstrument) Meter() Meter { return NoopMeter{} } +func (noopInt64Observer) Unregister() { +} + +func (noopFloat64Observer) Unregister() { +} + func (NoopMeter) Labels(...core.KeyValue) LabelSet { return noopLabelSet{} } @@ -69,3 +79,11 @@ func (NoopMeter) NewFloat64Measure(name string, mos ...MeasureOptionApplier) Flo func (NoopMeter) RecordBatch(context.Context, LabelSet, ...Measurement) { } + +func (NoopMeter) RegisterInt64Observer(name string, callback Int64ObserverCallback, oos ...ObserverOptionApplier) Int64Observer { + return noopInt64Observer{} +} + +func (NoopMeter) RegisterFloat64Observer(name string, callback Float64ObserverCallback, oos ...ObserverOptionApplier) Float64Observer { + return noopFloat64Observer{} +} diff --git a/api/metric/sdkhelpers.go b/api/metric/sdkhelpers.go index 882b627224b..5efdab24d51 100644 --- a/api/metric/sdkhelpers.go +++ b/api/metric/sdkhelpers.go @@ -122,3 +122,11 @@ func ApplyMeasureOptions(opts *Options, mos ...MeasureOptionApplier) { o.ApplyMeasureOption(opts) } } + +// ApplyObserverOptions is a helper that applies all the observer +// options to passed opts. +func ApplyObserverOptions(opts *Options, mos ...ObserverOptionApplier) { + for _, o := range mos { + o.ApplyObserverOption(opts) + } +} diff --git a/internal/metric/mock.go b/internal/metric/mock.go index 8bd37574f5b..8a70a47c3f7 100644 --- a/internal/metric/mock.go +++ b/internal/metric/mock.go @@ -54,6 +54,10 @@ type ( Meter struct { MeasurementBatches []Batch + // Observers contains also unregistered + // observers. Check the Dead field of the Observer to + // figure out its status. + Observers []*Observer } Kind int8 @@ -63,21 +67,63 @@ type ( Number core.Number Instrument *Instrument } + + observerResult struct { + instrument *Instrument + } + + int64ObserverResult struct { + result observerResult + } + + float64ObserverResult struct { + result observerResult + } + + observerCallback func(observerResult) + + Observer struct { + Instrument *Instrument + Meter *Meter + Dead bool + callback observerCallback + } ) var ( - _ apimetric.InstrumentImpl = &Instrument{} - _ apimetric.BoundInstrumentImpl = &Handle{} - _ apimetric.LabelSet = &LabelSet{} - _ apimetric.Meter = &Meter{} + _ apimetric.InstrumentImpl = &Instrument{} + _ apimetric.BoundInstrumentImpl = &Handle{} + _ apimetric.LabelSet = &LabelSet{} + _ apimetric.Meter = &Meter{} + _ apimetric.Int64Observer = &Observer{} + _ apimetric.Float64Observer = &Observer{} + _ apimetric.Int64ObserverResult = int64ObserverResult{} + _ apimetric.Float64ObserverResult = float64ObserverResult{} ) const ( KindCounter Kind = iota KindGauge KindMeasure + KindObserver ) +func (o *Observer) Unregister() { + o.Dead = true +} + +func (r int64ObserverResult) Observe(value int64, labels apimetric.LabelSet) { + r.result.observe(core.NewInt64Number(value), labels) +} + +func (r float64ObserverResult) Observe(value float64, labels apimetric.LabelSet) { + r.result.observe(core.NewFloat64Number(value), labels) +} + +func (r observerResult) observe(number core.Number, labels apimetric.LabelSet) { + r.instrument.RecordOne(context.Background(), number, labels) +} + func (i *Instrument) Bind(labels apimetric.LabelSet) apimetric.BoundInstrumentImpl { if ld, ok := labels.(apimetric.LabelSetDelegate); ok { labels = ld.Delegate() @@ -209,6 +255,58 @@ func (m *Meter) newMeasureInstrument(name string, numberKind core.NumberKind, mo } } +func (m *Meter) RegisterInt64Observer(name string, callback apimetric.Int64ObserverCallback, oos ...apimetric.ObserverOptionApplier) apimetric.Int64Observer { + wrappedCallback := wrapInt64ObserverCallback(callback) + return m.newObserver(name, wrappedCallback, core.Int64NumberKind, oos...) +} + +func wrapInt64ObserverCallback(callback apimetric.Int64ObserverCallback) observerCallback { + if callback == nil { + return func(result observerResult) {} + } + return func(result observerResult) { + typeSafeResult := int64ObserverResult{ + result: result, + } + callback(typeSafeResult) + } +} + +func (m *Meter) RegisterFloat64Observer(name string, callback apimetric.Float64ObserverCallback, oos ...apimetric.ObserverOptionApplier) apimetric.Float64Observer { + wrappedCallback := wrapFloat64ObserverCallback(callback) + return m.newObserver(name, wrappedCallback, core.Float64NumberKind, oos...) +} + +func wrapFloat64ObserverCallback(callback apimetric.Float64ObserverCallback) observerCallback { + if callback == nil { + return func(result observerResult) {} + } + return func(result observerResult) { + typeSafeResult := float64ObserverResult{ + result: result, + } + callback(typeSafeResult) + } +} + +func (m *Meter) newObserver(name string, callback observerCallback, numberKind core.NumberKind, oos ...apimetric.ObserverOptionApplier) *Observer { + opts := apimetric.Options{} + apimetric.ApplyObserverOptions(&opts, oos...) + obs := &Observer{ + Instrument: &Instrument{ + Name: name, + Kind: KindObserver, + NumberKind: numberKind, + Opts: opts, + }, + Meter: m, + Dead: false, + callback: callback, + } + m.Observers = append(m.Observers, obs) + return obs +} + func (m *Meter) RecordBatch(ctx context.Context, labels apimetric.LabelSet, measurements ...apimetric.Measurement) { ourLabelSet := labels.(*LabelSet) mm := make([]Measurement, len(measurements)) @@ -229,3 +327,14 @@ func (m *Meter) recordMockBatch(ctx context.Context, labelSet *LabelSet, measure Measurements: measurements, }) } + +func (m *Meter) RunObservers() { + for _, observer := range m.Observers { + if observer.Dead { + continue + } + observer.callback(observerResult{ + instrument: observer.Instrument, + }) + } +} diff --git a/sdk/export/metric/kind_string.go b/sdk/export/metric/kind_string.go index e2d2b88f466..d5f388694a8 100644 --- a/sdk/export/metric/kind_string.go +++ b/sdk/export/metric/kind_string.go @@ -11,11 +11,12 @@ func _() { _ = x[CounterKind-0] _ = x[GaugeKind-1] _ = x[MeasureKind-2] + _ = x[ObserverKind-3] } -const _Kind_name = "CounterKindGaugeKindMeasureKind" +const _Kind_name = "CounterKindGaugeKindMeasureKindObserverKind" -var _Kind_index = [...]uint8{0, 11, 20, 31} +var _Kind_index = [...]uint8{0, 11, 20, 31, 43} func (i Kind) String() string { if i < 0 || i >= Kind(len(_Kind_index)-1) { diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index dc8e5f7182c..f4cb2042100 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -295,6 +295,9 @@ const ( // Measure kind indicates a measure instrument. MeasureKind + + // Observer kind indicates an observer instrument + ObserverKind ) // Descriptor describes a metric instrument to the exporter. diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 03fa6eb5e7b..44d2eb3558b 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -52,6 +52,8 @@ func (*benchFixture) AggregatorFor(descriptor *export.Descriptor) export.Aggrega return counter.New() case export.GaugeKind: return gauge.New() + case export.ObserverKind: + fallthrough case export.MeasureKind: if strings.HasSuffix(descriptor.Name(), "minmaxsumcount") { return minmaxsumcount.New(descriptor) @@ -357,6 +359,89 @@ func benchmarkFloat64MeasureHandleAdd(b *testing.B, name string) { } } +// Observers + +func BenchmarkObserverRegistration(b *testing.B) { + fix := newFixture(b) + names := make([]string, 0, b.N) + for i := 0; i < b.N; i++ { + names = append(names, fmt.Sprintf("test.observer.%d", i)) + } + cb := func(result metric.Int64ObserverResult) {} + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + fix.sdk.RegisterInt64Observer(names[i], cb) + } +} + +func BenchmarkObserverRegistrationUnregistration(b *testing.B) { + fix := newFixture(b) + names := make([]string, 0, b.N) + for i := 0; i < b.N; i++ { + names = append(names, fmt.Sprintf("test.observer.%d", i)) + } + cb := func(result metric.Int64ObserverResult) {} + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + fix.sdk.RegisterInt64Observer(names[i], cb).Unregister() + } +} + +func BenchmarkObserverRegistrationUnregistrationBatched(b *testing.B) { + fix := newFixture(b) + names := make([]string, 0, b.N) + for i := 0; i < b.N; i++ { + names = append(names, fmt.Sprintf("test.observer.%d", i)) + } + observers := make([]metric.Int64Observer, 0, b.N) + cb := func(result metric.Int64ObserverResult) {} + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + observers = append(observers, fix.sdk.RegisterInt64Observer(names[i], cb)) + } + for i := 0; i < b.N; i++ { + observers[i].Unregister() + } +} + +func BenchmarkObserverObservationInt64(b *testing.B) { + ctx := context.Background() + fix := newFixture(b) + labs := fix.sdk.Labels(makeLabels(1)...) + _ = fix.sdk.RegisterInt64Observer("test.observer", func(result metric.Int64ObserverResult) { + b.StartTimer() + defer b.StopTimer() + for i := 0; i < b.N; i++ { + result.Observe((int64)(i), labs) + } + }) + b.StopTimer() + b.ResetTimer() + fix.sdk.Collect(ctx) +} + +func BenchmarkObserverObservationFloat64(b *testing.B) { + ctx := context.Background() + fix := newFixture(b) + labs := fix.sdk.Labels(makeLabels(1)...) + _ = fix.sdk.RegisterFloat64Observer("test.observer", func(result metric.Float64ObserverResult) { + b.StartTimer() + defer b.StopTimer() + for i := 0; i < b.N; i++ { + result.Observe((float64)(i), labs) + } + }) + b.StopTimer() + b.ResetTimer() + fix.sdk.Collect(ctx) +} + // MaxSumCount func BenchmarkInt64MaxSumCountAdd(b *testing.B) { diff --git a/sdk/metric/doc.go b/sdk/metric/doc.go index 00071a3233e..17784e35a5a 100644 --- a/sdk/metric/doc.go +++ b/sdk/metric/doc.go @@ -18,10 +18,10 @@ supports configurable metrics export behavior through a collection of export interfaces that support various export strategies, described below. The metric.Meter API consists of methods for constructing each of the -basic kinds of metric instrument. There are six types of instrument -available to the end user, comprised of three basic kinds of metric -instrument (Counter, Gauge, Measure) crossed with two kinds of number -(int64, float64). +basic kinds of metric instrument. There are eight types of instrument +available to the end user, comprised of four basic kinds of metric +instrument (Counter, Gauge, Measure, Observer) crossed with two kinds +of number (int64, float64). The API assists the SDK by consolidating the variety of metric instruments into a narrower interface, allowing the SDK to avoid repetition of @@ -31,17 +31,25 @@ numerical value. To this end, the API uses a core.Number type to represent either an int64 or a float64, depending on the instrument's definition. A single -implementation interface is used for instruments, metric.InstrumentImpl, -and a single implementation interface is used for handles, -metric.HandleImpl. - -There are three entry points for events in the Metrics API: via instrument -handles, via direct instrument calls, and via BatchRecord. The SDK is -designed with handles as the primary entry point, the other two entry -points are implemented in terms of short-lived handles. For example, the -implementation of a direct call allocates a handle, operates on the -handle, and releases the handle. Similarly, the implementation of -RecordBatch uses a short-lived handle for each measurement in the batch. +implementation interface is used for counter, gauge and measure +instruments, metric.InstrumentImpl, and a single implementation interface +is used for their handles, metric.HandleImpl. For observers, the API +defines interfaces, for which the SDK provides an implementation. + +There are four entry points for events in the Metrics API - three for +synchronous instruments (counters, gauges and measures) and one for +asynchronous instruments (observers). The entry points for synchronous +instruments are: via instrument handles, via direct instrument calls, and +via BatchRecord. The SDK is designed with handles as the primary entry +point, the other two entry points are implemented in terms of short-lived +handles. For example, the implementation of a direct call allocates a +handle, operates on the handle, and releases the handle. Similarly, the +implementation of RecordBatch uses a short-lived handle for each +measurement in the batch. The entry point for asynchronous instruments is +via observer callbacks. Observer callbacks behave like a set of instrument +handles - one for each observation for a distinct label set. The observer +handles are alive as long as they are used. If the callback stops +reporting values for a certain label set, the associated handle is dropped. Internal Structure @@ -51,6 +59,10 @@ user-level code or a short-lived device, there exists an internal record managed by the SDK. Each internal record corresponds to a specific instrument and label set combination. +Each observer also has its own kind of record stored in the SDK. This +record contains a set of recorders for every specific label set used in the +callback. + A sync.Map maintains the mapping of current instruments and label sets to internal records. To create a new handle, the SDK consults the Map to locate an existing record, otherwise it constructs a new record. The SDK @@ -61,31 +73,18 @@ from the user's perspective. Metric collection is performed via a single-threaded call to Collect that sweeps through all records in the SDK, checkpointing their state. When a record is discovered that has no references and has not been updated since -the prior collection pass, it is marked for reclamation and removed from -the Map. There exists, at this moment, a race condition since another -goroutine could, in the same instant, obtain a reference to the handle. - -The SDK is designed to tolerate this sort of race condition, in the name -of reducing lock contention. It is possible for more than one record with -identical instrument and label set to exist simultaneously, though only -one can be linked from the Map at a time. To avoid lost updates, the SDK -maintains two additional linked lists of records, one managed by the -collection code path and one managed by the instrumentation code path. +the prior collection pass, it is removed from the Map. The SDK maintains a current epoch number, corresponding to the number of -completed collections. Each record contains the last epoch during which -it was collected and updated. These variables allow the collection code -path to detect stale records while allowing the instrumentation code path -to detect potential reclamations. When the instrumentation code path -detects a potential reclamation, it adds itself to the second linked list, -where records are saved from reclamation. - -Each record has an associated aggregator, which maintains the current -state resulting from all metric events since its last checkpoint. -Aggregators may be lock-free or they may use locking, but they should -expect to be called concurrently. Because of the tolerated race condition -described above, aggregators must be capable of merging with another -aggregator of the same type. +completed collections. Each recorder of an observer record contains the +last epoch during which it was updated. This variable allows the collection +code path to detect stale recorders and remove them. + +Each record of a handle and recorder of an observer has an associated +aggregator, which maintains the current state resulting from all metric +events since its last checkpoint. Aggregators may be lock-free or they may +use locking, but they should expect to be called concurrently. Aggregators +must be capable of merging with another aggregator of the same type. Export Pipeline diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index ce98abf52d5..a8d57a5fa89 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -43,6 +43,9 @@ type ( // current maps `mapkey` to *record. current sync.Map + // observers is a set of `*observer` instances + observers sync.Map + // empty is the (singleton) result of Labels() // w/ zero arguments. empty labels @@ -115,16 +118,121 @@ type ( recorder export.Aggregator } + observerResult struct { + observer *observer + } + + int64ObserverResult struct { + result observerResult + } + + float64ObserverResult struct { + result observerResult + } + + observerCallback func(result observerResult) + + observer struct { + meter *SDK + descriptor *export.Descriptor + // recorders maps encoded labelset to the pair of + // labelset and recorder + recorders map[string]labeledRecorder + callback observerCallback + } + + labeledRecorder struct { + recorder export.Aggregator + labels *labels + modifiedEpoch int64 + } + + int64Observer struct { + observer *observer + } + + float64Observer struct { + observer *observer + } + ErrorHandler func(error) ) var ( - _ api.Meter = &SDK{} - _ api.LabelSet = &labels{} - _ api.InstrumentImpl = &instrument{} - _ api.BoundInstrumentImpl = &record{} + _ api.Meter = &SDK{} + _ api.LabelSet = &labels{} + _ api.InstrumentImpl = &instrument{} + _ api.BoundInstrumentImpl = &record{} + _ api.Int64Observer = int64Observer{} + _ api.Float64Observer = float64Observer{} + _ api.Int64ObserverResult = int64ObserverResult{} + _ api.Float64ObserverResult = float64ObserverResult{} ) +func (r observerResult) observe(number core.Number, ls api.LabelSet) { + r.observer.recordOne(number, ls) +} + +func (o *observer) recordOne(number core.Number, ls api.LabelSet) { + if err := aggregator.RangeTest(number, o.descriptor); err != nil { + o.meter.errorHandler(err) + return + } + recorder := o.getRecorder(ls) + if recorder == nil { + // The instrument is disabled according to the + // AggregationSelector. + return + } + if err := recorder.Update(context.Background(), number, o.descriptor); err != nil { + o.meter.errorHandler(err) + return + } +} + +func (o *observer) getRecorder(ls api.LabelSet) export.Aggregator { + labels := o.meter.labsFor(ls) + lrec, ok := o.recorders[labels.encoded] + if ok { + lrec.modifiedEpoch = o.meter.currentEpoch + o.recorders[labels.encoded] = lrec + return lrec.recorder + } + rec := o.meter.batcher.AggregatorFor(o.descriptor) + if o.recorders == nil { + o.recorders = make(map[string]labeledRecorder) + } + // This may store nil recorder in the map, thus disabling the + // observer for the labelset for good. This is intentional, + // but will be revisited later. + o.recorders[labels.encoded] = labeledRecorder{ + recorder: rec, + labels: labels, + modifiedEpoch: o.meter.currentEpoch, + } + return rec +} + +func (o *observer) unregister() { + o.meter.observers.Delete(o) +} + +func (r int64ObserverResult) Observe(value int64, labels api.LabelSet) { + r.result.observe(core.NewInt64Number(value), labels) +} + +func (r float64ObserverResult) Observe(value float64, labels api.LabelSet) { + r.result.observe(core.NewFloat64Number(value), labels) +} + +func (o int64Observer) Unregister() { + o.observer.unregister() +} + +func (o float64Observer) Unregister() { + o.observer.unregister() +} + func (i *instrument) Meter() api.Meter { return i.meter } @@ -275,8 +383,8 @@ func (m *SDK) labsFor(ls api.LabelSet) *labels { return &m.empty } -func (m *SDK) newInstrument(name string, metricKind export.Kind, numberKind core.NumberKind, opts *api.Options) *instrument { - descriptor := export.NewDescriptor( +func newDescriptor(name string, metricKind export.Kind, numberKind core.NumberKind, opts *api.Options) *export.Descriptor { + return export.NewDescriptor( name, metricKind, opts.Keys, @@ -284,6 +392,10 @@ func (m *SDK) newInstrument(name string, metricKind export.Kind, numberKind core opts.Unit, numberKind, opts.Alternate) +} + +func (m *SDK) newInstrument(name string, metricKind export.Kind, numberKind core.NumberKind, opts *api.Options) *instrument { + descriptor := newDescriptor(name, metricKind, numberKind, opts) return &instrument{ descriptor: descriptor, meter: m, @@ -332,8 +444,66 @@ func (m *SDK) NewFloat64Measure(name string, mos ...api.MeasureOptionApplier) ap return api.WrapFloat64MeasureInstrument(m.newMeasureInstrument(name, core.Float64NumberKind, mos...)) } -// Collect traverses the list of active records and exports data for -// each active instrument. Collect() may not be called concurrently. +func (m *SDK) RegisterInt64Observer(name string, callback api.Int64ObserverCallback, oos ...api.ObserverOptionApplier) api.Int64Observer { + if callback == nil { + return api.NoopMeter{}.RegisterInt64Observer("", nil) + } + opts := api.Options{} + api.ApplyObserverOptions(&opts, oos...) + descriptor := newDescriptor(name, export.ObserverKind, core.Int64NumberKind, &opts) + cb := wrapInt64ObserverCallback(callback) + obs := m.newObserver(descriptor, cb) + return int64Observer{ + observer: obs, + } +} + +func wrapInt64ObserverCallback(callback api.Int64ObserverCallback) observerCallback { + return func(result observerResult) { + typeSafeResult := int64ObserverResult{ + result: result, + } + callback(typeSafeResult) + } +} + +func (m *SDK) RegisterFloat64Observer(name string, callback api.Float64ObserverCallback, oos ...api.ObserverOptionApplier) api.Float64Observer { + if callback == nil { + return api.NoopMeter{}.RegisterFloat64Observer("", nil) + } + opts := api.Options{} + api.ApplyObserverOptions(&opts, oos...) + descriptor := newDescriptor(name, export.ObserverKind, core.Float64NumberKind, &opts) + cb := wrapFloat64ObserverCallback(callback) + obs := m.newObserver(descriptor, cb) + return float64Observer{ + observer: obs, + } +} + +func wrapFloat64ObserverCallback(callback api.Float64ObserverCallback) observerCallback { + return func(result observerResult) { + typeSafeResult := float64ObserverResult{ + result: result, + } + callback(typeSafeResult) + } +} + +func (m *SDK) newObserver(descriptor *export.Descriptor, callback observerCallback) *observer { + obs := &observer{ + meter: m, + descriptor: descriptor, + recorders: nil, + callback: callback, + } + m.observers.Store(obs, nil) + return obs +} + +// Collect traverses the list of active records and observers and +// exports data for each active instrument. Collect() may not be +// called concurrently. // // During the collection pass, the export.Batcher will receive // one Export() call per current aggregation. @@ -343,6 +513,13 @@ func (m *SDK) Collect(ctx context.Context) int { m.collectLock.Lock() defer m.collectLock.Unlock() + checkpointed := m.collectRecords(ctx) + checkpointed += m.collectObservers(ctx) + m.currentEpoch++ + return checkpointed +} + +func (m *SDK) collectRecords(ctx context.Context) int { checkpointed := 0 m.current.Range(func(key interface{}, value interface{}) bool { @@ -358,25 +535,66 @@ func (m *SDK) Collect(ctx context.Context) int { // TODO: Reconsider this logic. if inuse.refMapped.inUse() || atomic.LoadInt64(&inuse.modified) != 0 { atomic.StoreInt64(&inuse.modified, 0) - checkpointed += m.checkpoint(ctx, inuse) + checkpointed += m.checkpointRecord(ctx, inuse) } // Always continue to iterate over the entire map. return true }) - m.currentEpoch++ return checkpointed } -func (m *SDK) checkpoint(ctx context.Context, r *record) int { - if r.recorder == nil { +func (m *SDK) collectObservers(ctx context.Context) int { + checkpointed := 0 + + m.observers.Range(func(key, value interface{}) bool { + obs := key.(*observer) + result := observerResult{ + observer: obs, + } + obs.callback(result) + checkpointed += m.checkpointObserver(ctx, obs) + return true + }) + + return checkpointed +} + +func (m *SDK) checkpointRecord(ctx context.Context, r *record) int { + return m.checkpoint(ctx, r.descriptor, r.recorder, r.labels) +} + +func (m *SDK) checkpointObserver(ctx context.Context, obs *observer) int { + if len(obs.recorders) == 0 { return 0 } - r.recorder.Checkpoint(ctx, r.descriptor) - labels := export.NewLabels(r.labels.sorted, r.labels.encoded, m.labelEncoder) - err := m.batcher.Process(ctx, export.NewRecord(r.descriptor, labels, r.recorder)) + checkpointed := 0 + for encodedLabels, lrec := range obs.recorders { + epochDiff := m.currentEpoch - lrec.modifiedEpoch + if epochDiff == 0 { + checkpointed += m.checkpoint(ctx, obs.descriptor, lrec.recorder, lrec.labels) + } else if epochDiff > 1 { + // This is second collection cycle with no + // observations for this labelset. Remove the + // recorder. + delete(obs.recorders, encodedLabels) + } + } + if len(obs.recorders) == 0 { + obs.recorders = nil + } + return checkpointed +} +func (m *SDK) checkpoint(ctx context.Context, descriptor *export.Descriptor, recorder export.Aggregator, labels *labels) int { + if recorder == nil { + return 0 + } + recorder.Checkpoint(ctx, descriptor) + exportLabels := export.NewLabels(labels.sorted, labels.encoded, m.labelEncoder) + exportRecord := export.NewRecord(descriptor, exportLabels, recorder) + err := m.batcher.Process(ctx, exportRecord) if err != nil { m.errorHandler(err) } diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index d86005b0365..a60dc3fbd71 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -38,19 +38,19 @@ var ( ) // NewWithInexpensiveMeasure returns a simple aggregation selector -// that uses counter, gauge, and minmaxsumcount aggregators for the three -// kinds of metric. This selector is faster and uses less memory than -// the others because minmaxsumcount does not aggregate quantile +// that uses counter, gauge, and minmaxsumcount aggregators for the +// four kinds of metric. This selector is faster and uses less memory +// than the others because minmaxsumcount does not aggregate quantile // information. func NewWithInexpensiveMeasure() export.AggregationSelector { return selectorInexpensive{} } // NewWithSketchMeasure returns a simple aggregation selector that -// uses counter, gauge, and ddsketch aggregators for the three kinds -// of metric. This selector uses more cpu and memory than the +// uses counter, gauge, and ddsketch aggregators for the four kinds of +// metric. This selector uses more cpu and memory than the // NewWithInexpensiveMeasure because it uses one DDSketch per distinct -// measure and labelset. +// measure/observer and labelset. func NewWithSketchMeasure(config *ddsketch.Config) export.AggregationSelector { return selectorSketch{ config: config, @@ -58,7 +58,7 @@ func NewWithSketchMeasure(config *ddsketch.Config) export.AggregationSelector { } // NewWithExactMeasure returns a simple aggregation selector that uses -// counter, gauge, and array behavior for the three kinds of metric. +// counter, gauge, and array aggregators for the four kinds of metric. // This selector uses more memory than the NewWithSketchMeasure // because it aggregates an array of all values, therefore is able to // compute exact quantiles. @@ -70,6 +70,8 @@ func (selectorInexpensive) AggregatorFor(descriptor *export.Descriptor) export.A switch descriptor.MetricKind() { case export.GaugeKind: return gauge.New() + case export.ObserverKind: + fallthrough case export.MeasureKind: return minmaxsumcount.New(descriptor) default: @@ -81,6 +83,8 @@ func (s selectorSketch) AggregatorFor(descriptor *export.Descriptor) export.Aggr switch descriptor.MetricKind() { case export.GaugeKind: return gauge.New() + case export.ObserverKind: + fallthrough case export.MeasureKind: return ddsketch.New(s.config, descriptor) default: @@ -92,6 +96,8 @@ func (selectorExact) AggregatorFor(descriptor *export.Descriptor) export.Aggrega switch descriptor.MetricKind() { case export.GaugeKind: return gauge.New() + case export.ObserverKind: + fallthrough case export.MeasureKind: return array.New() default: diff --git a/sdk/metric/selector/simple/simple_test.go b/sdk/metric/selector/simple/simple_test.go index 4e03b2c3daf..e1667195894 100644 --- a/sdk/metric/selector/simple/simple_test.go +++ b/sdk/metric/selector/simple/simple_test.go @@ -30,9 +30,10 @@ import ( ) var ( - testGaugeDesc = export.NewDescriptor("gauge", export.GaugeKind, nil, "", "", core.Int64NumberKind, false) - testCounterDesc = export.NewDescriptor("counter", export.CounterKind, nil, "", "", core.Int64NumberKind, false) - testMeasureDesc = export.NewDescriptor("measure", export.MeasureKind, nil, "", "", core.Int64NumberKind, false) + testGaugeDesc = export.NewDescriptor("gauge", export.GaugeKind, nil, "", "", core.Int64NumberKind, false) + testCounterDesc = export.NewDescriptor("counter", export.CounterKind, nil, "", "", core.Int64NumberKind, false) + testMeasureDesc = export.NewDescriptor("measure", export.MeasureKind, nil, "", "", core.Int64NumberKind, false) + testObserverDesc = export.NewDescriptor("observer", export.ObserverKind, nil, "", "", core.Int64NumberKind, false) ) func TestInexpensiveMeasure(t *testing.T) { @@ -40,6 +41,7 @@ func TestInexpensiveMeasure(t *testing.T) { require.NotPanics(t, func() { _ = inex.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) }) require.NotPanics(t, func() { _ = inex.AggregatorFor(testCounterDesc).(*counter.Aggregator) }) require.NotPanics(t, func() { _ = inex.AggregatorFor(testMeasureDesc).(*minmaxsumcount.Aggregator) }) + require.NotPanics(t, func() { _ = inex.AggregatorFor(testObserverDesc).(*minmaxsumcount.Aggregator) }) } func TestSketchMeasure(t *testing.T) { @@ -47,6 +49,7 @@ func TestSketchMeasure(t *testing.T) { require.NotPanics(t, func() { _ = sk.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) }) require.NotPanics(t, func() { _ = sk.AggregatorFor(testCounterDesc).(*counter.Aggregator) }) require.NotPanics(t, func() { _ = sk.AggregatorFor(testMeasureDesc).(*ddsketch.Aggregator) }) + require.NotPanics(t, func() { _ = sk.AggregatorFor(testObserverDesc).(*ddsketch.Aggregator) }) } func TestExactMeasure(t *testing.T) { @@ -54,4 +57,5 @@ func TestExactMeasure(t *testing.T) { require.NotPanics(t, func() { _ = ex.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) }) require.NotPanics(t, func() { _ = ex.AggregatorFor(testCounterDesc).(*counter.Aggregator) }) require.NotPanics(t, func() { _ = ex.AggregatorFor(testMeasureDesc).(*array.Aggregator) }) + require.NotPanics(t, func() { _ = ex.AggregatorFor(testObserverDesc).(*array.Aggregator) }) }