Skip to content

Commit

Permalink
Add observer metric (#474)
Browse files Browse the repository at this point in the history
* wip: observers

* wip: float observers

* fix copy pasta

* wip: rework observers in sdk

* small fix in global meter

* wip: aggregators and selectors

* wip: monotonicity option for observers

* some refactor

* wip: docs

needs more package docs (especially for api/metric and sdk/metric)

* fix ci

* Fix copy-pasta in docs

Co-Authored-By: Mauricio Vásquez <[email protected]>

* recycle unused recorders in observers

if a recorder for a labelset is unused for a second collection cycle
in a row, drop it

* unregister

* thread-safe set callback

* Fix docs

* Revert "wip: aggregators and selectors"

This reverts commit 37b7d05.

* update selector

* tests

* Rework number equality

Compare concrete numbers, so we can get actual numbers in the error
message when they are not equal, not some uint64 representation. This
also uses InDelta for comparing floats.

* Ensure that Observers are registered in the same order

* Run observers in fixed order

So the tests can be reproducible - iterating a map made the order of
measurements random.

* Ensure the proper alignment of the delegates

This wasn't checked at all. After adding the checks, the test-386
failed.

* Small tweaks to the global meter test

* Ensure proper alignment of the callback pointer

test-386 was complaining about it

* update docs

* update a TODO

* address review issues

* drop SetCallback

Co-authored-by: Mauricio Vásquez <[email protected]>
Co-authored-by: Rahul Patel <[email protected]>
  • Loading branch information
3 people authored Mar 5, 2020
1 parent 547d584 commit a202f16
Show file tree
Hide file tree
Showing 16 changed files with 1,074 additions and 126 deletions.
26 changes: 26 additions & 0 deletions api/global/internal/internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package internal_test

import (
"os"
"testing"

"go.opentelemetry.io/otel/api/global/internal"
ottest "go.opentelemetry.io/otel/internal/testing"
)

// Ensure struct alignment prior to running tests.
func TestMain(m *testing.M) {
fieldsMap := internal.AtomicFieldOffsets()
fields := make([]ottest.FieldOffset, 0, len(fieldsMap))
for name, offset := range fieldsMap {
fields = append(fields, ottest.FieldOffset{
Name: name,
Offset: offset,
})
}
if !ottest.Aligned8Byte(fields, os.Stderr) {
os.Exit(1)
}

os.Exit(m.Run())
}
184 changes: 175 additions & 9 deletions api/global/internal/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,44 +40,76 @@ const (
)

type meterProvider struct {
lock sync.Mutex
meters []*meter
delegate metric.Provider

lock sync.Mutex
meters []*meter
}

type meter struct {
delegate unsafe.Pointer // (*metric.Meter)

provider *meterProvider
name string

lock sync.Mutex
instruments []*instImpl

delegate unsafe.Pointer // (*metric.Meter)
lock sync.Mutex
instruments []*instImpl
liveObservers map[*obsImpl]struct{}
// orderedObservers slice contains observers in their order of
// registration. It may also contain unregistered
// observers. The liveObservers map should be consulted to
// check if the observer is registered or not.
orderedObservers []*obsImpl
}

type instImpl struct {
delegate unsafe.Pointer // (*metric.InstrumentImpl)

name string
mkind metricKind
nkind core.NumberKind
opts interface{}
}

delegate unsafe.Pointer // (*metric.InstrumentImpl)
type obsImpl struct {
delegate unsafe.Pointer // (*metric.Int64Observer or *metric.Float64Observer)

name string
nkind core.NumberKind
opts []metric.ObserverOptionApplier
meter *meter
callback interface{}
}

type int64ObsImpl struct {
observer *obsImpl
}

type float64ObsImpl struct {
observer *obsImpl
}

// this is a common subset of the metric observers interfaces
type observerUnregister interface {
Unregister()
}

type labelSet struct {
delegate unsafe.Pointer // (* metric.LabelSet)

meter *meter
value []core.KeyValue

initialize sync.Once
delegate unsafe.Pointer // (* metric.LabelSet)
}

type instHandle struct {
delegate unsafe.Pointer // (*metric.HandleImpl)

inst *instImpl
labels metric.LabelSet

initialize sync.Once
delegate unsafe.Pointer // (*metric.HandleImpl)
}

var _ metric.Provider = &meterProvider{}
Expand All @@ -86,6 +118,10 @@ var _ metric.LabelSet = &labelSet{}
var _ metric.LabelSetDelegate = &labelSet{}
var _ metric.InstrumentImpl = &instImpl{}
var _ metric.BoundInstrumentImpl = &instHandle{}
var _ metric.Int64Observer = int64ObsImpl{}
var _ metric.Float64Observer = float64ObsImpl{}
var _ observerUnregister = (metric.Int64Observer)(nil)
var _ observerUnregister = (metric.Float64Observer)(nil)

// Provider interface and delegation

Expand Down Expand Up @@ -130,6 +166,13 @@ func (m *meter) setDelegate(provider metric.Provider) {
inst.setDelegate(*d)
}
m.instruments = nil
for _, obs := range m.orderedObservers {
if _, ok := m.liveObservers[obs]; ok {
obs.setDelegate(*d)
}
}
m.liveObservers = nil
m.orderedObservers = nil
}

func (m *meter) newInst(name string, mkind metricKind, nkind core.NumberKind, opts interface{}) metric.InstrumentImpl {
Expand Down Expand Up @@ -203,6 +246,68 @@ func (bound *instHandle) Unbind() {
(*implPtr).Unbind()
}

// Any Observer delegation

func (obs *obsImpl) setDelegate(d metric.Meter) {
if obs.nkind == core.Int64NumberKind {
obs.setInt64Delegate(d)
} else {
obs.setFloat64Delegate(d)
}
}

func (obs *obsImpl) unregister() {
unreg := obs.getUnregister()
if unreg != nil {
unreg.Unregister()
return
}
obs.meter.lock.Lock()
defer obs.meter.lock.Unlock()
delete(obs.meter.liveObservers, obs)
if len(obs.meter.liveObservers) == 0 {
obs.meter.liveObservers = nil
obs.meter.orderedObservers = nil
}
}

func (obs *obsImpl) getUnregister() observerUnregister {
ptr := atomic.LoadPointer(&obs.delegate)
if ptr == nil {
return nil
}
if obs.nkind == core.Int64NumberKind {
return *(*metric.Int64Observer)(ptr)
}
return *(*metric.Float64Observer)(ptr)
}

// Int64Observer delegation

func (obs *obsImpl) setInt64Delegate(d metric.Meter) {
obsPtr := new(metric.Int64Observer)
cb := obs.callback.(metric.Int64ObserverCallback)
*obsPtr = d.RegisterInt64Observer(obs.name, cb, obs.opts...)
atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr))
}

func (obs int64ObsImpl) Unregister() {
obs.observer.unregister()
}

// Float64Observer delegation

func (obs *obsImpl) setFloat64Delegate(d metric.Meter) {
obsPtr := new(metric.Float64Observer)
cb := obs.callback.(metric.Float64ObserverCallback)
*obsPtr = d.RegisterFloat64Observer(obs.name, cb, obs.opts...)
atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr))
}

func (obs float64ObsImpl) Unregister() {
obs.observer.unregister()
}

// Metric updates

func (m *meter) RecordBatch(ctx context.Context, labels metric.LabelSet, measurements ...metric.Measurement) {
Expand Down Expand Up @@ -296,3 +401,64 @@ func (m *meter) NewInt64Measure(name string, opts ...metric.MeasureOptionApplier
func (m *meter) NewFloat64Measure(name string, opts ...metric.MeasureOptionApplier) metric.Float64Measure {
return metric.WrapFloat64MeasureInstrument(m.newInst(name, measureKind, core.Float64NumberKind, opts))
}

func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Int64Observer {
m.lock.Lock()
defer m.lock.Unlock()

if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).RegisterInt64Observer(name, callback, oos...)
}

obs := &obsImpl{
name: name,
nkind: core.Int64NumberKind,
opts: oos,
meter: m,
callback: callback,
}
m.addObserver(obs)
return int64ObsImpl{
observer: obs,
}
}

func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Float64Observer {
m.lock.Lock()
defer m.lock.Unlock()

if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).RegisterFloat64Observer(name, callback, oos...)
}

obs := &obsImpl{
name: name,
nkind: core.Float64NumberKind,
opts: oos,
meter: m,
callback: callback,
}
m.addObserver(obs)
return float64ObsImpl{
observer: obs,
}
}

func (m *meter) addObserver(obs *obsImpl) {
if m.liveObservers == nil {
m.liveObservers = make(map[*obsImpl]struct{})
}
m.liveObservers[obs] = struct{}{}
m.orderedObservers = append(m.orderedObservers, obs)
}

func AtomicFieldOffsets() map[string]uintptr {
return map[string]uintptr{
"meterProvider.delegate": unsafe.Offsetof(meterProvider{}.delegate),
"meter.delegate": unsafe.Offsetof(meter{}.delegate),
"instImpl.delegate": unsafe.Offsetof(instImpl{}.delegate),
"obsImpl.delegate": unsafe.Offsetof(obsImpl{}.delegate),
"labelSet.delegate": unsafe.Offsetof(labelSet{}.delegate),
"instHandle.delegate": unsafe.Offsetof(instHandle{}.delegate),
}
}
Loading

0 comments on commit a202f16

Please sign in to comment.