Skip to content

Commit

Permalink
Add the UpDownSumObserver instrument (#750)
Browse files Browse the repository at this point in the history
* Add the UpDownSumObserver instrument

* Precommit

* Downcase error message
  • Loading branch information
jmacd authored May 20, 2020
1 parent 244ed23 commit 15e8edd
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 20 deletions.
22 changes: 22 additions & 0 deletions api/metric/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,28 @@ func TestObserverInstruments(t *testing.T) {
-142,
)
})
t.Run("float updownsumobserver", func(t *testing.T) {
labels := []kv.KeyValue{kv.String("O", "P")}
mockSDK, meter := mockTest.NewMeter()
o := Must(meter).RegisterFloat64UpDownSumObserver("test.updownsumobserver.float", func(_ context.Context, result metric.Float64ObserverResult) {
result.Observe(42.1, labels...)
})
mockSDK.RunAsyncInstruments()
checkObserverBatch(t, labels, mockSDK, metric.Float64NumberKind, metric.UpDownSumObserverKind, o.AsyncImpl(),
42.1,
)
})
t.Run("int updownsumobserver", func(t *testing.T) {
labels := []kv.KeyValue{}
mockSDK, meter := mockTest.NewMeter()
o := Must(meter).RegisterInt64UpDownSumObserver("test.observer.int", func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(-142, labels...)
})
mockSDK.RunAsyncInstruments()
checkObserverBatch(t, labels, mockSDK, metric.Int64NumberKind, metric.UpDownSumObserverKind, o.AsyncImpl(),
-142,
)
})
}

func checkSyncBatches(t *testing.T, ctx context.Context, labels []kv.KeyValue, mock *mockTest.MeterImpl, nkind metric.NumberKind, mkind metric.Kind, instrument metric.InstrumentImpl, expected ...float64) {
Expand Down
12 changes: 12 additions & 0 deletions api/metric/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,15 @@ func wrapFloat64SumObserverInstrument(asyncInst AsyncImpl, err error) (Float64Su
common, err := checkNewAsync(asyncInst, err)
return Float64SumObserver{asyncInstrument: common}, err
}

// wrapInt64UpDownSumObserverInstrument converts an AsyncImpl into Int64UpDownSumObserver.
func wrapInt64UpDownSumObserverInstrument(asyncInst AsyncImpl, err error) (Int64UpDownSumObserver, error) {
common, err := checkNewAsync(asyncInst, err)
return Int64UpDownSumObserver{asyncInstrument: common}, err
}

// wrapFloat64UpDownSumObserverInstrument converts an AsyncImpl into Float64UpDownSumObserver.
func wrapFloat64UpDownSumObserverInstrument(asyncInst AsyncImpl, err error) (Float64UpDownSumObserver, error) {
common, err := checkNewAsync(asyncInst, err)
return Float64UpDownSumObserver{asyncInstrument: common}, err
}
2 changes: 2 additions & 0 deletions api/metric/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ const (

// SumObserverKind indicates a SumObserver instrument.
SumObserverKind
// UpDownSumObserverKind indicates a UpDownSumObserver instrument.
UpDownSumObserverKind
)
5 changes: 3 additions & 2 deletions api/metric/kind_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 51 additions & 0 deletions api/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,32 @@ func (m Meter) RegisterFloat64SumObserver(name string, callback Float64ObserverC
newFloat64AsyncRunner(callback)))
}

// RegisterInt64UpDownSumObserver creates a new integer UpDownSumObserver instrument
// with the given name, running a given callback, and customized with
// options. May return an error if the name is invalid (e.g., empty)
// or improperly registered (e.g., duplicate registration).
func (m Meter) RegisterInt64UpDownSumObserver(name string, callback Int64ObserverCallback, opts ...Option) (Int64UpDownSumObserver, error) {
if callback == nil {
return wrapInt64UpDownSumObserverInstrument(NoopAsync{}, nil)
}
return wrapInt64UpDownSumObserverInstrument(
m.newAsync(name, UpDownSumObserverKind, Int64NumberKind, opts,
newInt64AsyncRunner(callback)))
}

// RegisterFloat64UpDownSumObserver creates a new floating point UpDownSumObserver with
// the given name, running a given callback, and customized with
// options. May return an error if the name is invalid (e.g., empty)
// or improperly registered (e.g., duplicate registration).
func (m Meter) RegisterFloat64UpDownSumObserver(name string, callback Float64ObserverCallback, opts ...Option) (Float64UpDownSumObserver, error) {
if callback == nil {
return wrapFloat64UpDownSumObserverInstrument(NoopAsync{}, nil)
}
return wrapFloat64UpDownSumObserverInstrument(
m.newAsync(name, UpDownSumObserverKind, Float64NumberKind, opts,
newFloat64AsyncRunner(callback)))
}

// RegisterInt64ValueObserver creates a new integer ValueObserver instrument
// with the given name, running in a batch callback, and customized with
// options. May return an error if the name is invalid (e.g., empty)
Expand Down Expand Up @@ -220,6 +246,31 @@ func (b BatchObserver) RegisterFloat64SumObserver(name string, opts ...Option) (
b.runner))
}

// RegisterInt64UpDownSumObserver creates a new integer UpDownSumObserver instrument
// with the given name, running in a batch callback, and customized with
// options. May return an error if the name is invalid (e.g., empty)
// or improperly registered (e.g., duplicate registration).
func (b BatchObserver) RegisterInt64UpDownSumObserver(name string, opts ...Option) (Int64UpDownSumObserver, error) {
if b.runner == nil {
return wrapInt64UpDownSumObserverInstrument(NoopAsync{}, nil)
}
return wrapInt64UpDownSumObserverInstrument(
b.meter.newAsync(name, UpDownSumObserverKind, Int64NumberKind, opts, b.runner))
}

// RegisterFloat64UpDownSumObserver creates a new floating point UpDownSumObserver with
// the given name, running in a batch callback, and customized with
// options. May return an error if the name is invalid (e.g., empty)
// or improperly registered (e.g., duplicate registration).
func (b BatchObserver) RegisterFloat64UpDownSumObserver(name string, opts ...Option) (Float64UpDownSumObserver, error) {
if b.runner == nil {
return wrapFloat64UpDownSumObserverInstrument(NoopAsync{}, nil)
}
return wrapFloat64UpDownSumObserverInstrument(
b.meter.newAsync(name, UpDownSumObserverKind, Float64NumberKind, opts,
b.runner))
}

// MeterImpl returns the underlying MeterImpl of this Meter.
func (m Meter) MeterImpl() MeterImpl {
return m.impl
Expand Down
40 changes: 40 additions & 0 deletions api/metric/must.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,26 @@ func (mm MeterMust) RegisterFloat64SumObserver(name string, callback Float64Obse
}
}

// RegisterInt64UpDownSumObserver calls `Meter.RegisterInt64UpDownSumObserver` and
// returns the instrument, panicking if it encounters an error.
func (mm MeterMust) RegisterInt64UpDownSumObserver(name string, callback Int64ObserverCallback, oos ...Option) Int64UpDownSumObserver {
if inst, err := mm.meter.RegisterInt64UpDownSumObserver(name, callback, oos...); err != nil {
panic(err)
} else {
return inst
}
}

// RegisterFloat64UpDownSumObserver calls `Meter.RegisterFloat64UpDownSumObserver` and
// returns the instrument, panicking if it encounters an error.
func (mm MeterMust) RegisterFloat64UpDownSumObserver(name string, callback Float64ObserverCallback, oos ...Option) Float64UpDownSumObserver {
if inst, err := mm.meter.RegisterFloat64UpDownSumObserver(name, callback, oos...); err != nil {
panic(err)
} else {
return inst
}
}

// NewBatchObserver returns a wrapper around BatchObserver that panics
// when any instrument constructor returns an error.
func (mm MeterMust) NewBatchObserver(callback BatchObserverCallback) BatchObserverMust {
Expand Down Expand Up @@ -180,3 +200,23 @@ func (bm BatchObserverMust) RegisterFloat64SumObserver(name string, oos ...Optio
return inst
}
}

// RegisterInt64UpDownSumObserver calls `BatchObserver.RegisterInt64UpDownSumObserver` and
// returns the instrument, panicking if it encounters an error.
func (bm BatchObserverMust) RegisterInt64UpDownSumObserver(name string, oos ...Option) Int64UpDownSumObserver {
if inst, err := bm.batch.RegisterInt64UpDownSumObserver(name, oos...); err != nil {
panic(err)
} else {
return inst
}
}

// RegisterFloat64UpDownSumObserver calls `BatchObserver.RegisterFloat64UpDownSumObserver` and
// returns the instrument, panicking if it encounters an error.
func (bm BatchObserverMust) RegisterFloat64UpDownSumObserver(name string, oos ...Option) Float64UpDownSumObserver {
if inst, err := bm.batch.RegisterFloat64UpDownSumObserver(name, oos...); err != nil {
panic(err)
} else {
return inst
}
}
34 changes: 34 additions & 0 deletions api/metric/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ type Float64SumObserver struct {
asyncInstrument
}

// Int64UpDownSumObserver is a metric that captures a precomputed sum of
// int64 values at a point in time.
type Int64UpDownSumObserver struct {
asyncInstrument
}

// Float64UpDownSumObserver is a metric that captures a precomputed sum of
// float64 values at a point in time.
type Float64UpDownSumObserver struct {
asyncInstrument
}

// Observation returns an Observation, a BatchObserverCallback
// argument, for an asynchronous integer instrument.
// This returns an implementation-level object for use by the SDK,
Expand Down Expand Up @@ -88,3 +100,25 @@ func (f Float64SumObserver) Observation(v float64) Observation {
instrument: f.instrument,
}
}

// Observation returns an Observation, a BatchObserverCallback
// argument, for an asynchronous integer instrument.
// This returns an implementation-level object for use by the SDK,
// users should not refer to this.
func (i Int64UpDownSumObserver) Observation(v int64) Observation {
return Observation{
number: NewInt64Number(v),
instrument: i.instrument,
}
}

// Observation returns an Observation, a BatchObserverCallback
// argument, for an asynchronous integer instrument.
// This returns an implementation-level object for use by the SDK,
// users should not refer to this.
func (f Float64UpDownSumObserver) Observation(v float64) Observation {
return Observation{
number: NewFloat64Number(v),
instrument: f.instrument,
}
}
91 changes: 79 additions & 12 deletions sdk/metric/correct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,27 +347,45 @@ func TestObserverCollection(t *testing.T) {
result.Observe(1)
})

_ = Must(meter).RegisterFloat64UpDownSumObserver("float.updownsumobserver", func(_ context.Context, result metric.Float64ObserverResult) {
result.Observe(1, kv.String("A", "B"))
result.Observe(-2, kv.String("A", "B"))
result.Observe(1, kv.String("C", "D"))
})
_ = Must(meter).RegisterInt64UpDownSumObserver("int.updownsumobserver", func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(2, kv.String("A", "B"))
result.Observe(1)
// last value wins
result.Observe(1, kv.String("A", "B"))
result.Observe(-1)
})

_ = Must(meter).RegisterInt64ValueObserver("empty.valueobserver", func(_ context.Context, result metric.Int64ObserverResult) {
})

collected := sdk.Collect(ctx)

require.Equal(t, 8, collected)
require.Equal(t, 8, len(integrator.records))
require.Equal(t, collected, len(integrator.records))

out := batchTest.NewOutput(label.DefaultEncoder())
for _, rec := range integrator.records {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"float.sumobserver/A=B/R=V": 2,
"float.sumobserver/C=D/R=V": 1,
"int.sumobserver//R=V": 1,
"int.sumobserver/A=B/R=V": 1,
"float.valueobserver/A=B/R=V": -1,
"float.valueobserver/C=D/R=V": -1,
"int.valueobserver//R=V": 1,
"int.valueobserver/A=B/R=V": 1,

"float.sumobserver/A=B/R=V": 2,
"float.sumobserver/C=D/R=V": 1,
"int.sumobserver//R=V": 1,
"int.sumobserver/A=B/R=V": 1,

"float.updownsumobserver/A=B/R=V": -2,
"float.updownsumobserver/C=D/R=V": 1,
"int.updownsumobserver//R=V": -1,
"int.updownsumobserver/A=B/R=V": 1,
}, out.Map)
}

Expand Down Expand Up @@ -405,6 +423,8 @@ func TestObserverBatch(t *testing.T) {
var intValueObs metric.Int64ValueObserver
var floatSumObs metric.Float64SumObserver
var intSumObs metric.Int64SumObserver
var floatUpDownSumObs metric.Float64UpDownSumObserver
var intUpDownSumObs metric.Int64UpDownSumObserver

var batch = Must(meter).NewBatchObserver(
func(_ context.Context, result metric.BatchObserverResult) {
Expand All @@ -418,41 +438,52 @@ func TestObserverBatch(t *testing.T) {
intValueObs.Observation(1),
floatSumObs.Observation(1000),
intSumObs.Observation(100),
floatUpDownSumObs.Observation(-1000),
intUpDownSumObs.Observation(-100),
)
result.Observe(
[]kv.KeyValue{
kv.String("C", "D"),
},
floatValueObs.Observation(-1),
floatSumObs.Observation(-1),
floatUpDownSumObs.Observation(-1),
)
result.Observe(
nil,
intValueObs.Observation(1),
intValueObs.Observation(1),
intSumObs.Observation(10),
floatSumObs.Observation(1.1),
intUpDownSumObs.Observation(10),
)
})
floatValueObs = batch.RegisterFloat64ValueObserver("float.valueobserver")
intValueObs = batch.RegisterInt64ValueObserver("int.valueobserver")
floatSumObs = batch.RegisterFloat64SumObserver("float.sumobserver")
intSumObs = batch.RegisterInt64SumObserver("int.sumobserver")
floatUpDownSumObs = batch.RegisterFloat64UpDownSumObserver("float.updownsumobserver")
intUpDownSumObs = batch.RegisterInt64UpDownSumObserver("int.updownsumobserver")

collected := sdk.Collect(ctx)

require.Equal(t, 8, collected)
require.Equal(t, 8, len(integrator.records))
require.Equal(t, collected, len(integrator.records))

out := batchTest.NewOutput(label.DefaultEncoder())
for _, rec := range integrator.records {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"float.sumobserver//R=V": 1.1,
"float.sumobserver/A=B/R=V": 1000,
"int.sumobserver//R=V": 10,
"int.sumobserver/A=B/R=V": 100,
"float.sumobserver//R=V": 1.1,
"float.sumobserver/A=B/R=V": 1000,
"int.sumobserver//R=V": 10,
"int.sumobserver/A=B/R=V": 100,

"int.updownsumobserver/A=B/R=V": -100,
"float.updownsumobserver/A=B/R=V": -1000,
"int.updownsumobserver//R=V": 10,
"float.updownsumobserver/C=D/R=V": -1,

"float.valueobserver/A=B/R=V": -1,
"float.valueobserver/C=D/R=V": -1,
"int.valueobserver//R=V": 1,
Expand Down Expand Up @@ -515,6 +546,42 @@ func TestRecordPersistence(t *testing.T) {
require.Equal(t, int64(2), integrator.newAggCount)
}

func TestIncorrectInstruments(t *testing.T) {
// The Batch observe/record APIs are susceptible to
// uninitialized instruments.
var counter metric.Int64Counter
var observer metric.Int64ValueObserver

ctx := context.Background()
meter, sdk, integrator := newSDK(t)

// Now try with uninitialized instruments.
meter.RecordBatch(ctx, nil, counter.Measurement(1))
meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) {
result.Observe(nil, observer.Observation(1))
})

collected := sdk.Collect(ctx)
require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr())
require.Equal(t, 0, collected)

// Now try with instruments from another SDK.
var noopMeter metric.Meter
counter = metric.Must(noopMeter).NewInt64Counter("counter")
observer = metric.Must(noopMeter).NewBatchObserver(
func(context.Context, metric.BatchObserverResult) {},
).RegisterInt64ValueObserver("observer")

meter.RecordBatch(ctx, nil, counter.Measurement(1))
meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) {
result.Observe(nil, observer.Observation(1))
})

collected = sdk.Collect(ctx)
require.Equal(t, 0, collected)
require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr())
}

func TestSyncInAsync(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
Expand Down
Loading

0 comments on commit 15e8edd

Please sign in to comment.