From 227f1aea51c7c51da767059158d8ec7a7e791bf0 Mon Sep 17 00:00:00 2001 From: beanliu Date: Mon, 18 Jan 2021 10:33:40 +1100 Subject: [PATCH] support the Metrics View API --- sdk/metric/correct_test.go | 73 +++++++++++++++ sdk/metric/sdk.go | 183 ++++++++++++++++++++++++++++++++----- 2 files changed, 233 insertions(+), 23 deletions(-) diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 09f0b1130b66..33b475b26079 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -30,6 +30,7 @@ import ( "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" metricsdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/processor/processortest" "go.opentelemetry.io/otel/sdk/resource" ) @@ -89,6 +90,13 @@ func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*metri processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...) } +type testAggregatorFactory struct { +} + +func (t *testAggregatorFactory) NewInstance() metric.Aggregator { + return &lastvalue.New(1)[0] +} + func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessProcessor) { testHandler.Reset() processor := &correctnessProcessor{ @@ -174,6 +182,71 @@ func TestInputRangeValueRecorder(t *testing.T) { require.Nil(t, err) } +func TestViewDropAllLabels(t *testing.T) { + ctx := context.Background() + meter, sdk, processor := newSDK(t) + + valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") + meter.RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.DropAll, nil, nil)) + + valuerecorder.Record(ctx, 1.0, label.String("A", "a")) + + checkpointed := sdk.Collect(ctx) + require.Equal(t, 1, checkpointed) + + require.Equal(t, 0, processor.accumulations[0].Labels().Len()) +} + +func TestViewUngroupLabels(t *testing.T) { + ctx := context.Background() + meter, sdk, processor := newSDK(t) + + valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") + meter.RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.Ungroup, nil, nil)) + + valuerecorder.Record(ctx, 1.0, label.String("A", "a")) + + checkpointed := sdk.Collect(ctx) + require.Equal(t, 1, checkpointed) + + labels := processor.accumulations[0].Labels().ToSlice() + require.ElementsMatch(t, labels, []label.KeyValue{label.String("A", "a")}) +} + +func TestViewLabelKeys(t *testing.T) { + ctx := context.Background() + meter, sdk, processor := newSDK(t) + + valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") + meter.RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.LabelKeys, []label.Key{"A", "B"}, nil)) + + valuerecorder.Record(ctx, 1.0, label.String("A", "a")) + + checkpointed := sdk.Collect(ctx) + require.Equal(t, 1, checkpointed) + + labels := processor.accumulations[0].Labels() + expected := label.NewSet(label.String("A", "a"), label.Empty("B")) + require.True(t, labels.Equals(&expected)) +} + +func TestViewCustomAggregator(t *testing.T) { + ctx := context.Background() + meter, sdk, processor := newSDK(t) + + valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") + meter.RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.Ungroup, nil, &testAggregatorFactory{})) + + valuerecorder.Record(ctx, 1.0, label.String("A", "a")) + valuerecorder.Record(ctx, 2.0, label.String("A", "a")) + + checkpointed := sdk.Collect(ctx) + require.Equal(t, 1, checkpointed) + + agg := processor.accumulations[0].Aggregator() + require.Equal(t, aggregation.LastValueKind, agg.Aggregation().Kind()) +} + func TestDisabledInstrument(t *testing.T) { ctx := context.Background() meter, sdk, processor := newSDK(t) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index d2cc369b8c8b..83e7a0187ad8 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "runtime" + "sort" + "strings" "sync" "sync/atomic" @@ -71,6 +73,21 @@ type ( syncInstrument struct { instrument + + // views is a logical set of registered views for this instrument. + views sync.Map + } + + // syncInstrumentViewKey is used only in syncInstrument as a key representation of a view. + syncInstrumentViewKey struct { + aggregatorFactory metric.AggregatorFactory + labelConfig metric.ViewLabelConfig + labelKeysSignature string + } + + // syncInstrumentViewValue is used only in syncInstrument as a value representation of a view. + syncInstrumentViewValue struct { + labelKeys map[label.Key]struct{} } // mapkey uniquely describes a metric instrument in terms of @@ -80,6 +97,11 @@ type ( ordered label.Distinct } + // boundInstrument is a snapshot of the view data for a given instrument. + boundInstrument struct { + records []*record + } + // record maintains the state of one metric instrument. Due // the use of lock-free algorithms, there may be more than one // `record` in existence at a time, although at most one can @@ -115,6 +137,9 @@ type ( // inst is a pointer to the corresponding instrument. inst *syncInstrument + // aggregatorFactory is a reference to the corresponding view's aggregator factory. + aggregatorFactory metric.AggregatorFactory + // current implements the actual RecordOne() API, // depending on the type of aggregation. If nil, the // metric was disabled by the exporter. @@ -146,6 +171,7 @@ var ( _ metric.AsyncImpl = &asyncInstrument{} _ metric.SyncImpl = &syncInstrument{} _ metric.BoundSyncImpl = &record{} + _ metric.BoundSyncImpl = &boundInstrument{} ErrUninitializedInstrument = fmt.Errorf("use of an uninitialized instrument") ) @@ -158,6 +184,99 @@ func (a *asyncInstrument) Implementation() interface{} { return a } +func (s *syncInstrument) registerView(labelConfig metric.ViewLabelConfig, labelKeys []label.Key, factory metric.AggregatorFactory) { + key, val := newSyncInstrumentView(labelConfig, labelKeys, factory) + + if _, loaded := s.views.LoadOrStore(key, val); loaded { + // Duplicated view, first-write-win so ignore here. + // maybe return/record an error? + } +} + +func (s *syncInstrument) rangeViews(kvs []label.KeyValue, f func([]label.KeyValue, metric.AggregatorFactory)) { + empty := true + s.views.Range(func(key, value interface{}) bool { + empty = false + return false + }) + if empty { + s.registerView(metric.Ungroup, nil, nil) + } + + // compute on need + var labelKvs map[label.Key]label.KeyValue + + s.views.Range(func(key, value interface{}) bool { + viewKey := key.(*syncInstrumentViewKey) + viewValue := value.(*syncInstrumentViewValue) + + var viewLabelKeyValues []label.KeyValue + switch viewKey.labelConfig { + case metric.Ungroup: + viewLabelKeyValues = kvs + + case metric.LabelKeys: + if labelKvs == nil { + labelKvs = map[label.Key]label.KeyValue{} + for _, kv := range kvs { + labelKvs[kv.Key] = kv + } + } + for k, kv := range labelKvs { + if _, p := viewValue.labelKeys[k]; p { + viewLabelKeyValues = append(viewLabelKeyValues, kv) + } + } + for k := range viewValue.labelKeys { + if _, p := labelKvs[k]; !p { + viewLabelKeyValues = append(viewLabelKeyValues, k.Empty()) + } + } + + case metric.DropAll: + // keep nil + default: + panic(fmt.Sprintf("impossible label config: %s", viewKey.labelConfig)) + } + + f(viewLabelKeyValues, viewKey.aggregatorFactory) + return true + }) +} + +// newSyncInstrumentView returns a view key/value representation for the given parameters. +func newSyncInstrumentView(labelConfig metric.ViewLabelConfig, labelKeys []label.Key, factory metric.AggregatorFactory) (*syncInstrumentViewKey, *syncInstrumentViewValue) { + key := &syncInstrumentViewKey{ + aggregatorFactory: factory, + labelConfig: labelConfig, + labelKeysSignature: "", + } + value := &syncInstrumentViewValue{ + labelKeys: nil, + } + + if labelConfig == metric.LabelKeys { + if len(labelKeys) > 0 { + value.labelKeys = map[label.Key]struct{}{} + for _, k := range labelKeys { + value.labelKeys[k] = struct{}{} + } + + keys := make([]string, 0, len(value.labelKeys)) + for k := range value.labelKeys { + keys = append(keys, string(k)) + } + sort.Strings(keys) + key.labelKeysSignature = strings.Join(keys, "|") + } else { + // make sure there is only one representation for aggregations without any labels + key.labelConfig = metric.DropAll + } + } + + return key, value +} + func (s *syncInstrument) Implementation() interface{} { return s } @@ -209,7 +328,7 @@ func (a *asyncInstrument) getRecorder(labels *label.Set) metric.Aggregator { // support re-use of the orderedLabels computed by a previous // measurement in the same batch. This performs two allocations // in the common case. -func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set) *record { +func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set, factory metric.AggregatorFactory) *record { var rec *record var equiv label.Distinct @@ -220,6 +339,7 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set rec = &record{} rec.storage = label.NewSetWithSortable(kvs, &rec.sortSlice) rec.labels = &rec.storage + rec.aggregatorFactory = factory equiv = rec.storage.Equivalent() } else { equiv = labelPtr.Equivalent() @@ -246,11 +366,17 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set if rec == nil { rec = &record{} rec.labels = labelPtr + rec.aggregatorFactory = factory } rec.refMapped = refcountMapped{value: 2} rec.inst = s - s.meter.processor.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint) + if factory != nil { + rec.current = factory.NewInstance() + rec.checkpoint = factory.NewInstance() + } else { + s.meter.processor.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint) + } for { // Load/Store: there's a memory allocation to place `mk` into @@ -283,13 +409,19 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set } func (s *syncInstrument) Bind(kvs []label.KeyValue) metric.BoundSyncImpl { - return s.acquireHandle(kvs, nil) + result := &boundInstrument{} + s.rangeViews(kvs, func(viewKvs []label.KeyValue, factory metric.AggregatorFactory) { + result.records = append(result.records, s.acquireHandle(viewKvs, nil, factory)) + }) + return result } func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs []label.KeyValue) { - h := s.acquireHandle(kvs, nil) - defer h.Unbind() - h.RecordOne(ctx, num) + s.rangeViews(kvs, func(viewKvs []label.KeyValue, factory metric.AggregatorFactory) { + h := s.acquireHandle(viewKvs, nil, factory) + h.RecordOne(ctx, num) + h.Unbind() + }) } // NewAccumulator constructs a new Accumulator for the given @@ -310,7 +442,13 @@ func NewAccumulator(processor export.Processor, resource *resource.Resource) *Ac } func (m *Accumulator) RegisterView(v metric.View) { - panic("me") + s := m.fromSync(v.SyncImpl()) + if s == nil { + // maybe return an erros here? + return + } + + s.registerView(v.LabelConfig(), v.LabelKeys(), v.AggregatorFactory()) } // NewSyncInstrument implements metric.MetricImpl. @@ -437,7 +575,7 @@ func (m *Accumulator) checkpointRecord(r *record) int { return 0 } - a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint, nil) + a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint, r.aggregatorFactory) err = m.processor.Process(a) if err != nil { otel.Handle(err) @@ -477,25 +615,12 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { // RecordBatch enters a batch of metric events. func (m *Accumulator) RecordBatch(ctx context.Context, kvs []label.KeyValue, measurements ...metric.Measurement) { - // Labels will be computed the first time acquireHandle is - // called. Subsequent calls to acquireHandle will re-use the - // previously computed value instead of recomputing the - // ordered labels. - var labelsPtr *label.Set - for i, meas := range measurements { + for _, meas := range measurements { s := m.fromSync(meas.SyncImpl()) if s == nil { continue } - h := s.acquireHandle(kvs, labelsPtr) - - // Re-use labels for the next measurement. - if i == 0 { - labelsPtr = h.labels - } - - defer h.Unbind() - h.RecordOne(ctx, meas.Number()) + s.RecordOne(ctx, meas.Number(), kvs) } } @@ -530,6 +655,18 @@ func (r *record) mapkey() mapkey { } } +func (b *boundInstrument) RecordOne(ctx context.Context, number number.Number) { + for _, r := range b.records { + r.RecordOne(ctx, number) + } +} + +func (b *boundInstrument) Unbind() { + for _, r := range b.records { + r.Unbind() + } +} + // fromSync gets a sync implementation object, checking for // uninitialized instruments and instruments created by another SDK. func (m *Accumulator) fromSync(sync metric.SyncImpl) *syncInstrument {