Skip to content

Commit

Permalink
support the Metrics View API
Browse files Browse the repository at this point in the history
  • Loading branch information
beanliu committed Jan 17, 2021
1 parent 3a662b2 commit 227f1ae
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 23 deletions.
73 changes: 73 additions & 0 deletions sdk/metric/correct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/processor/processortest"
"go.opentelemetry.io/otel/sdk/resource"
)
Expand Down Expand Up @@ -89,6 +90,13 @@ func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*metri
processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...)
}

type testAggregatorFactory struct {
}

func (t *testAggregatorFactory) NewInstance() metric.Aggregator {
return &lastvalue.New(1)[0]
}

func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessProcessor) {
testHandler.Reset()
processor := &correctnessProcessor{
Expand Down Expand Up @@ -174,6 +182,71 @@ func TestInputRangeValueRecorder(t *testing.T) {
require.Nil(t, err)
}

func TestViewDropAllLabels(t *testing.T) {
ctx := context.Background()
meter, sdk, processor := newSDK(t)

valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact")
meter.RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.DropAll, nil, nil))

valuerecorder.Record(ctx, 1.0, label.String("A", "a"))

checkpointed := sdk.Collect(ctx)
require.Equal(t, 1, checkpointed)

require.Equal(t, 0, processor.accumulations[0].Labels().Len())
}

func TestViewUngroupLabels(t *testing.T) {
ctx := context.Background()
meter, sdk, processor := newSDK(t)

valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact")
meter.RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.Ungroup, nil, nil))

valuerecorder.Record(ctx, 1.0, label.String("A", "a"))

checkpointed := sdk.Collect(ctx)
require.Equal(t, 1, checkpointed)

labels := processor.accumulations[0].Labels().ToSlice()
require.ElementsMatch(t, labels, []label.KeyValue{label.String("A", "a")})
}

func TestViewLabelKeys(t *testing.T) {
ctx := context.Background()
meter, sdk, processor := newSDK(t)

valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact")
meter.RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.LabelKeys, []label.Key{"A", "B"}, nil))

valuerecorder.Record(ctx, 1.0, label.String("A", "a"))

checkpointed := sdk.Collect(ctx)
require.Equal(t, 1, checkpointed)

labels := processor.accumulations[0].Labels()
expected := label.NewSet(label.String("A", "a"), label.Empty("B"))
require.True(t, labels.Equals(&expected))
}

func TestViewCustomAggregator(t *testing.T) {
ctx := context.Background()
meter, sdk, processor := newSDK(t)

valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact")
meter.RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.Ungroup, nil, &testAggregatorFactory{}))

valuerecorder.Record(ctx, 1.0, label.String("A", "a"))
valuerecorder.Record(ctx, 2.0, label.String("A", "a"))

checkpointed := sdk.Collect(ctx)
require.Equal(t, 1, checkpointed)

agg := processor.accumulations[0].Aggregator()
require.Equal(t, aggregation.LastValueKind, agg.Aggregation().Kind())
}

func TestDisabledInstrument(t *testing.T) {
ctx := context.Background()
meter, sdk, processor := newSDK(t)
Expand Down
183 changes: 160 additions & 23 deletions sdk/metric/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"runtime"
"sort"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -71,6 +73,21 @@ type (

syncInstrument struct {
instrument

// views is a logical set of registered views for this instrument.
views sync.Map
}

// syncInstrumentViewKey is used only in syncInstrument as a key representation of a view.
syncInstrumentViewKey struct {
aggregatorFactory metric.AggregatorFactory
labelConfig metric.ViewLabelConfig
labelKeysSignature string
}

// syncInstrumentViewValue is used only in syncInstrument as a value representation of a view.
syncInstrumentViewValue struct {
labelKeys map[label.Key]struct{}
}

// mapkey uniquely describes a metric instrument in terms of
Expand All @@ -80,6 +97,11 @@ type (
ordered label.Distinct
}

// boundInstrument is a snapshot of the view data for a given instrument.
boundInstrument struct {
records []*record
}

// record maintains the state of one metric instrument. Due
// the use of lock-free algorithms, there may be more than one
// `record` in existence at a time, although at most one can
Expand Down Expand Up @@ -115,6 +137,9 @@ type (
// inst is a pointer to the corresponding instrument.
inst *syncInstrument

// aggregatorFactory is a reference to the corresponding view's aggregator factory.
aggregatorFactory metric.AggregatorFactory

// current implements the actual RecordOne() API,
// depending on the type of aggregation. If nil, the
// metric was disabled by the exporter.
Expand Down Expand Up @@ -146,6 +171,7 @@ var (
_ metric.AsyncImpl = &asyncInstrument{}
_ metric.SyncImpl = &syncInstrument{}
_ metric.BoundSyncImpl = &record{}
_ metric.BoundSyncImpl = &boundInstrument{}

ErrUninitializedInstrument = fmt.Errorf("use of an uninitialized instrument")
)
Expand All @@ -158,6 +184,99 @@ func (a *asyncInstrument) Implementation() interface{} {
return a
}

func (s *syncInstrument) registerView(labelConfig metric.ViewLabelConfig, labelKeys []label.Key, factory metric.AggregatorFactory) {
key, val := newSyncInstrumentView(labelConfig, labelKeys, factory)

if _, loaded := s.views.LoadOrStore(key, val); loaded {
// Duplicated view, first-write-win so ignore here.
// maybe return/record an error?
}
}

func (s *syncInstrument) rangeViews(kvs []label.KeyValue, f func([]label.KeyValue, metric.AggregatorFactory)) {
empty := true
s.views.Range(func(key, value interface{}) bool {
empty = false
return false
})
if empty {
s.registerView(metric.Ungroup, nil, nil)
}

// compute on need
var labelKvs map[label.Key]label.KeyValue

s.views.Range(func(key, value interface{}) bool {
viewKey := key.(*syncInstrumentViewKey)
viewValue := value.(*syncInstrumentViewValue)

var viewLabelKeyValues []label.KeyValue
switch viewKey.labelConfig {
case metric.Ungroup:
viewLabelKeyValues = kvs

case metric.LabelKeys:
if labelKvs == nil {
labelKvs = map[label.Key]label.KeyValue{}
for _, kv := range kvs {
labelKvs[kv.Key] = kv
}
}
for k, kv := range labelKvs {
if _, p := viewValue.labelKeys[k]; p {
viewLabelKeyValues = append(viewLabelKeyValues, kv)
}
}
for k := range viewValue.labelKeys {
if _, p := labelKvs[k]; !p {
viewLabelKeyValues = append(viewLabelKeyValues, k.Empty())
}
}

case metric.DropAll:
// keep nil
default:
panic(fmt.Sprintf("impossible label config: %s", viewKey.labelConfig))
}

f(viewLabelKeyValues, viewKey.aggregatorFactory)
return true
})
}

// newSyncInstrumentView returns a view key/value representation for the given parameters.
func newSyncInstrumentView(labelConfig metric.ViewLabelConfig, labelKeys []label.Key, factory metric.AggregatorFactory) (*syncInstrumentViewKey, *syncInstrumentViewValue) {
key := &syncInstrumentViewKey{
aggregatorFactory: factory,
labelConfig: labelConfig,
labelKeysSignature: "",
}
value := &syncInstrumentViewValue{
labelKeys: nil,
}

if labelConfig == metric.LabelKeys {
if len(labelKeys) > 0 {
value.labelKeys = map[label.Key]struct{}{}
for _, k := range labelKeys {
value.labelKeys[k] = struct{}{}
}

keys := make([]string, 0, len(value.labelKeys))
for k := range value.labelKeys {
keys = append(keys, string(k))
}
sort.Strings(keys)
key.labelKeysSignature = strings.Join(keys, "|")
} else {
// make sure there is only one representation for aggregations without any labels
key.labelConfig = metric.DropAll
}
}

return key, value
}

func (s *syncInstrument) Implementation() interface{} {
return s
}
Expand Down Expand Up @@ -209,7 +328,7 @@ func (a *asyncInstrument) getRecorder(labels *label.Set) metric.Aggregator {
// support re-use of the orderedLabels computed by a previous
// measurement in the same batch. This performs two allocations
// in the common case.
func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set) *record {
func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set, factory metric.AggregatorFactory) *record {
var rec *record
var equiv label.Distinct

Expand All @@ -220,6 +339,7 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set
rec = &record{}
rec.storage = label.NewSetWithSortable(kvs, &rec.sortSlice)
rec.labels = &rec.storage
rec.aggregatorFactory = factory
equiv = rec.storage.Equivalent()
} else {
equiv = labelPtr.Equivalent()
Expand All @@ -246,11 +366,17 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set
if rec == nil {
rec = &record{}
rec.labels = labelPtr
rec.aggregatorFactory = factory
}
rec.refMapped = refcountMapped{value: 2}
rec.inst = s

s.meter.processor.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint)
if factory != nil {
rec.current = factory.NewInstance()
rec.checkpoint = factory.NewInstance()
} else {
s.meter.processor.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint)
}

for {
// Load/Store: there's a memory allocation to place `mk` into
Expand Down Expand Up @@ -283,13 +409,19 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set
}

func (s *syncInstrument) Bind(kvs []label.KeyValue) metric.BoundSyncImpl {
return s.acquireHandle(kvs, nil)
result := &boundInstrument{}
s.rangeViews(kvs, func(viewKvs []label.KeyValue, factory metric.AggregatorFactory) {
result.records = append(result.records, s.acquireHandle(viewKvs, nil, factory))
})
return result
}

func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs []label.KeyValue) {
h := s.acquireHandle(kvs, nil)
defer h.Unbind()
h.RecordOne(ctx, num)
s.rangeViews(kvs, func(viewKvs []label.KeyValue, factory metric.AggregatorFactory) {
h := s.acquireHandle(viewKvs, nil, factory)
h.RecordOne(ctx, num)
h.Unbind()
})
}

// NewAccumulator constructs a new Accumulator for the given
Expand All @@ -310,7 +442,13 @@ func NewAccumulator(processor export.Processor, resource *resource.Resource) *Ac
}

func (m *Accumulator) RegisterView(v metric.View) {
panic("me")
s := m.fromSync(v.SyncImpl())
if s == nil {
// maybe return an erros here?
return
}

s.registerView(v.LabelConfig(), v.LabelKeys(), v.AggregatorFactory())
}

// NewSyncInstrument implements metric.MetricImpl.
Expand Down Expand Up @@ -437,7 +575,7 @@ func (m *Accumulator) checkpointRecord(r *record) int {
return 0
}

a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint, nil)
a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint, r.aggregatorFactory)
err = m.processor.Process(a)
if err != nil {
otel.Handle(err)
Expand Down Expand Up @@ -477,25 +615,12 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {

// RecordBatch enters a batch of metric events.
func (m *Accumulator) RecordBatch(ctx context.Context, kvs []label.KeyValue, measurements ...metric.Measurement) {
// Labels will be computed the first time acquireHandle is
// called. Subsequent calls to acquireHandle will re-use the
// previously computed value instead of recomputing the
// ordered labels.
var labelsPtr *label.Set
for i, meas := range measurements {
for _, meas := range measurements {
s := m.fromSync(meas.SyncImpl())
if s == nil {
continue
}
h := s.acquireHandle(kvs, labelsPtr)

// Re-use labels for the next measurement.
if i == 0 {
labelsPtr = h.labels
}

defer h.Unbind()
h.RecordOne(ctx, meas.Number())
s.RecordOne(ctx, meas.Number(), kvs)
}
}

Expand Down Expand Up @@ -530,6 +655,18 @@ func (r *record) mapkey() mapkey {
}
}

func (b *boundInstrument) RecordOne(ctx context.Context, number number.Number) {
for _, r := range b.records {
r.RecordOne(ctx, number)
}
}

func (b *boundInstrument) Unbind() {
for _, r := range b.records {
r.Unbind()
}
}

// fromSync gets a sync implementation object, checking for
// uninitialized instruments and instruments created by another SDK.
func (m *Accumulator) fromSync(sync metric.SyncImpl) *syncInstrument {
Expand Down

0 comments on commit 227f1ae

Please sign in to comment.