From a202f161005bf092fd23265b1577e7f8fee155d2 Mon Sep 17 00:00:00 2001 From: Rahul Patel Date: Thu, 5 Mar 2020 12:15:30 -0800 Subject: [PATCH] Add observer metric (#474) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * wip: observers * wip: float observers * fix copy pasta * wip: rework observers in sdk * small fix in global meter * wip: aggregators and selectors * wip: monotonicity option for observers * some refactor * wip: docs needs more package docs (especially for api/metric and sdk/metric) * fix ci * Fix copy-pasta in docs Co-Authored-By: Mauricio Vásquez * recycle unused recorders in observers if a recorder for a labelset is unused for a second collection cycle in a row, drop it * unregister * thread-safe set callback * Fix docs * Revert "wip: aggregators and selectors" This reverts commit 37b7d05aed5dc90f6d5593325b6eb77494e21736. * update selector * tests * Rework number equality Compare concrete numbers, so we can get actual numbers in the error message when they are not equal, not some uint64 representation. This also uses InDelta for comparing floats. * Ensure that Observers are registered in the same order * Run observers in fixed order So the tests can be reproducible - iterating a map made the order of measurements random. * Ensure the proper alignment of the delegates This wasn't checked at all. After adding the checks, the test-386 failed. * Small tweaks to the global meter test * Ensure proper alignment of the callback pointer test-386 was complaining about it * update docs * update a TODO * address review issues * drop SetCallback Co-authored-by: Mauricio Vásquez Co-authored-by: Rahul Patel --- api/global/internal/internal_test.go | 26 +++ api/global/internal/meter.go | 184 +++++++++++++++- api/global/internal/meter_test.go | 99 +++++++-- api/metric/api.go | 101 +++++++-- api/metric/api_test.go | 159 ++++++++++++++ api/metric/doc.go | 42 ++-- api/metric/noop.go | 18 ++ api/metric/sdkhelpers.go | 8 + internal/metric/mock.go | 117 +++++++++- sdk/export/metric/kind_string.go | 5 +- sdk/export/metric/metric.go | 3 + sdk/metric/benchmark_test.go | 85 ++++++++ sdk/metric/doc.go | 75 ++++--- sdk/metric/sdk.go | 248 ++++++++++++++++++++-- sdk/metric/selector/simple/simple.go | 20 +- sdk/metric/selector/simple/simple_test.go | 10 +- 16 files changed, 1074 insertions(+), 126 deletions(-) create mode 100644 api/global/internal/internal_test.go diff --git a/api/global/internal/internal_test.go b/api/global/internal/internal_test.go new file mode 100644 index 00000000000..8dfb78fd899 --- /dev/null +++ b/api/global/internal/internal_test.go @@ -0,0 +1,26 @@ +package internal_test + +import ( + "os" + "testing" + + "go.opentelemetry.io/otel/api/global/internal" + ottest "go.opentelemetry.io/otel/internal/testing" +) + +// Ensure struct alignment prior to running tests. +func TestMain(m *testing.M) { + fieldsMap := internal.AtomicFieldOffsets() + fields := make([]ottest.FieldOffset, 0, len(fieldsMap)) + for name, offset := range fieldsMap { + fields = append(fields, ottest.FieldOffset{ + Name: name, + Offset: offset, + }) + } + if !ottest.Aligned8Byte(fields, os.Stderr) { + os.Exit(1) + } + + os.Exit(m.Run()) +} diff --git a/api/global/internal/meter.go b/api/global/internal/meter.go index e15d75feda6..2c4f33140de 100644 --- a/api/global/internal/meter.go +++ b/api/global/internal/meter.go @@ -40,44 +40,76 @@ const ( ) type meterProvider struct { - lock sync.Mutex - meters []*meter delegate metric.Provider + + lock sync.Mutex + meters []*meter } type meter struct { + delegate unsafe.Pointer // (*metric.Meter) + provider *meterProvider name string - lock sync.Mutex - instruments []*instImpl - - delegate unsafe.Pointer // (*metric.Meter) + lock sync.Mutex + instruments []*instImpl + liveObservers map[*obsImpl]struct{} + // orderedObservers slice contains observers in their order of + // registration. It may also contain unregistered + // observers. The liveObservers map should be consulted to + // check if the observer is registered or not. + orderedObservers []*obsImpl } type instImpl struct { + delegate unsafe.Pointer // (*metric.InstrumentImpl) + name string mkind metricKind nkind core.NumberKind opts interface{} +} - delegate unsafe.Pointer // (*metric.InstrumentImpl) +type obsImpl struct { + delegate unsafe.Pointer // (*metric.Int64Observer or *metric.Float64Observer) + + name string + nkind core.NumberKind + opts []metric.ObserverOptionApplier + meter *meter + callback interface{} +} + +type int64ObsImpl struct { + observer *obsImpl +} + +type float64ObsImpl struct { + observer *obsImpl +} + +// this is a common subset of the metric observers interfaces +type observerUnregister interface { + Unregister() } type labelSet struct { + delegate unsafe.Pointer // (* metric.LabelSet) + meter *meter value []core.KeyValue initialize sync.Once - delegate unsafe.Pointer // (* metric.LabelSet) } type instHandle struct { + delegate unsafe.Pointer // (*metric.HandleImpl) + inst *instImpl labels metric.LabelSet initialize sync.Once - delegate unsafe.Pointer // (*metric.HandleImpl) } var _ metric.Provider = &meterProvider{} @@ -86,6 +118,10 @@ var _ metric.LabelSet = &labelSet{} var _ metric.LabelSetDelegate = &labelSet{} var _ metric.InstrumentImpl = &instImpl{} var _ metric.BoundInstrumentImpl = &instHandle{} +var _ metric.Int64Observer = int64ObsImpl{} +var _ metric.Float64Observer = float64ObsImpl{} +var _ observerUnregister = (metric.Int64Observer)(nil) +var _ observerUnregister = (metric.Float64Observer)(nil) // Provider interface and delegation @@ -130,6 +166,13 @@ func (m *meter) setDelegate(provider metric.Provider) { inst.setDelegate(*d) } m.instruments = nil + for _, obs := range m.orderedObservers { + if _, ok := m.liveObservers[obs]; ok { + obs.setDelegate(*d) + } + } + m.liveObservers = nil + m.orderedObservers = nil } func (m *meter) newInst(name string, mkind metricKind, nkind core.NumberKind, opts interface{}) metric.InstrumentImpl { @@ -203,6 +246,68 @@ func (bound *instHandle) Unbind() { (*implPtr).Unbind() } +// Any Observer delegation + +func (obs *obsImpl) setDelegate(d metric.Meter) { + if obs.nkind == core.Int64NumberKind { + obs.setInt64Delegate(d) + } else { + obs.setFloat64Delegate(d) + } +} + +func (obs *obsImpl) unregister() { + unreg := obs.getUnregister() + if unreg != nil { + unreg.Unregister() + return + } + obs.meter.lock.Lock() + defer obs.meter.lock.Unlock() + delete(obs.meter.liveObservers, obs) + if len(obs.meter.liveObservers) == 0 { + obs.meter.liveObservers = nil + obs.meter.orderedObservers = nil + } +} + +func (obs *obsImpl) getUnregister() observerUnregister { + ptr := atomic.LoadPointer(&obs.delegate) + if ptr == nil { + return nil + } + if obs.nkind == core.Int64NumberKind { + return *(*metric.Int64Observer)(ptr) + } + return *(*metric.Float64Observer)(ptr) +} + +// Int64Observer delegation + +func (obs *obsImpl) setInt64Delegate(d metric.Meter) { + obsPtr := new(metric.Int64Observer) + cb := obs.callback.(metric.Int64ObserverCallback) + *obsPtr = d.RegisterInt64Observer(obs.name, cb, obs.opts...) + atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr)) +} + +func (obs int64ObsImpl) Unregister() { + obs.observer.unregister() +} + +// Float64Observer delegation + +func (obs *obsImpl) setFloat64Delegate(d metric.Meter) { + obsPtr := new(metric.Float64Observer) + cb := obs.callback.(metric.Float64ObserverCallback) + *obsPtr = d.RegisterFloat64Observer(obs.name, cb, obs.opts...) + atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr)) +} + +func (obs float64ObsImpl) Unregister() { + obs.observer.unregister() +} + // Metric updates func (m *meter) RecordBatch(ctx context.Context, labels metric.LabelSet, measurements ...metric.Measurement) { @@ -296,3 +401,64 @@ func (m *meter) NewInt64Measure(name string, opts ...metric.MeasureOptionApplier func (m *meter) NewFloat64Measure(name string, opts ...metric.MeasureOptionApplier) metric.Float64Measure { return metric.WrapFloat64MeasureInstrument(m.newInst(name, measureKind, core.Float64NumberKind, opts)) } + +func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Int64Observer { + m.lock.Lock() + defer m.lock.Unlock() + + if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil { + return (*meterPtr).RegisterInt64Observer(name, callback, oos...) + } + + obs := &obsImpl{ + name: name, + nkind: core.Int64NumberKind, + opts: oos, + meter: m, + callback: callback, + } + m.addObserver(obs) + return int64ObsImpl{ + observer: obs, + } +} + +func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Float64Observer { + m.lock.Lock() + defer m.lock.Unlock() + + if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil { + return (*meterPtr).RegisterFloat64Observer(name, callback, oos...) + } + + obs := &obsImpl{ + name: name, + nkind: core.Float64NumberKind, + opts: oos, + meter: m, + callback: callback, + } + m.addObserver(obs) + return float64ObsImpl{ + observer: obs, + } +} + +func (m *meter) addObserver(obs *obsImpl) { + if m.liveObservers == nil { + m.liveObservers = make(map[*obsImpl]struct{}) + } + m.liveObservers[obs] = struct{}{} + m.orderedObservers = append(m.orderedObservers, obs) +} + +func AtomicFieldOffsets() map[string]uintptr { + return map[string]uintptr{ + "meterProvider.delegate": unsafe.Offsetof(meterProvider{}.delegate), + "meter.delegate": unsafe.Offsetof(meter{}.delegate), + "instImpl.delegate": unsafe.Offsetof(instImpl{}.delegate), + "obsImpl.delegate": unsafe.Offsetof(obsImpl{}.delegate), + "labelSet.delegate": unsafe.Offsetof(labelSet{}.delegate), + "instHandle.delegate": unsafe.Offsetof(instHandle{}.delegate), + } +} diff --git a/api/global/internal/meter_test.go b/api/global/internal/meter_test.go index c9d0ac7482e..7360a2bbc58 100644 --- a/api/global/internal/meter_test.go +++ b/api/global/internal/meter_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/global/internal" "go.opentelemetry.io/otel/api/key" + "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/metric/stdout" metrictest "go.opentelemetry.io/otel/internal/metric" ) @@ -41,6 +42,16 @@ func TestDirect(t *testing.T) { measure.Record(ctx, 1, labels1) measure.Record(ctx, 2, labels1) + _ = meter1.RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) { + result.Observe(1., labels1) + result.Observe(2., labels2) + }) + + _ = meter1.RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) { + result.Observe(1, labels1) + result.Observe(2, labels2) + }) + second := meter2.NewFloat64Measure("test.second") second.Record(ctx, 1, labels3) second.Record(ctx, 2, labels3) @@ -54,45 +65,86 @@ func TestDirect(t *testing.T) { second.Record(ctx, 3, labels3) mock := sdk.Meter("test1").(*metrictest.Meter) - require.Equal(t, 3, len(mock.MeasurementBatches)) + mock.RunObservers() + require.Len(t, mock.MeasurementBatches, 7) require.Equal(t, map[core.Key]core.Value{ lvals1.Key: lvals1.Value, }, mock.MeasurementBatches[0].LabelSet.Labels) - require.Equal(t, 1, len(mock.MeasurementBatches[0].Measurements)) - require.Equal(t, core.NewInt64Number(1), - mock.MeasurementBatches[0].Measurements[0].Number) + require.Len(t, mock.MeasurementBatches[0].Measurements, 1) + require.Equal(t, int64(1), + mock.MeasurementBatches[0].Measurements[0].Number.AsInt64()) require.Equal(t, "test.counter", mock.MeasurementBatches[0].Measurements[0].Instrument.Name) require.Equal(t, map[core.Key]core.Value{ lvals2.Key: lvals2.Value, }, mock.MeasurementBatches[1].LabelSet.Labels) - require.Equal(t, 1, len(mock.MeasurementBatches[1].Measurements)) - require.Equal(t, core.NewInt64Number(3), - mock.MeasurementBatches[1].Measurements[0].Number) + require.Len(t, mock.MeasurementBatches[1].Measurements, 1) + require.Equal(t, int64(3), + mock.MeasurementBatches[1].Measurements[0].Number.AsInt64()) require.Equal(t, "test.gauge", mock.MeasurementBatches[1].Measurements[0].Instrument.Name) require.Equal(t, map[core.Key]core.Value{ lvals1.Key: lvals1.Value, }, mock.MeasurementBatches[2].LabelSet.Labels) - require.Equal(t, 1, len(mock.MeasurementBatches[2].Measurements)) - require.Equal(t, core.NewFloat64Number(3), - mock.MeasurementBatches[2].Measurements[0].Number) + require.Len(t, mock.MeasurementBatches[2].Measurements, 1) + require.InDelta(t, float64(3), + mock.MeasurementBatches[2].Measurements[0].Number.AsFloat64(), + 0.01) require.Equal(t, "test.measure", mock.MeasurementBatches[2].Measurements[0].Instrument.Name) + require.Equal(t, map[core.Key]core.Value{ + lvals1.Key: lvals1.Value, + }, mock.MeasurementBatches[3].LabelSet.Labels) + require.Len(t, mock.MeasurementBatches[3].Measurements, 1) + require.InDelta(t, float64(1), + mock.MeasurementBatches[3].Measurements[0].Number.AsFloat64(), + 0.01) + require.Equal(t, "test.observer.float", + mock.MeasurementBatches[3].Measurements[0].Instrument.Name) + + require.Equal(t, map[core.Key]core.Value{ + lvals2.Key: lvals2.Value, + }, mock.MeasurementBatches[4].LabelSet.Labels) + require.Len(t, mock.MeasurementBatches[4].Measurements, 1) + require.InDelta(t, float64(2), + mock.MeasurementBatches[4].Measurements[0].Number.AsFloat64(), + 0.01) + require.Equal(t, "test.observer.float", + mock.MeasurementBatches[4].Measurements[0].Instrument.Name) + + require.Equal(t, map[core.Key]core.Value{ + lvals1.Key: lvals1.Value, + }, mock.MeasurementBatches[5].LabelSet.Labels) + require.Len(t, mock.MeasurementBatches[5].Measurements, 1) + require.Equal(t, int64(1), + mock.MeasurementBatches[5].Measurements[0].Number.AsInt64()) + require.Equal(t, "test.observer.int", + mock.MeasurementBatches[5].Measurements[0].Instrument.Name) + + require.Equal(t, map[core.Key]core.Value{ + lvals2.Key: lvals2.Value, + }, mock.MeasurementBatches[6].LabelSet.Labels) + require.Len(t, mock.MeasurementBatches[6].Measurements, 1) + require.Equal(t, int64(2), + mock.MeasurementBatches[6].Measurements[0].Number.AsInt64()) + require.Equal(t, "test.observer.int", + mock.MeasurementBatches[6].Measurements[0].Instrument.Name) + // This tests the second Meter instance mock = sdk.Meter("test2").(*metrictest.Meter) - require.Equal(t, 1, len(mock.MeasurementBatches)) + require.Len(t, mock.MeasurementBatches, 1) require.Equal(t, map[core.Key]core.Value{ lvals3.Key: lvals3.Value, }, mock.MeasurementBatches[0].LabelSet.Labels) - require.Equal(t, 1, len(mock.MeasurementBatches[0].Measurements)) - require.Equal(t, core.NewFloat64Number(3), - mock.MeasurementBatches[0].Measurements[0].Number) + require.Len(t, mock.MeasurementBatches[0].Measurements, 1) + require.InDelta(t, float64(3), + mock.MeasurementBatches[0].Measurements[0].Number.AsFloat64(), + 0.01) require.Equal(t, "test.second", mock.MeasurementBatches[0].Measurements[0].Instrument.Name) } @@ -138,8 +190,9 @@ func TestBound(t *testing.T) { lvals1.Key: lvals1.Value, }, mock.MeasurementBatches[0].LabelSet.Labels) require.Equal(t, 1, len(mock.MeasurementBatches[0].Measurements)) - require.Equal(t, core.NewFloat64Number(1), - mock.MeasurementBatches[0].Measurements[0].Number) + require.InDelta(t, float64(1), + mock.MeasurementBatches[0].Measurements[0].Number.AsFloat64(), + 0.01) require.Equal(t, "test.counter", mock.MeasurementBatches[0].Measurements[0].Instrument.Name) @@ -147,8 +200,9 @@ func TestBound(t *testing.T) { lvals2.Key: lvals2.Value, }, mock.MeasurementBatches[1].LabelSet.Labels) require.Equal(t, 1, len(mock.MeasurementBatches[1].Measurements)) - require.Equal(t, core.NewFloat64Number(3), - mock.MeasurementBatches[1].Measurements[0].Number) + require.InDelta(t, float64(3), + mock.MeasurementBatches[1].Measurements[0].Number.AsFloat64(), + 0.01) require.Equal(t, "test.gauge", mock.MeasurementBatches[1].Measurements[0].Instrument.Name) @@ -156,8 +210,8 @@ func TestBound(t *testing.T) { lvals1.Key: lvals1.Value, }, mock.MeasurementBatches[2].LabelSet.Labels) require.Equal(t, 1, len(mock.MeasurementBatches[2].Measurements)) - require.Equal(t, core.NewInt64Number(3), - mock.MeasurementBatches[2].Measurements[0].Number) + require.Equal(t, int64(3), + mock.MeasurementBatches[2].Measurements[0].Number.AsInt64()) require.Equal(t, "test.measure", mock.MeasurementBatches[2].Measurements[0].Instrument.Name) @@ -185,9 +239,14 @@ func TestUnbind(t *testing.T) { measure := glob.NewInt64Measure("test.measure") boundM := measure.Bind(labels1) + observerInt := glob.RegisterInt64Observer("test.observer.int", nil) + observerFloat := glob.RegisterFloat64Observer("test.observer.float", nil) + boundC.Unbind() boundG.Unbind() boundM.Unbind() + observerInt.Unregister() + observerFloat.Unregister() } func TestDefaultSDK(t *testing.T) { diff --git a/api/metric/api.go b/api/metric/api.go index 2e546c697a4..9ccec6b8e38 100644 --- a/api/metric/api.go +++ b/api/metric/api.go @@ -81,6 +81,14 @@ type MeasureOptionApplier interface { ApplyMeasureOption(*Options) } +// ObserverOptionApplier is an interface for applying metric options +// that are valid only for observer metrics. +type ObserverOptionApplier interface { + // ApplyObserverOption is used to make some general or + // observer-specific changes in the Options. + ApplyObserverOption(*Options) +} + // Measurement is used for reporting a batch of metric // values. Instances of this type should be created by instruments // (e.g., Int64Counter.Measurement()). @@ -127,10 +135,51 @@ type Meter interface { // a given name and customized with passed options. NewFloat64Measure(name string, mos ...MeasureOptionApplier) Float64Measure + // RegisterInt64Observer creates a new integral observer with a + // given name, running a given callback, and customized with passed + // options. Callback can be nil. + RegisterInt64Observer(name string, callback Int64ObserverCallback, oos ...ObserverOptionApplier) Int64Observer + // RegisterFloat64Observer creates a new floating point observer + // with a given name, running a given callback, and customized with + // passed options. Callback can be nil. + RegisterFloat64Observer(name string, callback Float64ObserverCallback, oos ...ObserverOptionApplier) Float64Observer + // RecordBatch atomically records a batch of measurements. RecordBatch(context.Context, LabelSet, ...Measurement) } +// Int64ObserverResult is an interface for reporting integral +// observations. +type Int64ObserverResult interface { + Observe(value int64, labels LabelSet) +} + +// Float64ObserverResult is an interface for reporting floating point +// observations. +type Float64ObserverResult interface { + Observe(value float64, labels LabelSet) +} + +// Int64ObserverCallback is a type of callback that integral +// observers run. +type Int64ObserverCallback func(result Int64ObserverResult) + +// Float64ObserverCallback is a type of callback that floating point +// observers run. +type Float64ObserverCallback func(result Float64ObserverResult) + +// Int64Observer is a metric that captures a set of int64 values at a +// point in time. +type Int64Observer interface { + Unregister() +} + +// Float64Observer is a metric that captures a set of float64 values +// at a point in time. +type Float64Observer interface { + Unregister() +} + // Option supports specifying the various metric options. type Option func(*Options) @@ -140,16 +189,19 @@ type OptionApplier interface { CounterOptionApplier GaugeOptionApplier MeasureOptionApplier + ObserverOptionApplier // ApplyOption is used to make some general changes in the // Options. ApplyOption(*Options) } -// CounterGaugeOptionApplier is an interface for applying metric -// options that are valid for counter or gauge metrics. -type CounterGaugeOptionApplier interface { +// CounterGaugeObserverOptionApplier is an interface for applying +// metric options that are valid for counter, gauge or observer +// metrics. +type CounterGaugeObserverOptionApplier interface { CounterOptionApplier GaugeOptionApplier + ObserverOptionApplier } type optionWrapper struct { @@ -168,16 +220,22 @@ type measureOptionWrapper struct { F Option } -type counterGaugeOptionWrapper struct { +type observerOptionWrapper struct { + F Option +} + +type counterGaugeObserverOptionWrapper struct { FC Option FG Option + FO Option } var ( - _ OptionApplier = optionWrapper{} - _ CounterOptionApplier = counterOptionWrapper{} - _ GaugeOptionApplier = gaugeOptionWrapper{} - _ MeasureOptionApplier = measureOptionWrapper{} + _ OptionApplier = optionWrapper{} + _ CounterOptionApplier = counterOptionWrapper{} + _ GaugeOptionApplier = gaugeOptionWrapper{} + _ MeasureOptionApplier = measureOptionWrapper{} + _ ObserverOptionApplier = observerOptionWrapper{} ) func (o optionWrapper) ApplyCounterOption(opts *Options) { @@ -192,6 +250,10 @@ func (o optionWrapper) ApplyMeasureOption(opts *Options) { o.ApplyOption(opts) } +func (o optionWrapper) ApplyObserverOption(opts *Options) { + o.ApplyOption(opts) +} + func (o optionWrapper) ApplyOption(opts *Options) { o.F(opts) } @@ -208,14 +270,22 @@ func (o measureOptionWrapper) ApplyMeasureOption(opts *Options) { o.F(opts) } -func (o counterGaugeOptionWrapper) ApplyCounterOption(opts *Options) { +func (o counterGaugeObserverOptionWrapper) ApplyCounterOption(opts *Options) { o.FC(opts) } -func (o counterGaugeOptionWrapper) ApplyGaugeOption(opts *Options) { +func (o counterGaugeObserverOptionWrapper) ApplyGaugeOption(opts *Options) { o.FG(opts) } +func (o counterGaugeObserverOptionWrapper) ApplyObserverOption(opts *Options) { + o.FO(opts) +} + +func (o observerOptionWrapper) ApplyObserverOption(opts *Options) { + o.F(opts) +} + // WithDescription applies provided description. func WithDescription(desc string) OptionApplier { return optionWrapper{ @@ -244,16 +314,19 @@ func WithKeys(keys ...core.Key) OptionApplier { } } -// WithMonotonic sets whether a counter or a gauge is not permitted to -// go down. -func WithMonotonic(monotonic bool) CounterGaugeOptionApplier { - return counterGaugeOptionWrapper{ +// WithMonotonic sets whether a counter, a gauge or an observer is not +// permitted to go down. +func WithMonotonic(monotonic bool) CounterGaugeObserverOptionApplier { + return counterGaugeObserverOptionWrapper{ FC: func(opts *Options) { opts.Alternate = !monotonic }, FG: func(opts *Options) { opts.Alternate = monotonic }, + FO: func(opts *Options) { + opts.Alternate = monotonic + }, } } diff --git a/api/metric/api_test.go b/api/metric/api_test.go index a45c96c6118..8698727b763 100644 --- a/api/metric/api_test.go +++ b/api/metric/api_test.go @@ -26,6 +26,7 @@ import ( mock "go.opentelemetry.io/otel/internal/metric" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" ) func TestCounterOptions(t *testing.T) { @@ -361,6 +362,117 @@ func TestMeasureOptions(t *testing.T) { } } +func TestObserverOptions(t *testing.T) { + type testcase struct { + name string + opts []metric.ObserverOptionApplier + keys []core.Key + desc string + unit unit.Unit + alt bool + } + testcases := []testcase{ + { + name: "no opts", + opts: nil, + keys: nil, + desc: "", + unit: "", + alt: false, + }, + { + name: "keys keys keys", + opts: []metric.ObserverOptionApplier{ + metric.WithKeys(key.New("foo"), key.New("foo2")), + metric.WithKeys(key.New("bar"), key.New("bar2")), + metric.WithKeys(key.New("baz"), key.New("baz2")), + }, + keys: []core.Key{ + key.New("foo"), key.New("foo2"), + key.New("bar"), key.New("bar2"), + key.New("baz"), key.New("baz2"), + }, + desc: "", + unit: "", + alt: false, + }, + { + name: "description", + opts: []metric.ObserverOptionApplier{ + metric.WithDescription("stuff"), + }, + keys: nil, + desc: "stuff", + unit: "", + alt: false, + }, + { + name: "description override", + opts: []metric.ObserverOptionApplier{ + metric.WithDescription("stuff"), + metric.WithDescription("things"), + }, + keys: nil, + desc: "things", + unit: "", + alt: false, + }, + { + name: "unit", + opts: []metric.ObserverOptionApplier{ + metric.WithUnit("s"), + }, + keys: nil, + desc: "", + unit: "s", + alt: false, + }, + { + name: "unit override", + opts: []metric.ObserverOptionApplier{ + metric.WithUnit("s"), + metric.WithUnit("h"), + }, + keys: nil, + desc: "", + unit: "h", + alt: false, + }, + { + name: "monotonic", + opts: []metric.ObserverOptionApplier{ + metric.WithMonotonic(true), + }, + keys: nil, + desc: "", + unit: "", + alt: true, + }, + { + name: "monotonic, but not really", + opts: []metric.ObserverOptionApplier{ + metric.WithMonotonic(true), + metric.WithMonotonic(false), + }, + keys: nil, + desc: "", + unit: "", + alt: false, + }, + } + for idx, tt := range testcases { + t.Logf("Testing observer case %s (%d)", tt.name, idx) + opts := &metric.Options{} + metric.ApplyObserverOptions(opts, tt.opts...) + checkOptions(t, opts, &metric.Options{ + Description: tt.desc, + Unit: tt.unit, + Keys: tt.keys, + Alternate: tt.alt, + }) + } +} + func checkOptions(t *testing.T, got *metric.Options, expected *metric.Options) { if diff := cmp.Diff(got, expected); diff != "" { t.Errorf("Compare options: -got +want %s", diff) @@ -448,6 +560,29 @@ func TestMeasure(t *testing.T) { } } +func TestObserver(t *testing.T) { + { + meter := mock.NewMeter() + labels := meter.Labels() + o := meter.RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) { + result.Observe(42, labels) + }) + t.Log("Testing float observer") + meter.RunObservers() + checkObserverBatch(t, labels, meter, core.Float64NumberKind, o) + } + { + meter := mock.NewMeter() + labels := meter.Labels() + o := meter.RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) { + result.Observe(42, labels) + }) + t.Log("Testing int observer") + meter.RunObservers() + checkObserverBatch(t, labels, meter, core.Int64NumberKind, o) + } +} + func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, meter *mock.Meter, kind core.NumberKind, instrument metric.InstrumentImpl) { t.Helper() if len(meter.MeasurementBatches) != 3 { @@ -496,7 +631,31 @@ func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, met } } +func checkObserverBatch(t *testing.T, labels metric.LabelSet, meter *mock.Meter, kind core.NumberKind, observer interface{}) { + t.Helper() + assert.Len(t, meter.MeasurementBatches, 1) + if len(meter.MeasurementBatches) < 1 { + return + } + o := observer.(*mock.Observer) + if !assert.NotNil(t, o) { + return + } + ourLabelSet := labels.(*mock.LabelSet) + got := meter.MeasurementBatches[0] + assert.Equal(t, ourLabelSet, got.LabelSet) + assert.Len(t, got.Measurements, 1) + if len(got.Measurements) < 1 { + return + } + measurement := got.Measurements[0] + assert.Equal(t, o.Instrument, measurement.Instrument) + ft := fortyTwo(t, kind) + assert.Equal(t, 0, measurement.Number.CompareNumber(kind, ft)) +} + func fortyTwo(t *testing.T, kind core.NumberKind) core.Number { + t.Helper() switch kind { case core.Int64NumberKind: return core.NewInt64Number(42) diff --git a/api/metric/doc.go b/api/metric/doc.go index 26e5884be07..27fc33aa5d7 100644 --- a/api/metric/doc.go +++ b/api/metric/doc.go @@ -13,21 +13,22 @@ // limitations under the License. // metric package provides an API for reporting diagnostic -// measurements using three basic kinds of instruments (or four, if -// calling one special case a separate one). +// measurements using four basic kinds of instruments. // -// The three basic kinds are: +// The four basic kinds are: // // - counters // - gauges // - measures +// - observers // // All instruments report either float64 or int64 values. // -// The primary object that handles metrics is Meter. The -// implementation of the Meter is provided by SDK. Normally, the Meter -// is used directly only for the LabelSet generation, batch recording -// and the bound instrument destruction. +// The primary object that handles metrics is Meter. Meter can be +// obtained from Provider. The implementations of the Meter and +// Provider are provided by SDK. Normally, the Meter is used directly +// only for the instrument creation, LabelSet generation and batch +// recording. // // LabelSet is a set of keys and values that are in a suitable, // optimized form to be used by Meter. @@ -60,11 +61,24 @@ // the New*Measure function - this allows reporting negative values // too. To report a new value, use the Record function. // -// All the basic kinds of instruments also support creating bound -// instruments for a potentially more efficient reporting. The bound -// instruments have the same function names as the instruments (so a -// Counter bound instrument has Add, a Gauge bound instrument has Set, -// and a Measure bound instrument has Record). Bound Instruments can -// be created with the Bind function of the respective -// instrument. When done with the bound instrument, call Unbind on it. +// Observers are instruments that are reporting a current state of a +// set of values. An example could be voltage or +// temperature. Observers can be created with either +// RegisterFloat64Observer or RegisterInt64Observer. Observers by +// default have no limitations about reported values - they can be +// less or greater than the last reported value. This can be changed +// with the WithMonotonic option passed to the Register*Observer +// function - this permits the reported values only to go +// up. Reporting of the new values happens asynchronously, with the +// use of a callback passed to the Register*Observer function. The +// callback can report multiple values. To unregister the observer, +// call Unregister on it. +// +// Counters, gauges and measures support creating bound instruments +// for a potentially more efficient reporting. The bound instruments +// have the same function names as the instruments (so a Counter bound +// instrument has Add, a Gauge bound instrument has Set, and a Measure +// bound instrument has Record). Bound Instruments can be created +// with the Bind function of the respective instrument. When done with +// the bound instrument, call Unbind on it. package metric // import "go.opentelemetry.io/otel/api/metric" diff --git a/api/metric/noop.go b/api/metric/noop.go index 66cdd6e1f5d..d569dbae6aa 100644 --- a/api/metric/noop.go +++ b/api/metric/noop.go @@ -11,12 +11,16 @@ type NoopMeter struct{} type noopBoundInstrument struct{} type noopLabelSet struct{} type noopInstrument struct{} +type noopInt64Observer struct{} +type noopFloat64Observer struct{} var _ Provider = NoopProvider{} var _ Meter = NoopMeter{} var _ InstrumentImpl = noopInstrument{} var _ BoundInstrumentImpl = noopBoundInstrument{} var _ LabelSet = noopLabelSet{} +var _ Int64Observer = noopInt64Observer{} +var _ Float64Observer = noopFloat64Observer{} func (NoopProvider) Meter(name string) Meter { return NoopMeter{} @@ -39,6 +43,12 @@ func (noopInstrument) Meter() Meter { return NoopMeter{} } +func (noopInt64Observer) Unregister() { +} + +func (noopFloat64Observer) Unregister() { +} + func (NoopMeter) Labels(...core.KeyValue) LabelSet { return noopLabelSet{} } @@ -69,3 +79,11 @@ func (NoopMeter) NewFloat64Measure(name string, mos ...MeasureOptionApplier) Flo func (NoopMeter) RecordBatch(context.Context, LabelSet, ...Measurement) { } + +func (NoopMeter) RegisterInt64Observer(name string, callback Int64ObserverCallback, oos ...ObserverOptionApplier) Int64Observer { + return noopInt64Observer{} +} + +func (NoopMeter) RegisterFloat64Observer(name string, callback Float64ObserverCallback, oos ...ObserverOptionApplier) Float64Observer { + return noopFloat64Observer{} +} diff --git a/api/metric/sdkhelpers.go b/api/metric/sdkhelpers.go index 882b627224b..5efdab24d51 100644 --- a/api/metric/sdkhelpers.go +++ b/api/metric/sdkhelpers.go @@ -122,3 +122,11 @@ func ApplyMeasureOptions(opts *Options, mos ...MeasureOptionApplier) { o.ApplyMeasureOption(opts) } } + +// ApplyObserverOptions is a helper that applies all the observer +// options to passed opts. +func ApplyObserverOptions(opts *Options, mos ...ObserverOptionApplier) { + for _, o := range mos { + o.ApplyObserverOption(opts) + } +} diff --git a/internal/metric/mock.go b/internal/metric/mock.go index 8bd37574f5b..8a70a47c3f7 100644 --- a/internal/metric/mock.go +++ b/internal/metric/mock.go @@ -54,6 +54,10 @@ type ( Meter struct { MeasurementBatches []Batch + // Observers contains also unregistered + // observers. Check the Dead field of the Observer to + // figure out its status. + Observers []*Observer } Kind int8 @@ -63,21 +67,63 @@ type ( Number core.Number Instrument *Instrument } + + observerResult struct { + instrument *Instrument + } + + int64ObserverResult struct { + result observerResult + } + + float64ObserverResult struct { + result observerResult + } + + observerCallback func(observerResult) + + Observer struct { + Instrument *Instrument + Meter *Meter + Dead bool + callback observerCallback + } ) var ( - _ apimetric.InstrumentImpl = &Instrument{} - _ apimetric.BoundInstrumentImpl = &Handle{} - _ apimetric.LabelSet = &LabelSet{} - _ apimetric.Meter = &Meter{} + _ apimetric.InstrumentImpl = &Instrument{} + _ apimetric.BoundInstrumentImpl = &Handle{} + _ apimetric.LabelSet = &LabelSet{} + _ apimetric.Meter = &Meter{} + _ apimetric.Int64Observer = &Observer{} + _ apimetric.Float64Observer = &Observer{} + _ apimetric.Int64ObserverResult = int64ObserverResult{} + _ apimetric.Float64ObserverResult = float64ObserverResult{} ) const ( KindCounter Kind = iota KindGauge KindMeasure + KindObserver ) +func (o *Observer) Unregister() { + o.Dead = true +} + +func (r int64ObserverResult) Observe(value int64, labels apimetric.LabelSet) { + r.result.observe(core.NewInt64Number(value), labels) +} + +func (r float64ObserverResult) Observe(value float64, labels apimetric.LabelSet) { + r.result.observe(core.NewFloat64Number(value), labels) +} + +func (r observerResult) observe(number core.Number, labels apimetric.LabelSet) { + r.instrument.RecordOne(context.Background(), number, labels) +} + func (i *Instrument) Bind(labels apimetric.LabelSet) apimetric.BoundInstrumentImpl { if ld, ok := labels.(apimetric.LabelSetDelegate); ok { labels = ld.Delegate() @@ -209,6 +255,58 @@ func (m *Meter) newMeasureInstrument(name string, numberKind core.NumberKind, mo } } +func (m *Meter) RegisterInt64Observer(name string, callback apimetric.Int64ObserverCallback, oos ...apimetric.ObserverOptionApplier) apimetric.Int64Observer { + wrappedCallback := wrapInt64ObserverCallback(callback) + return m.newObserver(name, wrappedCallback, core.Int64NumberKind, oos...) +} + +func wrapInt64ObserverCallback(callback apimetric.Int64ObserverCallback) observerCallback { + if callback == nil { + return func(result observerResult) {} + } + return func(result observerResult) { + typeSafeResult := int64ObserverResult{ + result: result, + } + callback(typeSafeResult) + } +} + +func (m *Meter) RegisterFloat64Observer(name string, callback apimetric.Float64ObserverCallback, oos ...apimetric.ObserverOptionApplier) apimetric.Float64Observer { + wrappedCallback := wrapFloat64ObserverCallback(callback) + return m.newObserver(name, wrappedCallback, core.Float64NumberKind, oos...) +} + +func wrapFloat64ObserverCallback(callback apimetric.Float64ObserverCallback) observerCallback { + if callback == nil { + return func(result observerResult) {} + } + return func(result observerResult) { + typeSafeResult := float64ObserverResult{ + result: result, + } + callback(typeSafeResult) + } +} + +func (m *Meter) newObserver(name string, callback observerCallback, numberKind core.NumberKind, oos ...apimetric.ObserverOptionApplier) *Observer { + opts := apimetric.Options{} + apimetric.ApplyObserverOptions(&opts, oos...) + obs := &Observer{ + Instrument: &Instrument{ + Name: name, + Kind: KindObserver, + NumberKind: numberKind, + Opts: opts, + }, + Meter: m, + Dead: false, + callback: callback, + } + m.Observers = append(m.Observers, obs) + return obs +} + func (m *Meter) RecordBatch(ctx context.Context, labels apimetric.LabelSet, measurements ...apimetric.Measurement) { ourLabelSet := labels.(*LabelSet) mm := make([]Measurement, len(measurements)) @@ -229,3 +327,14 @@ func (m *Meter) recordMockBatch(ctx context.Context, labelSet *LabelSet, measure Measurements: measurements, }) } + +func (m *Meter) RunObservers() { + for _, observer := range m.Observers { + if observer.Dead { + continue + } + observer.callback(observerResult{ + instrument: observer.Instrument, + }) + } +} diff --git a/sdk/export/metric/kind_string.go b/sdk/export/metric/kind_string.go index e2d2b88f466..d5f388694a8 100644 --- a/sdk/export/metric/kind_string.go +++ b/sdk/export/metric/kind_string.go @@ -11,11 +11,12 @@ func _() { _ = x[CounterKind-0] _ = x[GaugeKind-1] _ = x[MeasureKind-2] + _ = x[ObserverKind-3] } -const _Kind_name = "CounterKindGaugeKindMeasureKind" +const _Kind_name = "CounterKindGaugeKindMeasureKindObserverKind" -var _Kind_index = [...]uint8{0, 11, 20, 31} +var _Kind_index = [...]uint8{0, 11, 20, 31, 43} func (i Kind) String() string { if i < 0 || i >= Kind(len(_Kind_index)-1) { diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index dc8e5f7182c..f4cb2042100 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -295,6 +295,9 @@ const ( // Measure kind indicates a measure instrument. MeasureKind + + // Observer kind indicates an observer instrument + ObserverKind ) // Descriptor describes a metric instrument to the exporter. diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 03fa6eb5e7b..44d2eb3558b 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -52,6 +52,8 @@ func (*benchFixture) AggregatorFor(descriptor *export.Descriptor) export.Aggrega return counter.New() case export.GaugeKind: return gauge.New() + case export.ObserverKind: + fallthrough case export.MeasureKind: if strings.HasSuffix(descriptor.Name(), "minmaxsumcount") { return minmaxsumcount.New(descriptor) @@ -357,6 +359,89 @@ func benchmarkFloat64MeasureHandleAdd(b *testing.B, name string) { } } +// Observers + +func BenchmarkObserverRegistration(b *testing.B) { + fix := newFixture(b) + names := make([]string, 0, b.N) + for i := 0; i < b.N; i++ { + names = append(names, fmt.Sprintf("test.observer.%d", i)) + } + cb := func(result metric.Int64ObserverResult) {} + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + fix.sdk.RegisterInt64Observer(names[i], cb) + } +} + +func BenchmarkObserverRegistrationUnregistration(b *testing.B) { + fix := newFixture(b) + names := make([]string, 0, b.N) + for i := 0; i < b.N; i++ { + names = append(names, fmt.Sprintf("test.observer.%d", i)) + } + cb := func(result metric.Int64ObserverResult) {} + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + fix.sdk.RegisterInt64Observer(names[i], cb).Unregister() + } +} + +func BenchmarkObserverRegistrationUnregistrationBatched(b *testing.B) { + fix := newFixture(b) + names := make([]string, 0, b.N) + for i := 0; i < b.N; i++ { + names = append(names, fmt.Sprintf("test.observer.%d", i)) + } + observers := make([]metric.Int64Observer, 0, b.N) + cb := func(result metric.Int64ObserverResult) {} + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + observers = append(observers, fix.sdk.RegisterInt64Observer(names[i], cb)) + } + for i := 0; i < b.N; i++ { + observers[i].Unregister() + } +} + +func BenchmarkObserverObservationInt64(b *testing.B) { + ctx := context.Background() + fix := newFixture(b) + labs := fix.sdk.Labels(makeLabels(1)...) + _ = fix.sdk.RegisterInt64Observer("test.observer", func(result metric.Int64ObserverResult) { + b.StartTimer() + defer b.StopTimer() + for i := 0; i < b.N; i++ { + result.Observe((int64)(i), labs) + } + }) + b.StopTimer() + b.ResetTimer() + fix.sdk.Collect(ctx) +} + +func BenchmarkObserverObservationFloat64(b *testing.B) { + ctx := context.Background() + fix := newFixture(b) + labs := fix.sdk.Labels(makeLabels(1)...) + _ = fix.sdk.RegisterFloat64Observer("test.observer", func(result metric.Float64ObserverResult) { + b.StartTimer() + defer b.StopTimer() + for i := 0; i < b.N; i++ { + result.Observe((float64)(i), labs) + } + }) + b.StopTimer() + b.ResetTimer() + fix.sdk.Collect(ctx) +} + // MaxSumCount func BenchmarkInt64MaxSumCountAdd(b *testing.B) { diff --git a/sdk/metric/doc.go b/sdk/metric/doc.go index 00071a3233e..17784e35a5a 100644 --- a/sdk/metric/doc.go +++ b/sdk/metric/doc.go @@ -18,10 +18,10 @@ supports configurable metrics export behavior through a collection of export interfaces that support various export strategies, described below. The metric.Meter API consists of methods for constructing each of the -basic kinds of metric instrument. There are six types of instrument -available to the end user, comprised of three basic kinds of metric -instrument (Counter, Gauge, Measure) crossed with two kinds of number -(int64, float64). +basic kinds of metric instrument. There are eight types of instrument +available to the end user, comprised of four basic kinds of metric +instrument (Counter, Gauge, Measure, Observer) crossed with two kinds +of number (int64, float64). The API assists the SDK by consolidating the variety of metric instruments into a narrower interface, allowing the SDK to avoid repetition of @@ -31,17 +31,25 @@ numerical value. To this end, the API uses a core.Number type to represent either an int64 or a float64, depending on the instrument's definition. A single -implementation interface is used for instruments, metric.InstrumentImpl, -and a single implementation interface is used for handles, -metric.HandleImpl. - -There are three entry points for events in the Metrics API: via instrument -handles, via direct instrument calls, and via BatchRecord. The SDK is -designed with handles as the primary entry point, the other two entry -points are implemented in terms of short-lived handles. For example, the -implementation of a direct call allocates a handle, operates on the -handle, and releases the handle. Similarly, the implementation of -RecordBatch uses a short-lived handle for each measurement in the batch. +implementation interface is used for counter, gauge and measure +instruments, metric.InstrumentImpl, and a single implementation interface +is used for their handles, metric.HandleImpl. For observers, the API +defines interfaces, for which the SDK provides an implementation. + +There are four entry points for events in the Metrics API - three for +synchronous instruments (counters, gauges and measures) and one for +asynchronous instruments (observers). The entry points for synchronous +instruments are: via instrument handles, via direct instrument calls, and +via BatchRecord. The SDK is designed with handles as the primary entry +point, the other two entry points are implemented in terms of short-lived +handles. For example, the implementation of a direct call allocates a +handle, operates on the handle, and releases the handle. Similarly, the +implementation of RecordBatch uses a short-lived handle for each +measurement in the batch. The entry point for asynchronous instruments is +via observer callbacks. Observer callbacks behave like a set of instrument +handles - one for each observation for a distinct label set. The observer +handles are alive as long as they are used. If the callback stops +reporting values for a certain label set, the associated handle is dropped. Internal Structure @@ -51,6 +59,10 @@ user-level code or a short-lived device, there exists an internal record managed by the SDK. Each internal record corresponds to a specific instrument and label set combination. +Each observer also has its own kind of record stored in the SDK. This +record contains a set of recorders for every specific label set used in the +callback. + A sync.Map maintains the mapping of current instruments and label sets to internal records. To create a new handle, the SDK consults the Map to locate an existing record, otherwise it constructs a new record. The SDK @@ -61,31 +73,18 @@ from the user's perspective. Metric collection is performed via a single-threaded call to Collect that sweeps through all records in the SDK, checkpointing their state. When a record is discovered that has no references and has not been updated since -the prior collection pass, it is marked for reclamation and removed from -the Map. There exists, at this moment, a race condition since another -goroutine could, in the same instant, obtain a reference to the handle. - -The SDK is designed to tolerate this sort of race condition, in the name -of reducing lock contention. It is possible for more than one record with -identical instrument and label set to exist simultaneously, though only -one can be linked from the Map at a time. To avoid lost updates, the SDK -maintains two additional linked lists of records, one managed by the -collection code path and one managed by the instrumentation code path. +the prior collection pass, it is removed from the Map. The SDK maintains a current epoch number, corresponding to the number of -completed collections. Each record contains the last epoch during which -it was collected and updated. These variables allow the collection code -path to detect stale records while allowing the instrumentation code path -to detect potential reclamations. When the instrumentation code path -detects a potential reclamation, it adds itself to the second linked list, -where records are saved from reclamation. - -Each record has an associated aggregator, which maintains the current -state resulting from all metric events since its last checkpoint. -Aggregators may be lock-free or they may use locking, but they should -expect to be called concurrently. Because of the tolerated race condition -described above, aggregators must be capable of merging with another -aggregator of the same type. +completed collections. Each recorder of an observer record contains the +last epoch during which it was updated. This variable allows the collection +code path to detect stale recorders and remove them. + +Each record of a handle and recorder of an observer has an associated +aggregator, which maintains the current state resulting from all metric +events since its last checkpoint. Aggregators may be lock-free or they may +use locking, but they should expect to be called concurrently. Aggregators +must be capable of merging with another aggregator of the same type. Export Pipeline diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index ce98abf52d5..a8d57a5fa89 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -43,6 +43,9 @@ type ( // current maps `mapkey` to *record. current sync.Map + // observers is a set of `*observer` instances + observers sync.Map + // empty is the (singleton) result of Labels() // w/ zero arguments. empty labels @@ -115,16 +118,121 @@ type ( recorder export.Aggregator } + observerResult struct { + observer *observer + } + + int64ObserverResult struct { + result observerResult + } + + float64ObserverResult struct { + result observerResult + } + + observerCallback func(result observerResult) + + observer struct { + meter *SDK + descriptor *export.Descriptor + // recorders maps encoded labelset to the pair of + // labelset and recorder + recorders map[string]labeledRecorder + callback observerCallback + } + + labeledRecorder struct { + recorder export.Aggregator + labels *labels + modifiedEpoch int64 + } + + int64Observer struct { + observer *observer + } + + float64Observer struct { + observer *observer + } + ErrorHandler func(error) ) var ( - _ api.Meter = &SDK{} - _ api.LabelSet = &labels{} - _ api.InstrumentImpl = &instrument{} - _ api.BoundInstrumentImpl = &record{} + _ api.Meter = &SDK{} + _ api.LabelSet = &labels{} + _ api.InstrumentImpl = &instrument{} + _ api.BoundInstrumentImpl = &record{} + _ api.Int64Observer = int64Observer{} + _ api.Float64Observer = float64Observer{} + _ api.Int64ObserverResult = int64ObserverResult{} + _ api.Float64ObserverResult = float64ObserverResult{} ) +func (r observerResult) observe(number core.Number, ls api.LabelSet) { + r.observer.recordOne(number, ls) +} + +func (o *observer) recordOne(number core.Number, ls api.LabelSet) { + if err := aggregator.RangeTest(number, o.descriptor); err != nil { + o.meter.errorHandler(err) + return + } + recorder := o.getRecorder(ls) + if recorder == nil { + // The instrument is disabled according to the + // AggregationSelector. + return + } + if err := recorder.Update(context.Background(), number, o.descriptor); err != nil { + o.meter.errorHandler(err) + return + } +} + +func (o *observer) getRecorder(ls api.LabelSet) export.Aggregator { + labels := o.meter.labsFor(ls) + lrec, ok := o.recorders[labels.encoded] + if ok { + lrec.modifiedEpoch = o.meter.currentEpoch + o.recorders[labels.encoded] = lrec + return lrec.recorder + } + rec := o.meter.batcher.AggregatorFor(o.descriptor) + if o.recorders == nil { + o.recorders = make(map[string]labeledRecorder) + } + // This may store nil recorder in the map, thus disabling the + // observer for the labelset for good. This is intentional, + // but will be revisited later. + o.recorders[labels.encoded] = labeledRecorder{ + recorder: rec, + labels: labels, + modifiedEpoch: o.meter.currentEpoch, + } + return rec +} + +func (o *observer) unregister() { + o.meter.observers.Delete(o) +} + +func (r int64ObserverResult) Observe(value int64, labels api.LabelSet) { + r.result.observe(core.NewInt64Number(value), labels) +} + +func (r float64ObserverResult) Observe(value float64, labels api.LabelSet) { + r.result.observe(core.NewFloat64Number(value), labels) +} + +func (o int64Observer) Unregister() { + o.observer.unregister() +} + +func (o float64Observer) Unregister() { + o.observer.unregister() +} + func (i *instrument) Meter() api.Meter { return i.meter } @@ -275,8 +383,8 @@ func (m *SDK) labsFor(ls api.LabelSet) *labels { return &m.empty } -func (m *SDK) newInstrument(name string, metricKind export.Kind, numberKind core.NumberKind, opts *api.Options) *instrument { - descriptor := export.NewDescriptor( +func newDescriptor(name string, metricKind export.Kind, numberKind core.NumberKind, opts *api.Options) *export.Descriptor { + return export.NewDescriptor( name, metricKind, opts.Keys, @@ -284,6 +392,10 @@ func (m *SDK) newInstrument(name string, metricKind export.Kind, numberKind core opts.Unit, numberKind, opts.Alternate) +} + +func (m *SDK) newInstrument(name string, metricKind export.Kind, numberKind core.NumberKind, opts *api.Options) *instrument { + descriptor := newDescriptor(name, metricKind, numberKind, opts) return &instrument{ descriptor: descriptor, meter: m, @@ -332,8 +444,66 @@ func (m *SDK) NewFloat64Measure(name string, mos ...api.MeasureOptionApplier) ap return api.WrapFloat64MeasureInstrument(m.newMeasureInstrument(name, core.Float64NumberKind, mos...)) } -// Collect traverses the list of active records and exports data for -// each active instrument. Collect() may not be called concurrently. +func (m *SDK) RegisterInt64Observer(name string, callback api.Int64ObserverCallback, oos ...api.ObserverOptionApplier) api.Int64Observer { + if callback == nil { + return api.NoopMeter{}.RegisterInt64Observer("", nil) + } + opts := api.Options{} + api.ApplyObserverOptions(&opts, oos...) + descriptor := newDescriptor(name, export.ObserverKind, core.Int64NumberKind, &opts) + cb := wrapInt64ObserverCallback(callback) + obs := m.newObserver(descriptor, cb) + return int64Observer{ + observer: obs, + } +} + +func wrapInt64ObserverCallback(callback api.Int64ObserverCallback) observerCallback { + return func(result observerResult) { + typeSafeResult := int64ObserverResult{ + result: result, + } + callback(typeSafeResult) + } +} + +func (m *SDK) RegisterFloat64Observer(name string, callback api.Float64ObserverCallback, oos ...api.ObserverOptionApplier) api.Float64Observer { + if callback == nil { + return api.NoopMeter{}.RegisterFloat64Observer("", nil) + } + opts := api.Options{} + api.ApplyObserverOptions(&opts, oos...) + descriptor := newDescriptor(name, export.ObserverKind, core.Float64NumberKind, &opts) + cb := wrapFloat64ObserverCallback(callback) + obs := m.newObserver(descriptor, cb) + return float64Observer{ + observer: obs, + } +} + +func wrapFloat64ObserverCallback(callback api.Float64ObserverCallback) observerCallback { + return func(result observerResult) { + typeSafeResult := float64ObserverResult{ + result: result, + } + callback(typeSafeResult) + } +} + +func (m *SDK) newObserver(descriptor *export.Descriptor, callback observerCallback) *observer { + obs := &observer{ + meter: m, + descriptor: descriptor, + recorders: nil, + callback: callback, + } + m.observers.Store(obs, nil) + return obs +} + +// Collect traverses the list of active records and observers and +// exports data for each active instrument. Collect() may not be +// called concurrently. // // During the collection pass, the export.Batcher will receive // one Export() call per current aggregation. @@ -343,6 +513,13 @@ func (m *SDK) Collect(ctx context.Context) int { m.collectLock.Lock() defer m.collectLock.Unlock() + checkpointed := m.collectRecords(ctx) + checkpointed += m.collectObservers(ctx) + m.currentEpoch++ + return checkpointed +} + +func (m *SDK) collectRecords(ctx context.Context) int { checkpointed := 0 m.current.Range(func(key interface{}, value interface{}) bool { @@ -358,25 +535,66 @@ func (m *SDK) Collect(ctx context.Context) int { // TODO: Reconsider this logic. if inuse.refMapped.inUse() || atomic.LoadInt64(&inuse.modified) != 0 { atomic.StoreInt64(&inuse.modified, 0) - checkpointed += m.checkpoint(ctx, inuse) + checkpointed += m.checkpointRecord(ctx, inuse) } // Always continue to iterate over the entire map. return true }) - m.currentEpoch++ return checkpointed } -func (m *SDK) checkpoint(ctx context.Context, r *record) int { - if r.recorder == nil { +func (m *SDK) collectObservers(ctx context.Context) int { + checkpointed := 0 + + m.observers.Range(func(key, value interface{}) bool { + obs := key.(*observer) + result := observerResult{ + observer: obs, + } + obs.callback(result) + checkpointed += m.checkpointObserver(ctx, obs) + return true + }) + + return checkpointed +} + +func (m *SDK) checkpointRecord(ctx context.Context, r *record) int { + return m.checkpoint(ctx, r.descriptor, r.recorder, r.labels) +} + +func (m *SDK) checkpointObserver(ctx context.Context, obs *observer) int { + if len(obs.recorders) == 0 { return 0 } - r.recorder.Checkpoint(ctx, r.descriptor) - labels := export.NewLabels(r.labels.sorted, r.labels.encoded, m.labelEncoder) - err := m.batcher.Process(ctx, export.NewRecord(r.descriptor, labels, r.recorder)) + checkpointed := 0 + for encodedLabels, lrec := range obs.recorders { + epochDiff := m.currentEpoch - lrec.modifiedEpoch + if epochDiff == 0 { + checkpointed += m.checkpoint(ctx, obs.descriptor, lrec.recorder, lrec.labels) + } else if epochDiff > 1 { + // This is second collection cycle with no + // observations for this labelset. Remove the + // recorder. + delete(obs.recorders, encodedLabels) + } + } + if len(obs.recorders) == 0 { + obs.recorders = nil + } + return checkpointed +} +func (m *SDK) checkpoint(ctx context.Context, descriptor *export.Descriptor, recorder export.Aggregator, labels *labels) int { + if recorder == nil { + return 0 + } + recorder.Checkpoint(ctx, descriptor) + exportLabels := export.NewLabels(labels.sorted, labels.encoded, m.labelEncoder) + exportRecord := export.NewRecord(descriptor, exportLabels, recorder) + err := m.batcher.Process(ctx, exportRecord) if err != nil { m.errorHandler(err) } diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index d86005b0365..a60dc3fbd71 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -38,19 +38,19 @@ var ( ) // NewWithInexpensiveMeasure returns a simple aggregation selector -// that uses counter, gauge, and minmaxsumcount aggregators for the three -// kinds of metric. This selector is faster and uses less memory than -// the others because minmaxsumcount does not aggregate quantile +// that uses counter, gauge, and minmaxsumcount aggregators for the +// four kinds of metric. This selector is faster and uses less memory +// than the others because minmaxsumcount does not aggregate quantile // information. func NewWithInexpensiveMeasure() export.AggregationSelector { return selectorInexpensive{} } // NewWithSketchMeasure returns a simple aggregation selector that -// uses counter, gauge, and ddsketch aggregators for the three kinds -// of metric. This selector uses more cpu and memory than the +// uses counter, gauge, and ddsketch aggregators for the four kinds of +// metric. This selector uses more cpu and memory than the // NewWithInexpensiveMeasure because it uses one DDSketch per distinct -// measure and labelset. +// measure/observer and labelset. func NewWithSketchMeasure(config *ddsketch.Config) export.AggregationSelector { return selectorSketch{ config: config, @@ -58,7 +58,7 @@ func NewWithSketchMeasure(config *ddsketch.Config) export.AggregationSelector { } // NewWithExactMeasure returns a simple aggregation selector that uses -// counter, gauge, and array behavior for the three kinds of metric. +// counter, gauge, and array aggregators for the four kinds of metric. // This selector uses more memory than the NewWithSketchMeasure // because it aggregates an array of all values, therefore is able to // compute exact quantiles. @@ -70,6 +70,8 @@ func (selectorInexpensive) AggregatorFor(descriptor *export.Descriptor) export.A switch descriptor.MetricKind() { case export.GaugeKind: return gauge.New() + case export.ObserverKind: + fallthrough case export.MeasureKind: return minmaxsumcount.New(descriptor) default: @@ -81,6 +83,8 @@ func (s selectorSketch) AggregatorFor(descriptor *export.Descriptor) export.Aggr switch descriptor.MetricKind() { case export.GaugeKind: return gauge.New() + case export.ObserverKind: + fallthrough case export.MeasureKind: return ddsketch.New(s.config, descriptor) default: @@ -92,6 +96,8 @@ func (selectorExact) AggregatorFor(descriptor *export.Descriptor) export.Aggrega switch descriptor.MetricKind() { case export.GaugeKind: return gauge.New() + case export.ObserverKind: + fallthrough case export.MeasureKind: return array.New() default: diff --git a/sdk/metric/selector/simple/simple_test.go b/sdk/metric/selector/simple/simple_test.go index 4e03b2c3daf..e1667195894 100644 --- a/sdk/metric/selector/simple/simple_test.go +++ b/sdk/metric/selector/simple/simple_test.go @@ -30,9 +30,10 @@ import ( ) var ( - testGaugeDesc = export.NewDescriptor("gauge", export.GaugeKind, nil, "", "", core.Int64NumberKind, false) - testCounterDesc = export.NewDescriptor("counter", export.CounterKind, nil, "", "", core.Int64NumberKind, false) - testMeasureDesc = export.NewDescriptor("measure", export.MeasureKind, nil, "", "", core.Int64NumberKind, false) + testGaugeDesc = export.NewDescriptor("gauge", export.GaugeKind, nil, "", "", core.Int64NumberKind, false) + testCounterDesc = export.NewDescriptor("counter", export.CounterKind, nil, "", "", core.Int64NumberKind, false) + testMeasureDesc = export.NewDescriptor("measure", export.MeasureKind, nil, "", "", core.Int64NumberKind, false) + testObserverDesc = export.NewDescriptor("observer", export.ObserverKind, nil, "", "", core.Int64NumberKind, false) ) func TestInexpensiveMeasure(t *testing.T) { @@ -40,6 +41,7 @@ func TestInexpensiveMeasure(t *testing.T) { require.NotPanics(t, func() { _ = inex.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) }) require.NotPanics(t, func() { _ = inex.AggregatorFor(testCounterDesc).(*counter.Aggregator) }) require.NotPanics(t, func() { _ = inex.AggregatorFor(testMeasureDesc).(*minmaxsumcount.Aggregator) }) + require.NotPanics(t, func() { _ = inex.AggregatorFor(testObserverDesc).(*minmaxsumcount.Aggregator) }) } func TestSketchMeasure(t *testing.T) { @@ -47,6 +49,7 @@ func TestSketchMeasure(t *testing.T) { require.NotPanics(t, func() { _ = sk.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) }) require.NotPanics(t, func() { _ = sk.AggregatorFor(testCounterDesc).(*counter.Aggregator) }) require.NotPanics(t, func() { _ = sk.AggregatorFor(testMeasureDesc).(*ddsketch.Aggregator) }) + require.NotPanics(t, func() { _ = sk.AggregatorFor(testObserverDesc).(*ddsketch.Aggregator) }) } func TestExactMeasure(t *testing.T) { @@ -54,4 +57,5 @@ func TestExactMeasure(t *testing.T) { require.NotPanics(t, func() { _ = ex.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) }) require.NotPanics(t, func() { _ = ex.AggregatorFor(testCounterDesc).(*counter.Aggregator) }) require.NotPanics(t, func() { _ = ex.AggregatorFor(testMeasureDesc).(*array.Aggregator) }) + require.NotPanics(t, func() { _ = ex.AggregatorFor(testObserverDesc).(*array.Aggregator) }) }