From f90057a4fabdb83f2c3cb810f02a4daf8a0375ca Mon Sep 17 00:00:00 2001 From: beanliu Date: Mon, 18 Jan 2021 10:31:50 +1100 Subject: [PATCH 1/9] move Metric Aggregator/Aggregation/AggretorSelector from otel/sdk/export/metric to otel/metric --- exporters/metric/prometheus/prometheus.go | 2 +- exporters/otlp/internal/transform/metric.go | 2 +- .../otlp/internal/transform/metric_test.go | 8 +- exporters/otlp/otlp.go | 2 +- exporters/otlp/otlp_metric_test.go | 4 +- exporters/stdout/metric.go | 2 +- exporters/stdout/metric_test.go | 2 +- .../aggregation/aggregation.go | 0 metric/metric_aggregator.go | 108 ++++++++++++++++++ sdk/export/metric/exportkind_test.go | 2 +- sdk/export/metric/metric.go | 99 +--------------- sdk/export/metric/metrictest/test.go | 18 +-- sdk/metric/aggregator/aggregator.go | 5 +- sdk/metric/aggregator/aggregator_test.go | 2 +- sdk/metric/aggregator/aggregatortest/test.go | 15 ++- sdk/metric/aggregator/exact/exact.go | 9 +- sdk/metric/aggregator/exact/exact_test.go | 5 +- sdk/metric/aggregator/histogram/histogram.go | 9 +- .../aggregator/histogram/histogram_test.go | 3 +- sdk/metric/aggregator/lastvalue/lastvalue.go | 9 +- .../aggregator/lastvalue/lastvalue_test.go | 7 +- sdk/metric/aggregator/minmaxsumcount/mmsc.go | 9 +- .../aggregator/minmaxsumcount/mmsc_test.go | 5 +- sdk/metric/aggregator/sum/sum.go | 10 +- sdk/metric/aggregator/sum/sum_test.go | 3 +- sdk/metric/benchmark_test.go | 2 +- .../controller/basic/controller_test.go | 2 +- sdk/metric/controller/basic/push_test.go | 2 +- sdk/metric/correct_test.go | 6 +- sdk/metric/processor/basic/basic.go | 12 +- sdk/metric/processor/basic/basic_test.go | 8 +- sdk/metric/processor/processortest/test.go | 16 +-- sdk/metric/sdk.go | 10 +- sdk/metric/selector/simple/simple.go | 23 ++-- sdk/metric/selector/simple/simple_test.go | 7 +- sdk/metric/stress_test.go | 4 +- 36 files changed, 220 insertions(+), 212 deletions(-) rename {sdk/export/metric => metric}/aggregation/aggregation.go (100%) create mode 100644 metric/metric_aggregator.go diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index e74f6524737..8c966d99b76 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -29,9 +29,9 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" diff --git a/exporters/otlp/internal/transform/metric.go b/exporters/otlp/internal/transform/metric.go index ca913457950..f041a5f08e1 100644 --- a/exporters/otlp/internal/transform/metric.go +++ b/exporters/otlp/internal/transform/metric.go @@ -29,9 +29,9 @@ import ( resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1" "go.opentelemetry.io/otel/label" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/resource" ) diff --git a/exporters/otlp/internal/transform/metric_test.go b/exporters/otlp/internal/transform/metric_test.go index a6698666550..8b85c709db6 100644 --- a/exporters/otlp/internal/transform/metric_test.go +++ b/exporters/otlp/internal/transform/metric_test.go @@ -28,9 +28,9 @@ import ( metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/export/metric/metrictest" arrAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/exact" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" @@ -328,10 +328,10 @@ func (t *testAgg) Aggregation() aggregation.Aggregation { func (t *testAgg) Update(ctx context.Context, number number.Number, descriptor *metric.Descriptor) error { return nil } -func (t *testAgg) SynchronizedMove(destination export.Aggregator, descriptor *metric.Descriptor) error { +func (t *testAgg) SynchronizedMove(destination metric.Aggregator, descriptor *metric.Descriptor) error { return nil } -func (t *testAgg) Merge(aggregator export.Aggregator, descriptor *metric.Descriptor) error { +func (t *testAgg) Merge(aggregator metric.Aggregator, descriptor *metric.Descriptor) error { return nil } @@ -373,7 +373,7 @@ func (te *testErrMinMaxSumCount) Count() (uint64, error) { return 0, te.err } -var _ export.Aggregator = &testAgg{} +var _ metric.Aggregator = &testAgg{} var _ aggregation.Aggregation = &testAgg{} var _ aggregation.Sum = &testErrSum{} var _ aggregation.LastValue = &testErrLastValue{} diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 3380d6c2933..6bc488fe5d0 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -20,8 +20,8 @@ import ( "sync" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" tracesdk "go.opentelemetry.io/otel/sdk/export/trace" ) diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index 3da92f098ab..59598a7bfa5 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -29,9 +29,9 @@ import ( resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/export/metric/metrictest" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" @@ -752,7 +752,7 @@ func runMetricExportTests(t *testing.T, opts []otlp.ExporterOption, rs []record, desc := metric.NewDescriptor(r.name, r.iKind, r.nKind, r.opts...) labs := label.NewSet(lcopy...) - var agg, ckpt metricsdk.Aggregator + var agg, ckpt metric.Aggregator if r.iKind.Adding() { agg, ckpt = metrictest.Unslice2(sum.New(2)) } else { diff --git a/exporters/stdout/metric.go b/exporters/stdout/metric.go index 9b183a0022d..5d640c44083 100644 --- a/exporters/stdout/metric.go +++ b/exporters/stdout/metric.go @@ -23,8 +23,8 @@ import ( "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" exportmetric "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" ) type metricExporter struct { diff --git a/exporters/stdout/metric_test.go b/exporters/stdout/metric_test.go index d5ebcfca864..d134c7dab2a 100644 --- a/exporters/stdout/metric_test.go +++ b/exporters/stdout/metric_test.go @@ -212,7 +212,7 @@ func TestStdoutValueRecorderFormat(t *testing.T) { func TestStdoutNoData(t *testing.T) { desc := metric.NewDescriptor("test.name", metric.ValueRecorderInstrumentKind, number.Float64Kind) - runTwoAggs := func(agg, ckpt export.Aggregator) { + runTwoAggs := func(agg, ckpt metric.Aggregator) { t.Run(fmt.Sprintf("%T", agg), func(t *testing.T) { t.Parallel() diff --git a/sdk/export/metric/aggregation/aggregation.go b/metric/aggregation/aggregation.go similarity index 100% rename from sdk/export/metric/aggregation/aggregation.go rename to metric/aggregation/aggregation.go diff --git a/metric/metric_aggregator.go b/metric/metric_aggregator.go new file mode 100644 index 00000000000..21074bb62f4 --- /dev/null +++ b/metric/metric_aggregator.go @@ -0,0 +1,108 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric // import "go.opentelemetry.io/otel/metric" + +import ( + "context" + + "go.opentelemetry.io/otel/metric/aggregation" + "go.opentelemetry.io/otel/metric/number" +) + +// Aggregator implements a specific aggregation behavior, e.g., a +// behavior to track a sequence of updates to an instrument. Sum-only +// instruments commonly use a simple Sum aggregator, but for the +// distribution instruments (ValueRecorder, ValueObserver) there are a +// number of possible aggregators with different cost and accuracy +// tradeoffs. +// +// Note that any Aggregator may be attached to any instrument--this is +// the result of the OpenTelemetry API/SDK separation. It is possible +// to attach a Sum aggregator to a ValueRecorder instrument or a +// MinMaxSumCount aggregator to a Counter instrument. +type Aggregator interface { + // Aggregation returns an Aggregation interface to access the + // current state of this Aggregator. The caller is + // responsible for synchronization and must not call any the + // other methods in this interface concurrently while using + // the Aggregation. + Aggregation() aggregation.Aggregation + + // Update receives a new measured value and incorporates it + // into the aggregation. Update() calls may be called + // concurrently. + // + // Descriptor.NumberKind() should be consulted to determine + // whether the provided number is an int64 or float64. + // + // The Context argument comes from user-level code and could be + // inspected for a `correlation.Map` or `trace.SpanContext`. + Update(ctx context.Context, number number.Number, descriptor *Descriptor) error + + // SynchronizedMove is called during collection to finish one + // period of aggregation by atomically saving the + // currently-updating state into the argument Aggregator AND + // resetting the current value to the zero state. + // + // SynchronizedMove() is called concurrently with Update(). These + // two methods must be synchronized with respect to each + // other, for correctness. + // + // After saving a synchronized copy, the Aggregator can be converted + // into one or more of the interfaces in the `aggregation` sub-package, + // according to kind of Aggregator that was selected. + // + // This method will return an InconsistentAggregatorError if + // this Aggregator cannot be copied into the destination due + // to an incompatible type. + // + // This call has no Context argument because it is expected to + // perform only computation. + // + // When called with a nil `destination`, this Aggregator is reset + // and the current value is discarded. + SynchronizedMove(destination Aggregator, descriptor *Descriptor) error + + // Merge combines the checkpointed state from the argument + // Aggregator into this Aggregator. Merge is not synchronized + // with respect to Update or SynchronizedMove. + // + // The owner of an Aggregator being merged is responsible for + // synchronization of both Aggregator states. + Merge(aggregator Aggregator, descriptor *Descriptor) error +} + +// AggregatorSelector supports selecting the kind of Aggregator to +// use at runtime for a specific metric instrument. +type AggregatorSelector interface { + // AggregatorFor allocates a variable number of aggregators of + // a kind suitable for the requested export. This method + // initializes a `...*Aggregator`, to support making a single + // allocation. + // + // When the call returns without initializing the *Aggregator + // to a non-nil value, the metric instrument is explicitly + // disabled. + // + // This must return a consistent type to avoid confusion in + // later stages of the metrics export process, i.e., when + // Merging or Checkpointing aggregators for a specific + // instrument. + // + // Note: This is context-free because the aggregator should + // not relate to the incoming context. This call should not + // block. + AggregatorFor(descriptor *Descriptor, aggregator ...*Aggregator) +} diff --git a/sdk/export/metric/exportkind_test.go b/sdk/export/metric/exportkind_test.go index acaa12cd1dc..87ee959fe55 100644 --- a/sdk/export/metric/exportkind_test.go +++ b/sdk/export/metric/exportkind_test.go @@ -20,8 +20,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" ) func TestExportKindIncludes(t *testing.T) { diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 525fe8f3a63..b1f01a6ec93 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -23,8 +23,7 @@ import ( "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/number" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/sdk/resource" ) @@ -63,7 +62,7 @@ type Processor interface { // Note that the SDK only calls AggregatorFor when new records // require an Aggregator. This does not provide a way to // disable metrics with active records. - AggregatorSelector + metric.AggregatorSelector // Process is called by the SDK once per internal record, // passing the export Accumulation (a Descriptor, the corresponding @@ -75,29 +74,6 @@ type Processor interface { Process(accum Accumulation) error } -// AggregatorSelector supports selecting the kind of Aggregator to -// use at runtime for a specific metric instrument. -type AggregatorSelector interface { - // AggregatorFor allocates a variable number of aggregators of - // a kind suitable for the requested export. This method - // initializes a `...*Aggregator`, to support making a single - // allocation. - // - // When the call returns without initializing the *Aggregator - // to a non-nil value, the metric instrument is explicitly - // disabled. - // - // This must return a consistent type to avoid confusion in - // later stages of the metrics export process, i.e., when - // Merging or Checkpointing aggregators for a specific - // instrument. - // - // Note: This is context-free because the aggregator should - // not relate to the incoming context. This call should not - // block. - AggregatorFor(descriptor *metric.Descriptor, aggregator ...*Aggregator) -} - // Checkpointer is the interface used by a Controller to coordinate // the Processor with Accumulator(s) and Exporter(s). The // StartCollection() and FinishCollection() methods start and finish a @@ -125,69 +101,6 @@ type Checkpointer interface { FinishCollection() error } -// Aggregator implements a specific aggregation behavior, e.g., a -// behavior to track a sequence of updates to an instrument. Sum-only -// instruments commonly use a simple Sum aggregator, but for the -// distribution instruments (ValueRecorder, ValueObserver) there are a -// number of possible aggregators with different cost and accuracy -// tradeoffs. -// -// Note that any Aggregator may be attached to any instrument--this is -// the result of the OpenTelemetry API/SDK separation. It is possible -// to attach a Sum aggregator to a ValueRecorder instrument or a -// MinMaxSumCount aggregator to a Counter instrument. -type Aggregator interface { - // Aggregation returns an Aggregation interface to access the - // current state of this Aggregator. The caller is - // responsible for synchronization and must not call any the - // other methods in this interface concurrently while using - // the Aggregation. - Aggregation() aggregation.Aggregation - - // Update receives a new measured value and incorporates it - // into the aggregation. Update() calls may be called - // concurrently. - // - // Descriptor.NumberKind() should be consulted to determine - // whether the provided number is an int64 or float64. - // - // The Context argument comes from user-level code and could be - // inspected for a `correlation.Map` or `trace.SpanContext`. - Update(ctx context.Context, number number.Number, descriptor *metric.Descriptor) error - - // SynchronizedMove is called during collection to finish one - // period of aggregation by atomically saving the - // currently-updating state into the argument Aggregator AND - // resetting the current value to the zero state. - // - // SynchronizedMove() is called concurrently with Update(). These - // two methods must be synchronized with respect to each - // other, for correctness. - // - // After saving a synchronized copy, the Aggregator can be converted - // into one or more of the interfaces in the `aggregation` sub-package, - // according to kind of Aggregator that was selected. - // - // This method will return an InconsistentAggregatorError if - // this Aggregator cannot be copied into the destination due - // to an incompatible type. - // - // This call has no Context argument because it is expected to - // perform only computation. - // - // When called with a nil `destination`, this Aggregator is reset - // and the current value is discarded. - SynchronizedMove(destination Aggregator, descriptor *metric.Descriptor) error - - // Merge combines the checkpointed state from the argument - // Aggregator into this Aggregator. Merge is not synchronized - // with respect to Update or SynchronizedMove. - // - // The owner of an Aggregator being merged is responsible for - // synchronization of both Aggregator states. - Merge(aggregator Aggregator, descriptor *metric.Descriptor) error -} - // Subtractor is an optional interface implemented by some // Aggregators. An Aggregator must support `Subtract()` in order to // be configured for a Precomputed-Sum instrument (SumObserver, @@ -195,7 +108,7 @@ type Aggregator interface { type Subtractor interface { // Subtract subtracts the `operand` from this Aggregator and // outputs the value in `result`. - Subtract(operand, result Aggregator, descriptor *metric.Descriptor) error + Subtract(operand, result metric.Aggregator, descriptor *metric.Descriptor) error } // Exporter handles presentation of the checkpoint of aggregate @@ -275,7 +188,7 @@ type Metadata struct { // and label set, as prepared by an Accumulator for the Processor. type Accumulation struct { Metadata - aggregator Aggregator + aggregator metric.Aggregator } // Record contains the exported data for a single metric instrument @@ -308,7 +221,7 @@ func (m Metadata) Resource() *resource.Resource { // Accumulations to send to Processors. The Descriptor, Labels, Resource, // and Aggregator represent aggregate metric events received over a single // collection period. -func NewAccumulation(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator Aggregator) Accumulation { +func NewAccumulation(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator metric.Aggregator) Accumulation { return Accumulation{ Metadata: Metadata{ descriptor: descriptor, @@ -321,7 +234,7 @@ func NewAccumulation(descriptor *metric.Descriptor, labels *label.Set, resource // Aggregator returns the checkpointed aggregator. It is safe to // access the checkpointed state without locking. -func (r Accumulation) Aggregator() Aggregator { +func (r Accumulation) Aggregator() metric.Aggregator { return r.aggregator } diff --git a/sdk/export/metric/metrictest/test.go b/sdk/export/metric/metrictest/test.go index 9905300d893..ca3a45d3d1a 100644 --- a/sdk/export/metric/metrictest/test.go +++ b/sdk/export/metric/metrictest/test.go @@ -23,9 +23,9 @@ import ( "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/resource" ) @@ -46,7 +46,7 @@ type CheckpointSet struct { // NoopAggregator is useful for testing Exporters. type NoopAggregator struct{} -var _ export.Aggregator = (*NoopAggregator)(nil) +var _ metric.Aggregator = (*NoopAggregator)(nil) // Update implements export.Aggregator. func (NoopAggregator) Update(context.Context, number.Number, *metric.Descriptor) error { @@ -54,12 +54,12 @@ func (NoopAggregator) Update(context.Context, number.Number, *metric.Descriptor) } // SynchronizedMove implements export.Aggregator. -func (NoopAggregator) SynchronizedMove(export.Aggregator, *metric.Descriptor) error { +func (NoopAggregator) SynchronizedMove(metric.Aggregator, *metric.Descriptor) error { return nil } // Merge implements export.Aggregator. -func (NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error { +func (NoopAggregator) Merge(metric.Aggregator, *metric.Descriptor) error { return nil } @@ -92,7 +92,7 @@ func (p *CheckpointSet) Reset() { // // If there is an existing record with the same descriptor and labels, // the stored aggregator will be returned and should be merged. -func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, labels ...label.KeyValue) (agg export.Aggregator, added bool) { +func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg metric.Aggregator, labels ...label.KeyValue) (agg metric.Aggregator, added bool) { elabels := label.NewSet(labels...) key := mapkey{ @@ -100,7 +100,7 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l distinct: elabels.Equivalent(), } if record, ok := p.records[key]; ok { - return record.Aggregation().(export.Aggregator), false + return record.Aggregation().(metric.Aggregator), false } rec := export.NewRecord(desc, &elabels, p.resource, newAgg.Aggregation(), time.Time{}, time.Time{}) @@ -121,7 +121,7 @@ func (p *CheckpointSet) ForEach(_ export.ExportKindSelector, f func(export.Recor } // Takes a slice of []some.Aggregator and returns a slice of []export.Aggregator -func Unslice2(sl interface{}) (one, two export.Aggregator) { +func Unslice2(sl interface{}) (one, two metric.Aggregator) { slv := reflect.ValueOf(sl) if slv.Type().Kind() != reflect.Slice { panic("Invalid Unslice2") @@ -129,7 +129,7 @@ func Unslice2(sl interface{}) (one, two export.Aggregator) { if slv.Len() != 2 { panic("Invalid Unslice2: length > 2") } - one = slv.Index(0).Addr().Interface().(export.Aggregator) - two = slv.Index(1).Addr().Interface().(export.Aggregator) + one = slv.Index(0).Addr().Interface().(metric.Aggregator) + two = slv.Index(1).Addr().Interface().(metric.Aggregator) return } diff --git a/sdk/metric/aggregator/aggregator.go b/sdk/metric/aggregator/aggregator.go index afda991e863..f0c06d29e97 100644 --- a/sdk/metric/aggregator/aggregator.go +++ b/sdk/metric/aggregator/aggregator.go @@ -19,15 +19,14 @@ import ( "math" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" ) // NewInconsistentAggregatorError formats an error describing an attempt to // Checkpoint or Merge different-type aggregators. The result can be unwrapped as // an ErrInconsistentType. -func NewInconsistentAggregatorError(a1, a2 export.Aggregator) error { +func NewInconsistentAggregatorError(a1, a2 metric.Aggregator) error { return fmt.Errorf("%w: %T and %T", aggregation.ErrInconsistentType, a1, a2) } diff --git a/sdk/metric/aggregator/aggregator_test.go b/sdk/metric/aggregator/aggregator_test.go index 674d08cd03c..d15f268b3b8 100644 --- a/sdk/metric/aggregator/aggregator_test.go +++ b/sdk/metric/aggregator/aggregator_test.go @@ -22,8 +22,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" diff --git a/sdk/metric/aggregator/aggregatortest/test.go b/sdk/metric/aggregator/aggregatortest/test.go index 268bea0d331..41125ec423a 100644 --- a/sdk/metric/aggregator/aggregatortest/test.go +++ b/sdk/metric/aggregator/aggregatortest/test.go @@ -27,9 +27,8 @@ import ( ottest "go.opentelemetry.io/otel/internal/internaltest" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator" ) @@ -43,7 +42,7 @@ type Profile struct { type NoopAggregator struct{} type NoopAggregation struct{} -var _ export.Aggregator = NoopAggregator{} +var _ metric.Aggregator = NoopAggregator{} var _ aggregation.Aggregation = NoopAggregation{} func newProfiles() []Profile { @@ -149,7 +148,7 @@ func (n *Numbers) Points() []number.Number { } // Performs the same range test the SDK does on behalf of the aggregator. -func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, descriptor *metric.Descriptor) { +func CheckedUpdate(t *testing.T, agg metric.Aggregator, number number.Number, descriptor *metric.Descriptor) { ctx := context.Background() // Note: Aggregator tests are written assuming that the SDK @@ -165,7 +164,7 @@ func CheckedUpdate(t *testing.T, agg export.Aggregator, number number.Number, de } } -func CheckedMerge(t *testing.T, aggInto, aggFrom export.Aggregator, descriptor *metric.Descriptor) { +func CheckedMerge(t *testing.T, aggInto, aggFrom metric.Aggregator, descriptor *metric.Descriptor) { if err := aggInto.Merge(aggFrom, descriptor); err != nil { t.Error("Unexpected Merge failure", err) } @@ -183,15 +182,15 @@ func (NoopAggregator) Update(context.Context, number.Number, *metric.Descriptor) return nil } -func (NoopAggregator) SynchronizedMove(export.Aggregator, *metric.Descriptor) error { +func (NoopAggregator) SynchronizedMove(metric.Aggregator, *metric.Descriptor) error { return nil } -func (NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error { +func (NoopAggregator) Merge(metric.Aggregator, *metric.Descriptor) error { return nil } -func SynchronizedMoveResetTest(t *testing.T, mkind metric.InstrumentKind, nf func(*metric.Descriptor) export.Aggregator) { +func SynchronizedMoveResetTest(t *testing.T, mkind metric.InstrumentKind, nf func(*metric.Descriptor) metric.Aggregator) { t.Run("reset on nil", func(t *testing.T) { // Ensures that SynchronizedMove(nil, descriptor) discards and // resets the aggregator. diff --git a/sdk/metric/aggregator/exact/exact.go b/sdk/metric/aggregator/exact/exact.go index c2c7adaf256..4302ebc6e8f 100644 --- a/sdk/metric/aggregator/exact/exact.go +++ b/sdk/metric/aggregator/exact/exact.go @@ -20,9 +20,8 @@ import ( "time" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator" ) @@ -35,7 +34,7 @@ type ( } ) -var _ export.Aggregator = &Aggregator{} +var _ metric.Aggregator = &Aggregator{} var _ aggregation.Points = &Aggregator{} var _ aggregation.Count = &Aggregator{} @@ -68,7 +67,7 @@ func (c *Aggregator) Points() ([]aggregation.Point, error) { // SynchronizedMove saves the current state to oa and resets the current state to // the empty set, taking a lock to prevent concurrent Update() calls. -func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error { +func (c *Aggregator) SynchronizedMove(oa metric.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) if oa != nil && o == nil { @@ -102,7 +101,7 @@ func (c *Aggregator) Update(_ context.Context, number number.Number, desc *metri } // Merge combines two data sets into one. -func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error { +func (c *Aggregator) Merge(oa metric.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) diff --git a/sdk/metric/aggregator/exact/exact_test.go b/sdk/metric/aggregator/exact/exact_test.go index 9a90d03ed68..1da7b117470 100644 --- a/sdk/metric/aggregator/exact/exact_test.go +++ b/sdk/metric/aggregator/exact/exact_test.go @@ -23,9 +23,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" ) @@ -293,7 +292,7 @@ func TestSynchronizedMoveReset(t *testing.T) { aggregatortest.SynchronizedMoveResetTest( t, metric.ValueRecorderInstrumentKind, - func(desc *metric.Descriptor) export.Aggregator { + func(desc *metric.Descriptor) metric.Aggregator { return &New(1)[0] }, ) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index ea3ecdbb5b2..a07e18b1c59 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -20,9 +20,8 @@ import ( "sync" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator" ) @@ -97,7 +96,7 @@ var defaultInt64ExplicitBoundaries = func(bounds []float64) (asint []float64) { return }(defaultFloat64ExplicitBoundaries) -var _ export.Aggregator = &Aggregator{} +var _ metric.Aggregator = &Aggregator{} var _ aggregation.Sum = &Aggregator{} var _ aggregation.Count = &Aggregator{} var _ aggregation.Histogram = &Aggregator{} @@ -174,7 +173,7 @@ func (c *Aggregator) Histogram() (aggregation.Buckets, error) { // the empty set. Since no locks are taken, there is a chance that // the independent Sum, Count and Bucket Count are not consistent with each // other. -func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error { +func (c *Aggregator) SynchronizedMove(oa metric.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) if oa != nil && o == nil { @@ -254,7 +253,7 @@ func (c *Aggregator) Update(_ context.Context, number number.Number, desc *metri } // Merge combines two histograms that have the same buckets into a single one. -func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error { +func (c *Aggregator) Merge(oa metric.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index ddc2195167e..2edef165137 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" ) @@ -240,7 +239,7 @@ func TestSynchronizedMoveReset(t *testing.T) { aggregatortest.SynchronizedMoveResetTest( t, metric.ValueRecorderInstrumentKind, - func(desc *metric.Descriptor) export.Aggregator { + func(desc *metric.Descriptor) metric.Aggregator { return &histogram.New(1, desc, histogram.WithExplicitBoundaries(testBoundaries))[0] }, ) diff --git a/sdk/metric/aggregator/lastvalue/lastvalue.go b/sdk/metric/aggregator/lastvalue/lastvalue.go index 3cc5f7055cf..e43636bc712 100644 --- a/sdk/metric/aggregator/lastvalue/lastvalue.go +++ b/sdk/metric/aggregator/lastvalue/lastvalue.go @@ -21,9 +21,8 @@ import ( "unsafe" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator" ) @@ -51,7 +50,7 @@ type ( } ) -var _ export.Aggregator = &Aggregator{} +var _ metric.Aggregator = &Aggregator{} var _ aggregation.LastValue = &Aggregator{} // An unset lastValue has zero timestamp and zero value. @@ -92,7 +91,7 @@ func (g *Aggregator) LastValue() (number.Number, time.Time, error) { } // SynchronizedMove atomically saves the current value. -func (g *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error { +func (g *Aggregator) SynchronizedMove(oa metric.Aggregator, _ *metric.Descriptor) error { if oa == nil { atomic.StorePointer(&g.value, unsafe.Pointer(unsetLastValue)) return nil @@ -117,7 +116,7 @@ func (g *Aggregator) Update(_ context.Context, number number.Number, desc *metri // Merge combines state from two aggregators. The most-recently set // value is chosen. -func (g *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error { +func (g *Aggregator) Merge(oa metric.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentAggregatorError(g, oa) diff --git a/sdk/metric/aggregator/lastvalue/lastvalue_test.go b/sdk/metric/aggregator/lastvalue/lastvalue_test.go index 94baa69b71d..e8739a383db 100644 --- a/sdk/metric/aggregator/lastvalue/lastvalue_test.go +++ b/sdk/metric/aggregator/lastvalue/lastvalue_test.go @@ -26,15 +26,14 @@ import ( ottest "go.opentelemetry.io/otel/internal/internaltest" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" ) const count = 100 -var _ export.Aggregator = &Aggregator{} +var _ metric.Aggregator = &Aggregator{} // Ensure struct alignment prior to running tests. func TestMain(m *testing.M) { @@ -137,7 +136,7 @@ func TestSynchronizedMoveReset(t *testing.T) { aggregatortest.SynchronizedMoveResetTest( t, metric.ValueObserverInstrumentKind, - func(desc *metric.Descriptor) export.Aggregator { + func(desc *metric.Descriptor) metric.Aggregator { return &New(1)[0] }, ) diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc.go b/sdk/metric/aggregator/minmaxsumcount/mmsc.go index e21fd75ab73..9c886e9af1f 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc.go @@ -19,9 +19,8 @@ import ( "sync" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator" ) @@ -42,7 +41,7 @@ type ( } ) -var _ export.Aggregator = &Aggregator{} +var _ metric.Aggregator = &Aggregator{} var _ aggregation.MinMaxSumCount = &Aggregator{} // New returns a new aggregator for computing the min, max, sum, and @@ -103,7 +102,7 @@ func (c *Aggregator) Max() (number.Number, error) { // SynchronizedMove saves the current state into oa and resets the current state to // the empty set. -func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error { +func (c *Aggregator) SynchronizedMove(oa metric.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) if oa != nil && o == nil { @@ -146,7 +145,7 @@ func (c *Aggregator) Update(_ context.Context, number number.Number, desc *metri } // Merge combines two data sets into one. -func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error { +func (c *Aggregator) Merge(oa metric.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go index 085c7a546d2..cb8448b6c3b 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc_test.go @@ -23,9 +23,8 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" ) @@ -241,7 +240,7 @@ func TestSynchronizedMoveReset(t *testing.T) { aggregatortest.SynchronizedMoveResetTest( t, metric.ValueRecorderInstrumentKind, - func(desc *metric.Descriptor) export.Aggregator { + func(desc *metric.Descriptor) metric.Aggregator { return &New(1, desc)[0] }, ) diff --git a/sdk/metric/aggregator/sum/sum.go b/sdk/metric/aggregator/sum/sum.go index fc96ddb4cba..1264cabef8f 100644 --- a/sdk/metric/aggregator/sum/sum.go +++ b/sdk/metric/aggregator/sum/sum.go @@ -18,9 +18,9 @@ import ( "context" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator" ) @@ -31,7 +31,7 @@ type Aggregator struct { value number.Number } -var _ export.Aggregator = &Aggregator{} +var _ metric.Aggregator = &Aggregator{} var _ export.Subtractor = &Aggregator{} var _ aggregation.Sum = &Aggregator{} @@ -60,7 +60,7 @@ func (c *Aggregator) Sum() (number.Number, error) { // SynchronizedMove atomically saves the current value into oa and resets the // current sum to zero. -func (c *Aggregator) SynchronizedMove(oa export.Aggregator, _ *metric.Descriptor) error { +func (c *Aggregator) SynchronizedMove(oa metric.Aggregator, _ *metric.Descriptor) error { if oa == nil { c.value.SetRawAtomic(0) return nil @@ -80,7 +80,7 @@ func (c *Aggregator) Update(_ context.Context, num number.Number, desc *metric.D } // Merge combines two counters by adding their sums. -func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error { +func (c *Aggregator) Merge(oa metric.Aggregator, desc *metric.Descriptor) error { o, _ := oa.(*Aggregator) if o == nil { return aggregator.NewInconsistentAggregatorError(c, oa) @@ -89,7 +89,7 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error return nil } -func (c *Aggregator) Subtract(opAgg, resAgg export.Aggregator, descriptor *metric.Descriptor) error { +func (c *Aggregator) Subtract(opAgg, resAgg metric.Aggregator, descriptor *metric.Descriptor) error { op, _ := opAgg.(*Aggregator) if op == nil { return aggregator.NewInconsistentAggregatorError(c, opAgg) diff --git a/sdk/metric/aggregator/sum/sum_test.go b/sdk/metric/aggregator/sum/sum_test.go index e63d2f9d690..10beaba63e6 100644 --- a/sdk/metric/aggregator/sum/sum_test.go +++ b/sdk/metric/aggregator/sum/sum_test.go @@ -24,7 +24,6 @@ import ( ottest "go.opentelemetry.io/otel/internal/internaltest" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest" ) @@ -147,7 +146,7 @@ func TestSynchronizedMoveReset(t *testing.T) { aggregatortest.SynchronizedMoveResetTest( t, metric.SumObserverInstrumentKind, - func(desc *metric.Descriptor) export.Aggregator { + func(desc *metric.Descriptor) metric.Aggregator { return &New(1)[0] }, ) diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 576d5c546a9..db5f6902dc7 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -32,7 +32,7 @@ type benchFixture struct { meter metric.Meter accumulator *sdk.Accumulator B *testing.B - export.AggregatorSelector + metric.AggregatorSelector } func newFixture(b *testing.B) *benchFixture { diff --git a/sdk/metric/controller/basic/controller_test.go b/sdk/metric/controller/basic/controller_test.go index a1b3a7640b5..9fbaabf6e80 100644 --- a/sdk/metric/controller/basic/controller_test.go +++ b/sdk/metric/controller/basic/controller_test.go @@ -24,8 +24,8 @@ import ( "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" "go.opentelemetry.io/otel/sdk/metric/controller/controllertest" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" diff --git a/sdk/metric/controller/basic/push_test.go b/sdk/metric/controller/basic/push_test.go index 4add5f3f523..b3bf033c4ed 100644 --- a/sdk/metric/controller/basic/push_test.go +++ b/sdk/metric/controller/basic/push_test.go @@ -28,8 +28,8 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" "go.opentelemetry.io/otel/sdk/metric/controller/controllertest" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 3cc6ef22596..0b62bd42b0d 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -26,9 +26,9 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" metricsdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/processor/processortest" "go.opentelemetry.io/otel/sdk/resource" @@ -80,11 +80,11 @@ type correctnessProcessor struct { } type testSelector struct { - selector export.AggregatorSelector + selector metric.AggregatorSelector newAggCount int } -func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) { +func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*metric.Aggregator) { ts.newAggCount += len(aggPtrs) processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...) } diff --git a/sdk/metric/processor/basic/basic.go b/sdk/metric/processor/basic/basic.go index 2e2a7c5df41..ce7359049ca 100644 --- a/sdk/metric/processor/basic/basic.go +++ b/sdk/metric/processor/basic/basic.go @@ -22,15 +22,15 @@ import ( "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/resource" ) type ( Processor struct { export.ExportKindSelector - export.AggregatorSelector + metric.AggregatorSelector state } @@ -79,17 +79,17 @@ type ( // (if !currentOwned) or it refers to an Aggregator // owned by the processor used to accumulate multiple // values in a single collection round. - current export.Aggregator + current metric.Aggregator // delta, if non-nil, refers to an Aggregator owned by // the processor used to compute deltas between // precomputed sums. - delta export.Aggregator + delta metric.Aggregator // cumulative, if non-nil, refers to an Aggregator owned // by the processor used to store the last cumulative // value. - cumulative export.Aggregator + cumulative metric.Aggregator } state struct { @@ -127,7 +127,7 @@ var ErrInvalidExportKind = fmt.Errorf("invalid export kind") // is consulted to determine the kind(s) of exporter that will consume // data, so that this Processor can prepare to compute Delta or // Cumulative Aggregations as needed. -func New(aselector export.AggregatorSelector, eselector export.ExportKindSelector, opts ...Option) *Processor { +func New(aselector metric.AggregatorSelector, eselector export.ExportKindSelector, opts ...Option) *Processor { now := time.Now() p := &Processor{ AggregatorSelector: aselector, diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index 11b59717ea5..5b578eb7604 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -26,9 +26,9 @@ import ( "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/export/metric/metrictest" sdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/processor/basic" @@ -103,9 +103,9 @@ func asNumber(nkind number.Kind, value int64) number.Number { return number.NewFloat64Number(float64(value)) } -func updateFor(t *testing.T, desc *metric.Descriptor, selector export.AggregatorSelector, res *resource.Resource, value int64, labs ...label.KeyValue) export.Accumulation { +func updateFor(t *testing.T, desc *metric.Descriptor, selector metric.AggregatorSelector, res *resource.Resource, value int64, labs ...label.KeyValue) export.Accumulation { ls := label.NewSet(labs...) - var agg export.Aggregator + var agg metric.Aggregator selector.AggregatorFor(desc, &agg) require.NoError(t, agg.Update(context.Background(), asNumber(desc.NumberKind(), value), desc)) @@ -155,7 +155,7 @@ func testProcessor( err := processor.FinishCollection() if err == aggregation.ErrNoSubtraction { - var subr export.Aggregator + var subr metric.Aggregator selector.AggregatorFor(&desc1, &subr) _, canSub := subr.(export.Subtractor) diff --git a/sdk/metric/processor/processortest/test.go b/sdk/metric/processor/processortest/test.go index 9b767291b58..89a913bc1bd 100644 --- a/sdk/metric/processor/processortest/test.go +++ b/sdk/metric/processor/processortest/test.go @@ -23,9 +23,9 @@ import ( "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/exact" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" @@ -49,7 +49,7 @@ type ( mapValue struct { labels *label.Set resource *resource.Resource - aggregator export.Aggregator + aggregator metric.Aggregator } // Output implements export.CheckpointSet. @@ -74,7 +74,7 @@ type ( // Processor is a testing implementation of export.Processor that // assembles its results as a map[string]float64. Processor struct { - export.AggregatorSelector + metric.AggregatorSelector output *Output } @@ -101,7 +101,7 @@ type ( // // Where in the example A=1,B=2 is the encoded labels and R=V is the // encoded resource value. -func NewProcessor(selector export.AggregatorSelector, encoder label.Encoder) *Processor { +func NewProcessor(selector metric.AggregatorSelector, encoder label.Encoder) *Processor { return &Processor{ AggregatorSelector: selector, output: NewOutput(encoder), @@ -156,12 +156,12 @@ func (c *testCheckpointer) CheckpointSet() export.CheckpointSet { // AggregatorSelector returns a policy that is consistent with the // test descriptors above. I.e., it returns sum.New() for counter // instruments and lastvalue.New() for lastValue instruments. -func AggregatorSelector() export.AggregatorSelector { +func AggregatorSelector() metric.AggregatorSelector { return testAggregatorSelector{} } // AggregatorFor implements export.AggregatorSelector. -func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) { +func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*metric.Aggregator) { switch { case strings.HasSuffix(desc.Name(), ".disabled"): @@ -237,7 +237,7 @@ func (o *Output) AddRecord(rec export.Record) error { resource: rec.Resource().Equivalent(), } if _, ok := o.m[key]; !ok { - var agg export.Aggregator + var agg metric.Aggregator testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg) o.m[key] = mapValue{ aggregator: agg, @@ -245,7 +245,7 @@ func (o *Output) AddRecord(rec export.Record) error { resource: rec.Resource(), } } - return o.m[key].aggregator.Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor()) + return o.m[key].aggregator.Merge(rec.Aggregation().(metric.Aggregator), rec.Descriptor()) } // Map returns the calculated values for test validation from a set of diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 14b381c3384..9ae8d77735a 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -118,8 +118,8 @@ type ( // current implements the actual RecordOne() API, // depending on the type of aggregation. If nil, the // metric was disabled by the exporter. - current export.Aggregator - checkpoint export.Aggregator + current metric.Aggregator + checkpoint metric.Aggregator } instrument struct { @@ -137,7 +137,7 @@ type ( labeledRecorder struct { observedEpoch int64 labels *label.Set - observed export.Aggregator + observed metric.Aggregator } ) @@ -179,7 +179,7 @@ func (a *asyncInstrument) observe(num number.Number, labels *label.Set) { } } -func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator { +func (a *asyncInstrument) getRecorder(labels *label.Set) metric.Aggregator { lrec, ok := a.recorders[labels.Equivalent()] if ok { // Note: SynchronizedMove(nil) can't return an error @@ -188,7 +188,7 @@ func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator { a.recorders[labels.Equivalent()] = lrec return lrec.observed } - var rec export.Aggregator + var rec metric.Aggregator a.meter.processor.AggregatorFor(&a.descriptor, &rec) if a.recorders == nil { a.recorders = make(map[label.Distinct]*labeledRecorder) diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index bb5760994ac..6ffef0a9187 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -16,7 +16,6 @@ package simple // import "go.opentelemetry.io/otel/sdk/metric/selector/simple" import ( "go.opentelemetry.io/otel/metric" - export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/exact" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" @@ -33,9 +32,9 @@ type ( ) var ( - _ export.AggregatorSelector = selectorInexpensive{} - _ export.AggregatorSelector = selectorExact{} - _ export.AggregatorSelector = selectorHistogram{} + _ metric.AggregatorSelector = selectorInexpensive{} + _ metric.AggregatorSelector = selectorExact{} + _ metric.AggregatorSelector = selectorHistogram{} ) // NewWithInexpensiveDistribution returns a simple aggregator selector @@ -43,7 +42,7 @@ var ( // instruments. This selector is faster and uses less memory than the // others in this package because minmaxsumcount aggregators maintain // the least information about the distribution among these choices. -func NewWithInexpensiveDistribution() export.AggregatorSelector { +func NewWithInexpensiveDistribution() metric.AggregatorSelector { return selectorInexpensive{} } @@ -52,32 +51,32 @@ func NewWithInexpensiveDistribution() export.AggregatorSelector { // selector uses more memory than the others in this package because // exact aggregators maintain the most information about the // distribution among these choices. -func NewWithExactDistribution() export.AggregatorSelector { +func NewWithExactDistribution() metric.AggregatorSelector { return selectorExact{} } // NewWithHistogramDistribution returns a simple aggregator selector // that uses histogram aggregators for `ValueRecorder` instruments. // This selector is a good default choice for most metric exporters. -func NewWithHistogramDistribution(options ...histogram.Option) export.AggregatorSelector { +func NewWithHistogramDistribution(options ...histogram.Option) metric.AggregatorSelector { return selectorHistogram{options: options} } -func sumAggs(aggPtrs []*export.Aggregator) { +func sumAggs(aggPtrs []*metric.Aggregator) { aggs := sum.New(len(aggPtrs)) for i := range aggPtrs { *aggPtrs[i] = &aggs[i] } } -func lastValueAggs(aggPtrs []*export.Aggregator) { +func lastValueAggs(aggPtrs []*metric.Aggregator) { aggs := lastvalue.New(len(aggPtrs)) for i := range aggPtrs { *aggPtrs[i] = &aggs[i] } } -func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) { +func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*metric.Aggregator) { switch descriptor.InstrumentKind() { case metric.ValueObserverInstrumentKind: lastValueAggs(aggPtrs) @@ -91,7 +90,7 @@ func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor, aggPtrs } } -func (selectorExact) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) { +func (selectorExact) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*metric.Aggregator) { switch descriptor.InstrumentKind() { case metric.ValueObserverInstrumentKind: lastValueAggs(aggPtrs) @@ -105,7 +104,7 @@ func (selectorExact) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*ex } } -func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) { +func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*metric.Aggregator) { switch descriptor.InstrumentKind() { case metric.ValueObserverInstrumentKind: lastValueAggs(aggPtrs) diff --git a/sdk/metric/selector/simple/simple_test.go b/sdk/metric/selector/simple/simple_test.go index cd7dab46357..607722fd2ab 100644 --- a/sdk/metric/selector/simple/simple_test.go +++ b/sdk/metric/selector/simple/simple_test.go @@ -21,7 +21,6 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" - export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/exact" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" @@ -39,13 +38,13 @@ var ( testValueObserverDesc = metric.NewDescriptor("valueobserver", metric.ValueObserverInstrumentKind, number.Int64Kind) ) -func oneAgg(sel export.AggregatorSelector, desc *metric.Descriptor) export.Aggregator { - var agg export.Aggregator +func oneAgg(sel metric.AggregatorSelector, desc *metric.Descriptor) metric.Aggregator { + var agg metric.Aggregator sel.AggregatorFor(desc, &agg) return agg } -func testFixedSelectors(t *testing.T, sel export.AggregatorSelector) { +func testFixedSelectors(t *testing.T, sel metric.AggregatorSelector) { require.IsType(t, (*lastvalue.Aggregator)(nil), oneAgg(sel, &testValueObserverDesc)) require.IsType(t, (*sum.Aggregator)(nil), oneAgg(sel, &testCounterDesc)) require.IsType(t, (*sum.Aggregator)(nil), oneAgg(sel, &testUpDownCounterDesc)) diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index 4878435bde3..94f330807b4 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -33,9 +33,9 @@ import ( "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/aggregation" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/processor/processortest" ) @@ -58,7 +58,7 @@ type ( impl testImpl T *testing.T - export.AggregatorSelector + metric.AggregatorSelector lock sync.Mutex lused map[string]bool From fec26e5714665bf16244eb624b76d5e55deedc9a Mon Sep 17 00:00:00 2001 From: beanliu Date: Mon, 18 Jan 2021 10:32:24 +1100 Subject: [PATCH 2/9] add View type and meter.RegisterView method to public Metric API --- internal/global/meter.go | 5 ++ metric/metric.go | 12 +++++ metric/metric_sdkapi.go | 3 ++ metric/metric_test.go | 15 ++++++ metric/metric_view.go | 85 ++++++++++++++++++++++++++++++++ metric/registry/registry.go | 5 ++ metric/viewlabelconfig_string.go | 25 ++++++++++ oteltest/meter.go | 4 ++ sdk/metric/sdk.go | 4 ++ 9 files changed, 158 insertions(+) create mode 100644 metric/metric_view.go create mode 100644 metric/viewlabelconfig_string.go diff --git a/internal/global/meter.go b/internal/global/meter.go index 8b288df780f..ca2bde6e0cd 100644 --- a/internal/global/meter.go +++ b/internal/global/meter.go @@ -185,6 +185,11 @@ func (m *meterImpl) setDelegate(name, version string, provider metric.MeterProvi m.asyncInsts = nil } +// RegisterView is a no-op here. +func (m *meterImpl) RegisterView(metric.View) { + // Should we panic here? +} + func (m *meterImpl) NewSyncInstrument(desc metric.Descriptor) (metric.SyncImpl, error) { m.lock.Lock() defer m.lock.Unlock() diff --git a/metric/metric.go b/metric/metric.go index 0b988abba88..979d8c36bed 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -41,6 +41,13 @@ type Meter struct { name, version string } +// RegisterView registers a view for a synchronous instrument. +func (m Meter) RegisterView(v View) { + if m.impl != nil { + m.impl.RegisterView(v) + } +} + // RecordBatch atomically records a batch of measurements. func (m Meter) RecordBatch(ctx context.Context, ls []label.KeyValue, ms ...Measurement) { if m.impl == nil { @@ -328,6 +335,11 @@ func Must(meter Meter) MeterMust { return MeterMust{meter: meter} } +// RegisterView calls `Meter.RegisterView` +func (mm MeterMust) RegisterView(v View) { + mm.meter.RegisterView(v) +} + // NewInt64Counter calls `Meter.NewInt64Counter` and returns the // instrument, panicking if it encounters an error. func (mm MeterMust) NewInt64Counter(name string, cos ...InstrumentOption) Int64Counter { diff --git a/metric/metric_sdkapi.go b/metric/metric_sdkapi.go index 77d6adb173b..c87eb415723 100644 --- a/metric/metric_sdkapi.go +++ b/metric/metric_sdkapi.go @@ -24,6 +24,9 @@ import ( // MeterImpl is the interface an SDK must implement to supply a Meter // implementation. type MeterImpl interface { + // RegisterView registers a view for a synchronous instrument. + RegisterView(v View) + // RecordBatch atomically records a batch of measurements. RecordBatch(ctx context.Context, labels []label.KeyValue, measurement ...Measurement) diff --git a/metric/metric_test.go b/metric/metric_test.go index 3250f0e1588..774234a4b6e 100644 --- a/metric/metric_test.go +++ b/metric/metric_test.go @@ -206,6 +206,18 @@ func TestOptions(t *testing.T) { } } +func TestViewPanic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("The code did not panic") + } + }() + + _, meter := oteltest.NewMeter() + c := Must(meter).NewFloat64Counter("test.counter.float") + meter.RegisterView(metric.NewView(c.SyncImpl(), metric.DropAll, nil, nil)) +} + func TestCounter(t *testing.T) { // N.B. the API does not check for negative // values, that's the SDK's responsibility. @@ -435,6 +447,9 @@ type testWrappedMeter struct { var _ metric.MeterImpl = testWrappedMeter{} +func (testWrappedMeter) RegisterView(metric.View) { +} + func (testWrappedMeter) RecordBatch(context.Context, []label.KeyValue, ...metric.Measurement) { } diff --git a/metric/metric_view.go b/metric/metric_view.go new file mode 100644 index 00000000000..7c8e2923817 --- /dev/null +++ b/metric/metric_view.go @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric // import "go.opentelemetry.io/otel/metric" + +import ( + "go.opentelemetry.io/otel/label" +) + +//go:generate stringer -type=ViewLabelConfig + +// ViewLabelConfig is the option to control how to track the set of labels +// for a given view. +// See available values for more details. +type ViewLabelConfig uint8 + +const ( + // Drop all label keys, and aggregate all measurements from the metric + // instrument together regardless of their label sets. + // + // Note: the DropAll is the same as LabelKeys with empty keys. + DropAll ViewLabelConfig = iota + + // Record all label keys. This is the default option without explicitly + // registering a view. + Ungroup + + // Specify a set of label keys to track at view creation time, and drop + // other keys from the label sets of recorded measurements before + // aggregating. + // + // Note: some label values may be undefined if the measurements don't + // provide all label values. + LabelKeys +) + +// View supports configuring non-default aggregation behaviors on the level +// of an individual synchronous instrument. +type View struct { + inst SyncImpl + labelConfig ViewLabelConfig + labelKeys []label.Key + aggregatorSelector AggregatorSelector +} + +// SyncImpl returns the associated synchronous instrument interface for a given view. +func (v View) SyncImpl() SyncImpl { + return v.inst +} + +// LabelConfig returns the label tracking option for a given view. +func (v View) LabelConfig() ViewLabelConfig { + return v.labelConfig +} + +// LabelKeys returns the pre-configured label keys for a given view. +func (v View) LabelKeys() []label.Key { + return v.labelKeys +} + +// AggregatorFactory returns the AggregatorFactory instance for a given view. +func (v View) AggregatorFactory() AggregatorSelector { + return v.aggregatorSelector +} + +// NewView returns a view object for the given parameters. +func NewView(inst SyncImpl, labelConfig ViewLabelConfig, labelKeys []label.Key, selector AggregatorSelector) View { + return View{ + inst: inst, + labelConfig: labelConfig, + labelKeys: labelKeys, + aggregatorSelector: selector, + } +} diff --git a/metric/registry/registry.go b/metric/registry/registry.go index f1d9819c319..830c6ac86bb 100644 --- a/metric/registry/registry.go +++ b/metric/registry/registry.go @@ -74,6 +74,11 @@ func NewUniqueInstrumentMeterImpl(impl metric.MeterImpl) metric.MeterImpl { } } +// RegisterView implements metric.MeterImpl. +func (u *uniqueInstrumentMeterImpl) RegisterView(v metric.View) { + u.impl.RegisterView(v) +} + // RecordBatch implements metric.MeterImpl. func (u *uniqueInstrumentMeterImpl) RecordBatch(ctx context.Context, labels []label.KeyValue, ms ...metric.Measurement) { u.impl.RecordBatch(ctx, labels, ms...) diff --git a/metric/viewlabelconfig_string.go b/metric/viewlabelconfig_string.go new file mode 100644 index 00000000000..660cc2a91cf --- /dev/null +++ b/metric/viewlabelconfig_string.go @@ -0,0 +1,25 @@ +// Code generated by "stringer -type=ViewLabelConfig"; DO NOT EDIT. + +package metric + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[DropAll-0] + _ = x[Ungroup-1] + _ = x[LabelKeys-2] +} + +const _ViewLabelConfig_name = "DropAllUngroupLabelKeys" + +var _ViewLabelConfig_index = [...]uint8{0, 7, 14, 23} + +func (i ViewLabelConfig) String() string { + if i >= ViewLabelConfig(len(_ViewLabelConfig_index)-1) { + return "ViewLabelConfig(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _ViewLabelConfig_name[_ViewLabelConfig_index[i]:_ViewLabelConfig_index[i+1]] +} diff --git a/oteltest/meter.go b/oteltest/meter.go index d7899ce71a8..ede9a7da9a2 100644 --- a/oteltest/meter.go +++ b/oteltest/meter.go @@ -127,6 +127,10 @@ func NewMeter() (*MeterImpl, metric.Meter) { return impl, p.Meter("mock") } +func (m *MeterImpl) RegisterView(v metric.View) { + panic("me") +} + func (m *MeterImpl) NewSyncInstrument(descriptor metric.Descriptor) (metric.SyncImpl, error) { m.lock.Lock() defer m.lock.Unlock() diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 9ae8d77735a..82e5050ba9e 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -309,6 +309,10 @@ func NewAccumulator(processor export.Processor, resource *resource.Resource) *Ac } } +func (m *Accumulator) RegisterView(v metric.View) { + panic("me") +} + // NewSyncInstrument implements metric.MetricImpl. func (m *Accumulator) NewSyncInstrument(descriptor metric.Descriptor) (metric.SyncImpl, error) { return &syncInstrument{ From a3dfcd3812ed3019c2650d2c493d249558f9f191 Mon Sep 17 00:00:00 2001 From: beanliu Date: Mon, 18 Jan 2021 10:32:55 +1100 Subject: [PATCH 3/9] support labels with values unset for View API --- label/key.go | 12 ++++++++++++ label/kv.go | 5 +++++ label/kv_test.go | 8 ++++++++ label/value.go | 7 +++++++ label/value_test.go | 6 ++++++ 5 files changed, 38 insertions(+) diff --git a/label/key.go b/label/key.go index 732fa2523a7..0a94093f2a9 100644 --- a/label/key.go +++ b/label/key.go @@ -18,6 +18,18 @@ package label // import "go.opentelemetry.io/otel/label" // allowed character set in the key depends on the use of the key. type Key string +// Empty creates a KeyValue instance with value unset. +// +// Consider using a convenience function provided by the +// api/key package - +// key.Int64(name, value). +func (k Key) Empty() KeyValue { + return KeyValue{ + Key: k, + Value: EmptyValue(), + } +} + // Bool creates a KeyValue instance with a BOOL Value. // // If creating both key and a bool value at the same time, then diff --git a/label/kv.go b/label/kv.go index 0520f4f41a8..43cb85989bf 100644 --- a/label/kv.go +++ b/label/kv.go @@ -26,6 +26,11 @@ type KeyValue struct { Value Value } +// Empty creates an empty key-value pair with a passed name. +func Empty(k string) KeyValue { + return Key(k).Empty() +} + // Bool creates a new key-value pair with a passed name and a bool // value. func Bool(k string, v bool) KeyValue { diff --git a/label/kv_test.go b/label/kv_test.go index 068d2932381..f0aa3617989 100644 --- a/label/kv_test.go +++ b/label/kv_test.go @@ -29,6 +29,14 @@ func TestKeyValueConstructors(t *testing.T) { actual label.KeyValue expected label.KeyValue }{ + { + name: "Empty", + actual: label.Empty("k1"), + expected: label.KeyValue{ + Key: "k1", + Value: label.EmptyValue(), + }, + }, { name: "Bool", actual: label.Bool("k1", true), diff --git a/label/value.go b/label/value.go index 6cc0bee80c9..571714a53b5 100644 --- a/label/value.go +++ b/label/value.go @@ -64,6 +64,13 @@ const ( ARRAY ) +// EmptyValue creates a Value with no value set. +func EmptyValue() Value { + return Value{ + vtype: INVALID, + } +} + // BoolValue creates a BOOL Value. func BoolValue(v bool) Value { return Value{ diff --git a/label/value_test.go b/label/value_test.go index 7842fceef08..96a52df36ab 100644 --- a/label/value_test.go +++ b/label/value_test.go @@ -33,6 +33,12 @@ func TestValue(t *testing.T) { wantType label.Type wantValue interface{} }{ + { + name: "Key.Empty() correctly returns key's internal empty value", + value: k.Empty().Value, + wantType: label.INVALID, + wantValue: nil, + }, { name: "Key.Bool() correctly returns keys's internal bool value", value: k.Bool(true).Value, From 6eea9e94960b892064722f423d38215cf6de1373 Mon Sep 17 00:00:00 2001 From: beanliu Date: Mon, 18 Jan 2021 10:33:16 +1100 Subject: [PATCH 4/9] add view selector that takes precedence over the default AggregatorSelector in metrics processing/exporting --- sdk/export/metric/metric.go | 19 +++++-- sdk/metric/processor/basic/basic.go | 20 +++++-- sdk/metric/processor/basic/basic_test.go | 67 ++++++++++++++---------- sdk/metric/processor/reducer/reducer.go | 1 + sdk/metric/sdk.go | 4 +- 5 files changed, 74 insertions(+), 37 deletions(-) diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index b1f01a6ec93..3b762a04322 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -188,7 +188,8 @@ type Metadata struct { // and label set, as prepared by an Accumulator for the Processor. type Accumulation struct { Metadata - aggregator metric.Aggregator + aggregator metric.Aggregator + viewSelector metric.AggregatorSelector } // Record contains the exported data for a single metric instrument @@ -219,16 +220,17 @@ func (m Metadata) Resource() *resource.Resource { // NewAccumulation allows Accumulator implementations to construct new // Accumulations to send to Processors. The Descriptor, Labels, Resource, -// and Aggregator represent aggregate metric events received over a single -// collection period. -func NewAccumulation(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator metric.Aggregator) Accumulation { +// Aggregator and AggregatorSelector represent aggregate metric events +// received over a single collection period. +func NewAccumulation(descriptor *metric.Descriptor, labels *label.Set, resource *resource.Resource, aggregator metric.Aggregator, viewSelector metric.AggregatorSelector) Accumulation { return Accumulation{ Metadata: Metadata{ descriptor: descriptor, labels: labels, resource: resource, }, - aggregator: aggregator, + aggregator: aggregator, + viewSelector: viewSelector, } } @@ -238,6 +240,13 @@ func (r Accumulation) Aggregator() metric.Aggregator { return r.aggregator } +// ViewAggregatorSelector returns the aggregator selector object for an +// instrument associated with a view. This could be nil indicating +// the default AggregaotSelector should be used. +func (r Accumulation) ViewAggregatorSelector() metric.AggregatorSelector { + return r.viewSelector +} + // NewRecord allows Processor implementations to construct export // records. The Descriptor, Labels, and Aggregator represent // aggregate metric events received over a single collection period. diff --git a/sdk/metric/processor/basic/basic.go b/sdk/metric/processor/basic/basic.go index ce7359049ca..0d45ed8e3ac 100644 --- a/sdk/metric/processor/basic/basic.go +++ b/sdk/metric/processor/basic/basic.go @@ -144,6 +144,20 @@ func New(aselector metric.AggregatorSelector, eselector export.ExportKindSelecto return p } +// allocateAggregator allocates a variable number of aggregators of a kind +// suitable for the `descriptor`. +// +// If `viewSelector` is not `nil`, it would be used to initialize the +// aggregators. Otherwise, the AggregatorSelector associated with the +// `Processor` would be . +func (b *Processor) allocAggregator(descriptor *metric.Descriptor, viewSelector metric.AggregatorSelector, aggregator ...*metric.Aggregator) { + if viewSelector != nil { + viewSelector.AggregatorFor(descriptor, aggregator...) + } else { + b.AggregatorFor(descriptor, aggregator...) + } +} + // Process implements export.Processor. func (b *Processor) Process(accum export.Accumulation) error { if b.startedCollection != b.finishedCollection+1 { @@ -172,11 +186,11 @@ func (b *Processor) Process(accum export.Accumulation) error { if stateful { if desc.InstrumentKind().PrecomputedSum() { // If we know we need to compute deltas, allocate two aggregators. - b.AggregatorFor(desc, &newValue.cumulative, &newValue.delta) + b.allocAggregator(desc, accum.ViewAggregatorSelector(), &newValue.cumulative, &newValue.delta) } else { // In this case we are certain not to need a delta, only allocate // a cumulative aggregator. - b.AggregatorFor(desc, &newValue.cumulative) + b.allocAggregator(desc, accum.ViewAggregatorSelector(), &newValue.cumulative) } } b.state.values[key] = newValue @@ -233,7 +247,7 @@ func (b *Processor) Process(accum export.Accumulation) error { // before merging below. if !value.currentOwned { tmp := value.current - b.AggregatorSelector.AggregatorFor(desc, &value.current) + b.allocAggregator(desc, accum.ViewAggregatorSelector(), &value.current) value.currentOwned = true if err := tmp.SynchronizedMove(value.current, desc); err != nil { return err diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index 5b578eb7604..58f1f48e1ed 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -103,13 +103,18 @@ func asNumber(nkind number.Kind, value int64) number.Number { return number.NewFloat64Number(float64(value)) } -func updateFor(t *testing.T, desc *metric.Descriptor, selector metric.AggregatorSelector, res *resource.Resource, value int64, labs ...label.KeyValue) export.Accumulation { +func updateFor(t *testing.T, desc *metric.Descriptor, selector metric.AggregatorSelector, provideViewSelector bool, res *resource.Resource, value int64, labs ...label.KeyValue) export.Accumulation { ls := label.NewSet(labs...) var agg metric.Aggregator selector.AggregatorFor(desc, &agg) require.NoError(t, agg.Update(context.Background(), asNumber(desc.NumberKind(), value), desc)) - return export.NewAccumulation(desc, &ls, res, agg) + var accSelector metric.AggregatorSelector + if provideViewSelector { + accSelector = selector + } + + return export.NewAccumulation(desc, &ls, res, agg, accSelector) } func testProcessor( @@ -127,8 +132,14 @@ func testProcessor( labs1 := []label.KeyValue{label.String("L1", "V")} labs2 := []label.KeyValue{label.String("L2", "V")} - testBody := func(t *testing.T, hasMemory bool, nAccum, nCheckpoint int) { - processor := basic.New(selector, export.ConstantExportKindSelector(ekind), basic.WithMemory(hasMemory)) + testBody := func(t *testing.T, provideViewSelector, hasMemory bool, nAccum, nCheckpoint int) { + // testSelector would be nil given a view selector. + var testSelector metric.AggregatorSelector + if !provideViewSelector { + testSelector = selector + } + + processor := basic.New(testSelector, export.ConstantExportKindSelector(ekind), basic.WithMemory(hasMemory)) instSuffix := fmt.Sprint(".", strings.ToLower(akind.String())) @@ -149,8 +160,8 @@ func testProcessor( processor.StartCollection() for na := 0; na < nAccum; na++ { - _ = processor.Process(updateFor(t, &desc1, selector, res, input, labs1...)) - _ = processor.Process(updateFor(t, &desc2, selector, res, input, labs2...)) + _ = processor.Process(updateFor(t, &desc1, selector, provideViewSelector, res, input, labs1...)) + _ = processor.Process(updateFor(t, &desc2, selector, provideViewSelector, res, input, labs2...)) } err := processor.FinishCollection() @@ -240,20 +251,22 @@ func testProcessor( } } - for _, hasMem := range []bool{false, true} { - t.Run(fmt.Sprintf("HasMemory=%v", hasMem), func(t *testing.T) { - // For 1 to 3 checkpoints: - for nAccum := 1; nAccum <= 3; nAccum++ { - t.Run(fmt.Sprintf("NumAccum=%d", nAccum), func(t *testing.T) { - // For 1 to 3 accumulators: - for nCheckpoint := 1; nCheckpoint <= 3; nCheckpoint++ { - t.Run(fmt.Sprintf("NumCkpt=%d", nCheckpoint), func(t *testing.T) { - testBody(t, hasMem, nAccum, nCheckpoint) - }) - } - }) - } - }) + for _, provideAggrFactory := range []bool{false, true} { + for _, hasMem := range []bool{false, true} { + t.Run(fmt.Sprintf("HasMemory=%v", hasMem), func(t *testing.T) { + // For 1 to 3 checkpoints: + for nAccum := 1; nAccum <= 3; nAccum++ { + t.Run(fmt.Sprintf("NumAccum=%d", nAccum), func(t *testing.T) { + // For 1 to 3 accumulators: + for nCheckpoint := 1; nCheckpoint <= 3; nCheckpoint++ { + t.Run(fmt.Sprintf("NumCkpt=%d", nCheckpoint), func(t *testing.T) { + testBody(t, provideAggrFactory, hasMem, nAccum, nCheckpoint) + }) + } + }) + } + }) + } } } @@ -297,7 +310,7 @@ func TestBasicInconsistent(t *testing.T) { b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector()) desc := metric.NewDescriptor("inst", metric.CounterInstrumentKind, number.Int64Kind) - accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), metrictest.NoopAggregator{}) + accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), metrictest.NoopAggregator{}, nil) require.Equal(t, basic.ErrInconsistentState, b.Process(accum)) // Test invalid kind: @@ -320,7 +333,7 @@ func TestBasicTimestamps(t *testing.T) { afterNew := time.Now() desc := metric.NewDescriptor("inst", metric.CounterInstrumentKind, number.Int64Kind) - accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), metrictest.NoopAggregator{}) + accum := export.NewAccumulation(&desc, label.EmptySet(), resource.Empty(), metrictest.NoopAggregator{}, nil) b.StartCollection() _ = b.Process(accum) @@ -383,7 +396,7 @@ func TestStatefulNoMemoryCumulative(t *testing.T) { // Add 10 processor.StartCollection() - _ = processor.Process(updateFor(t, &desc, selector, res, 10, label.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, false, res, 10, label.String("A", "B"))) require.NoError(t, processor.FinishCollection()) // Verify one element @@ -417,7 +430,7 @@ func TestStatefulNoMemoryDelta(t *testing.T) { // Add 10 processor.StartCollection() - _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), label.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, false, res, int64(i*10), label.String("A", "B"))) require.NoError(t, processor.FinishCollection()) // Verify one element @@ -445,9 +458,9 @@ func TestMultiObserverSum(t *testing.T) { for i := 1; i < 3; i++ { // Add i*10*3 times processor.StartCollection() - _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), label.String("A", "B"))) - _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), label.String("A", "B"))) - _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), label.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, false, res, int64(i*10), label.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, false, res, int64(i*10), label.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, false, res, int64(i*10), label.String("A", "B"))) require.NoError(t, processor.FinishCollection()) // Multiplier is 1 for deltas, otherwise i. diff --git a/sdk/metric/processor/reducer/reducer.go b/sdk/metric/processor/reducer/reducer.go index 91c4dae2d52..6a7b8ea5731 100644 --- a/sdk/metric/processor/reducer/reducer.go +++ b/sdk/metric/processor/reducer/reducer.go @@ -62,6 +62,7 @@ func (p *Processor) Process(accum export.Accumulation) error { &reduced, accum.Resource(), accum.Aggregator(), + nil, ), ) } diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 82e5050ba9e..d2cc369b8c8 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -437,7 +437,7 @@ func (m *Accumulator) checkpointRecord(r *record) int { return 0 } - a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint) + a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint, nil) err = m.processor.Process(a) if err != nil { otel.Handle(err) @@ -455,7 +455,7 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { epochDiff := m.currentEpoch - lrec.observedEpoch if epochDiff == 0 { if lrec.observed != nil { - a := export.NewAccumulation(&a.descriptor, lrec.labels, m.resource, lrec.observed) + a := export.NewAccumulation(&a.descriptor, lrec.labels, m.resource, lrec.observed, nil) err := m.processor.Process(a) if err != nil { otel.Handle(err) From f70f95827efe8e6d49c143ac02c48626546e97f4 Mon Sep 17 00:00:00 2001 From: beanliu Date: Mon, 18 Jan 2021 10:33:40 +1100 Subject: [PATCH 5/9] support the Metrics View API --- sdk/metric/correct_test.go | 111 ++++++++++++++++++++ sdk/metric/sdk.go | 200 ++++++++++++++++++++++++++++++------- 2 files changed, 273 insertions(+), 38 deletions(-) diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 0b62bd42b0d..904b1259b28 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -30,6 +30,7 @@ import ( "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" metricsdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/processor/processortest" "go.opentelemetry.io/otel/sdk/resource" ) @@ -89,6 +90,16 @@ func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*metri processortest.AggregatorSelector().AggregatorFor(desc, aggPtrs...) } +type testAlwaysLastValueAggregatorSelector struct { +} + +func (t *testAlwaysLastValueAggregatorSelector) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*metric.Aggregator) { + aggs := lastvalue.New(len(aggPtrs)) + for i := range aggPtrs { + *aggPtrs[i] = &aggs[i] + } +} + func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessProcessor) { testHandler.Reset() processor := &correctnessProcessor{ @@ -174,6 +185,106 @@ func TestInputRangeValueRecorder(t *testing.T) { require.Nil(t, err) } +func TestViewDropAllLabels(t *testing.T) { + ctx := context.Background() + meter, sdk, processor := newSDK(t) + + valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") + Must(meter).RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.DropAll, nil, nil)) + Must(meter).RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.LabelKeys, nil, nil)) + + valuerecorder.Record(ctx, 1.0, label.String("A", "a")) + + checkpointed := sdk.Collect(ctx) + require.Equal(t, 1, checkpointed) + + require.Equal(t, 1, len(processor.accumulations)) + require.Equal(t, 0, processor.accumulations[0].Labels().Len()) +} + +func TestViewUngroupLabels(t *testing.T) { + ctx := context.Background() + meter, sdk, processor := newSDK(t) + + valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") + Must(meter).RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.Ungroup, nil, nil)) + + valuerecorder.Record(ctx, 1.0, label.String("A", "a")) + + checkpointed := sdk.Collect(ctx) + require.Equal(t, 1, checkpointed) + + labels := processor.accumulations[0].Labels().ToSlice() + require.ElementsMatch(t, labels, []label.KeyValue{label.String("A", "a")}) +} + +func TestViewLabelKeys(t *testing.T) { + ctx := context.Background() + meter, sdk, processor := newSDK(t) + + valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") + Must(meter).RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.LabelKeys, []label.Key{"A", "B"}, nil)) + + valuerecorder.Record(ctx, 1.0, label.String("A", "a")) + + checkpointed := sdk.Collect(ctx) + require.Equal(t, 1, checkpointed) + + labels := processor.accumulations[0].Labels() + expected := label.NewSet(label.String("A", "a"), label.Empty("B")) + require.True(t, labels.Equals(&expected)) +} + +func TestViewCustomAggregator(t *testing.T) { + ctx := context.Background() + meter, sdk, processor := newSDK(t) + + valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") + Must(meter).RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.Ungroup, nil, &testAlwaysLastValueAggregatorSelector{})) + + valuerecorder.Record(ctx, 1.0, label.String("A", "a")) + valuerecorder.Record(ctx, 2.0, label.String("A", "a")) + + checkpointed := sdk.Collect(ctx) + require.Equal(t, 1, checkpointed) + + agg := processor.accumulations[0].Aggregator() + require.Equal(t, aggregation.LastValueKind, agg.Aggregation().Kind()) +} + +func TestViewBind(t *testing.T) { + ctx := context.Background() + meter, sdk, processor := newSDK(t) + + valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact") + Must(meter).RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.Ungroup, nil, &testAlwaysLastValueAggregatorSelector{})) + + bindvaluerecorder := valuerecorder.Bind(label.String("A", "a")) + defer bindvaluerecorder.Unbind() + + Must(meter).RegisterView(metric.NewView(valuerecorder.SyncImpl(), metric.DropAll, nil, &testAlwaysLastValueAggregatorSelector{})) + + valuerecorder.Record(ctx, 1.0, label.String("A", "a")) + + bindvaluerecorder.Record(ctx, 2.0) + + checkpointed := sdk.Collect(ctx) + require.Equal(t, 2, checkpointed) + + require.Equal(t, 2, len(processor.accumulations)) + v0, _, _ := processor.accumulations[0].Aggregator().Aggregation().(aggregation.LastValue).LastValue() + v1, _, _ := processor.accumulations[1].Aggregator().Aggregation().(aggregation.LastValue).LastValue() + result := map[label.Set]float64{ + *processor.accumulations[0].Labels(): v0.AsFloat64(), + *processor.accumulations[1].Labels(): v1.AsFloat64(), + } + expected := map[label.Set]float64{ + label.NewSet(): 1.0, + label.NewSet(label.String("A", "a")): 2.0, + } + require.Equal(t, expected, result) +} + func TestDisabledInstrument(t *testing.T) { ctx := context.Background() meter, sdk, processor := newSDK(t) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index d2cc369b8c8..201d00a8675 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "runtime" + "sort" + "strings" "sync" "sync/atomic" @@ -71,6 +73,21 @@ type ( syncInstrument struct { instrument + + // views is a logical set of registered views for this instrument. + views sync.Map + } + + // syncInstrumentViewKey is used only in syncInstrument as a key representation of a view. + syncInstrumentViewKey struct { + selector metric.AggregatorSelector + labelConfig metric.ViewLabelConfig + labelKeysSignature string + } + + // syncInstrumentViewValue is used only in syncInstrument as a value representation of a view. + syncInstrumentViewValue struct { + labelKeys map[label.Key]struct{} } // mapkey uniquely describes a metric instrument in terms of @@ -80,6 +97,11 @@ type ( ordered label.Distinct } + // boundInstrument is a snapshot of the view data for a given instrument. + boundInstrument struct { + records []*record + } + // record maintains the state of one metric instrument. Due // the use of lock-free algorithms, there may be more than one // `record` in existence at a time, although at most one can @@ -115,6 +137,9 @@ type ( // inst is a pointer to the corresponding instrument. inst *syncInstrument + // viewSelector is a reference to the corresponding view's aggregator selector. + viewSelector metric.AggregatorSelector + // current implements the actual RecordOne() API, // depending on the type of aggregation. If nil, the // metric was disabled by the exporter. @@ -146,6 +171,7 @@ var ( _ metric.AsyncImpl = &asyncInstrument{} _ metric.SyncImpl = &syncInstrument{} _ metric.BoundSyncImpl = &record{} + _ metric.BoundSyncImpl = &boundInstrument{} ErrUninitializedInstrument = fmt.Errorf("use of an uninitialized instrument") ) @@ -158,6 +184,98 @@ func (a *asyncInstrument) Implementation() interface{} { return a } +func (s *syncInstrument) registerView(labelConfig metric.ViewLabelConfig, labelKeys []label.Key, selector metric.AggregatorSelector) { + key, val := newSyncInstrumentView(labelConfig, labelKeys, selector) + + // First-write-win: ignore duplicated view silently. + // maybe return/record an error? + s.views.LoadOrStore(key, val) +} + +func (s *syncInstrument) rangeViews(kvs []label.KeyValue, f func([]label.KeyValue, metric.AggregatorSelector)) { + empty := true + s.views.Range(func(key, value interface{}) bool { + empty = false + return false + }) + if empty { + s.registerView(metric.Ungroup, nil, nil) + } + + // compute on need + var labelKvs map[label.Key]label.KeyValue + + s.views.Range(func(key, value interface{}) bool { + viewKey := key.(*syncInstrumentViewKey) + viewValue := value.(*syncInstrumentViewValue) + + var viewLabelKeyValues []label.KeyValue + switch viewKey.labelConfig { + case metric.Ungroup: + viewLabelKeyValues = kvs + + case metric.LabelKeys: + if labelKvs == nil { + labelKvs = map[label.Key]label.KeyValue{} + for _, kv := range kvs { + labelKvs[kv.Key] = kv + } + } + for k, kv := range labelKvs { + if _, p := viewValue.labelKeys[k]; p { + viewLabelKeyValues = append(viewLabelKeyValues, kv) + } + } + for k := range viewValue.labelKeys { + if _, p := labelKvs[k]; !p { + viewLabelKeyValues = append(viewLabelKeyValues, k.Empty()) + } + } + + case metric.DropAll: + // keep nil + default: + panic(fmt.Sprintf("impossible label config: %s", viewKey.labelConfig)) + } + + f(viewLabelKeyValues, viewKey.selector) + return true + }) +} + +// newSyncInstrumentView returns a view key/value representation for the given parameters. +func newSyncInstrumentView(labelConfig metric.ViewLabelConfig, labelKeys []label.Key, selector metric.AggregatorSelector) (*syncInstrumentViewKey, *syncInstrumentViewValue) { + key := &syncInstrumentViewKey{ + selector: selector, + labelConfig: labelConfig, + labelKeysSignature: "", + } + value := &syncInstrumentViewValue{ + labelKeys: nil, + } + + if labelConfig == metric.LabelKeys { + if len(labelKeys) > 0 { + value.labelKeys = map[label.Key]struct{}{} + for _, k := range labelKeys { + value.labelKeys[k] = struct{}{} + } + + keys := make([]string, 0, len(value.labelKeys)) + for k := range value.labelKeys { + keys = append(keys, string(k)) + } + sort.Strings(keys) + key.labelKeysSignature = strings.Join(keys, "|") + } else { + // make sure there is only one representation for aggregations without any labels + key.labelConfig = metric.DropAll + } + } + + return key, value +} + func (s *syncInstrument) Implementation() interface{} { return s } @@ -209,21 +327,18 @@ func (a *asyncInstrument) getRecorder(labels *label.Set) metric.Aggregator { // support re-use of the orderedLabels computed by a previous // measurement in the same batch. This performs two allocations // in the common case. -func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set) *record { +func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, selector metric.AggregatorSelector) *record { var rec *record var equiv label.Distinct - if labelPtr == nil { - // This memory allocation may not be used, but it's - // needed for the `sortSlice` field, to avoid an - // allocation while sorting. - rec = &record{} - rec.storage = label.NewSetWithSortable(kvs, &rec.sortSlice) - rec.labels = &rec.storage - equiv = rec.storage.Equivalent() - } else { - equiv = labelPtr.Equivalent() - } + // This memory allocation may not be used, but it's + // needed for the `sortSlice` field, to avoid an + // allocation while sorting. + rec = &record{} + rec.storage = label.NewSetWithSortable(kvs, &rec.sortSlice) + rec.labels = &rec.storage + rec.viewSelector = selector + equiv = rec.storage.Equivalent() // Create lookup key for sync.Map (one allocation, as this // passes through an interface{}) @@ -243,14 +358,14 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set // This entry is no longer mapped, try to add a new entry. } - if rec == nil { - rec = &record{} - rec.labels = labelPtr - } rec.refMapped = refcountMapped{value: 2} rec.inst = s - s.meter.processor.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint) + if selector != nil { + selector.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint) + } else { + s.meter.processor.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint) + } for { // Load/Store: there's a memory allocation to place `mk` into @@ -283,13 +398,19 @@ func (s *syncInstrument) acquireHandle(kvs []label.KeyValue, labelPtr *label.Set } func (s *syncInstrument) Bind(kvs []label.KeyValue) metric.BoundSyncImpl { - return s.acquireHandle(kvs, nil) + result := &boundInstrument{} + s.rangeViews(kvs, func(viewKvs []label.KeyValue, selector metric.AggregatorSelector) { + result.records = append(result.records, s.acquireHandle(viewKvs, selector)) + }) + return result } func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs []label.KeyValue) { - h := s.acquireHandle(kvs, nil) - defer h.Unbind() - h.RecordOne(ctx, num) + s.rangeViews(kvs, func(viewKvs []label.KeyValue, selector metric.AggregatorSelector) { + h := s.acquireHandle(viewKvs, selector) + h.RecordOne(ctx, num) + h.Unbind() + }) } // NewAccumulator constructs a new Accumulator for the given @@ -310,7 +431,11 @@ func NewAccumulator(processor export.Processor, resource *resource.Resource) *Ac } func (m *Accumulator) RegisterView(v metric.View) { - panic("me") + s := m.fromSync(v.SyncImpl()) + // maybe return an error if s is `nil`? + if s != nil { + s.registerView(v.LabelConfig(), v.LabelKeys(), v.AggregatorFactory()) + } } // NewSyncInstrument implements metric.MetricImpl. @@ -437,7 +562,7 @@ func (m *Accumulator) checkpointRecord(r *record) int { return 0 } - a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint, nil) + a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint, r.viewSelector) err = m.processor.Process(a) if err != nil { otel.Handle(err) @@ -477,25 +602,12 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { // RecordBatch enters a batch of metric events. func (m *Accumulator) RecordBatch(ctx context.Context, kvs []label.KeyValue, measurements ...metric.Measurement) { - // Labels will be computed the first time acquireHandle is - // called. Subsequent calls to acquireHandle will re-use the - // previously computed value instead of recomputing the - // ordered labels. - var labelsPtr *label.Set - for i, meas := range measurements { + for _, meas := range measurements { s := m.fromSync(meas.SyncImpl()) if s == nil { continue } - h := s.acquireHandle(kvs, labelsPtr) - - // Re-use labels for the next measurement. - if i == 0 { - labelsPtr = h.labels - } - - defer h.Unbind() - h.RecordOne(ctx, meas.Number()) + s.RecordOne(ctx, meas.Number(), kvs) } } @@ -530,6 +642,18 @@ func (r *record) mapkey() mapkey { } } +func (b *boundInstrument) RecordOne(ctx context.Context, number number.Number) { + for _, r := range b.records { + r.RecordOne(ctx, number) + } +} + +func (b *boundInstrument) Unbind() { + for _, r := range b.records { + r.Unbind() + } +} + // fromSync gets a sync implementation object, checking for // uninitialized instruments and instruments created by another SDK. func (m *Accumulator) fromSync(sync metric.SyncImpl) *syncInstrument { From c51df0b9767ec5e89368d1eb2dbca3d316a6d402 Mon Sep 17 00:00:00 2001 From: beanliu Date: Mon, 18 Jan 2021 10:34:02 +1100 Subject: [PATCH 6/9] add an example of the Metric View API --- example/otel-collector/main.go | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index 249ca9e85f4..566094c409b 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -106,20 +106,30 @@ func main() { tracer := otel.Tracer("test-tracer") meter := otel.Meter("test-meter") + // Recorder metric example + vr, _ := meter.NewFloat64Counter( + "an_important_metric", + metric.WithDescription("Measures the cumulative epicness of the app"), + ) + + // DropAll view, meaning all records label would be dropped before aggregation. + meter.RegisterView(metric.NewView(vr.SyncImpl(), metric.DropAll, nil, nil)) + + // Ungroup view, meaning all records take the labels directly. + meter.RegisterView(metric.NewView(vr.SyncImpl(), metric.Ungroup, nil, nil)) + + // Two views with the same metric and aggregation type but different label keys for aggregation. + meter.RegisterView(metric.NewView(vr.SyncImpl(), metric.LabelKeys, []label.Key{"os_type"}, nil)) + meter.RegisterView(metric.NewView(vr.SyncImpl(), metric.LabelKeys, []label.Key{"environment"}, nil)) + // labels represent additional key-value descriptors that can be bound to a // metric observer or recorder. commonLabels := []label.KeyValue{ - label.String("labelA", "chocolate"), - label.String("labelB", "raspberry"), - label.String("labelC", "vanilla"), + label.String("os_type", "linux"), } - // Recorder metric example - valuerecorder := metric.Must(meter). - NewFloat64Counter( - "an_important_metric", - metric.WithDescription("Measures the cumulative epicness of the app"), - ).Bind(commonLabels...) + // Note: views registered after binding have no effect on the bind object. + valuerecorder := vr.Bind(commonLabels...) defer valuerecorder.Unbind() // work begins @@ -133,6 +143,10 @@ func main() { log.Printf("Doing really hard work (%d / 10)\n", i+1) valuerecorder.Add(ctx, 1.0) + vr.Add(context.Background(), 1.0, label.String("os_type", "win"), label.String("environment", "prod")) + + vr.Add(context.Background(), 1.0, label.String("environment", "ddev")) + <-time.After(time.Second) iSpan.End() } From 50f51a661279a8ae89cf91875bb86b7fb1cb2ce0 Mon Sep 17 00:00:00 2001 From: beanliu Date: Mon, 18 Jan 2021 13:31:04 +1100 Subject: [PATCH 7/9] update CHANGELOG --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c624b9fe251..2a88d50f56b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,17 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] +### Added + +- Add `View` and `RegisterView` as part of the Metric API, and their implementation in the SDK. (#1473) + +### Changed + +- Moved `Aggregator`/`Aggregation`/`AggregatorSelector` interfaces to _otel/metric_ from _otel/sdk/export/metric_. (#1473) +- Support `KeyValue` labels with values unset. (#1473) +- Modified the `otel-collector` example to include View API usage. (#1473) +- `Accumulation` accepts a view selector for aggregator creation. (#1473) + ## [0.16.0] - 2020-01-13 ### Added From bcc48c0c47561774b3fb766a33e45410a900e6c8 Mon Sep 17 00:00:00 2001 From: Raymond Wang Date: Wed, 20 Jan 2021 11:23:11 +1100 Subject: [PATCH 8/9] Added fixed bucket histogram view example using simple aggregation selector --- example/otel-collector/main.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index 566094c409b..3bad3305620 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/metric/selector/simple" @@ -122,6 +123,16 @@ func main() { meter.RegisterView(metric.NewView(vr.SyncImpl(), metric.LabelKeys, []label.Key{"os_type"}, nil)) meter.RegisterView(metric.NewView(vr.SyncImpl(), metric.LabelKeys, []label.Key{"environment"}, nil)) + // ValueRecorder histogram metric example + hist, _ := meter.NewFloat64ValueRecorder( + "example_hist", + metric.WithDescription("Example fixed bucket histogram")) + + // Custom defined fixed bucket histogram aggregation + definedBuckets := []float64{1.0, 2.0, 3.0} + histogramAggregatorSelector := simple.NewWithHistogramDistribution(histogram.WithExplicitBoundaries(definedBuckets)) + meter.RegisterView(metric.NewView(hist.SyncImpl(), metric.Ungroup, nil, histogramAggregatorSelector)) + // labels represent additional key-value descriptors that can be bound to a // metric observer or recorder. commonLabels := []label.KeyValue{ @@ -147,6 +158,8 @@ func main() { vr.Add(context.Background(), 1.0, label.String("environment", "ddev")) + hist.Record(context.Background(), float64(i%5)) + <-time.After(time.Second) iSpan.End() } From cb00cdf0569021c3e0e4701b8c3cc73806a15fa3 Mon Sep 17 00:00:00 2001 From: Raymond Wang Date: Wed, 20 Jan 2021 11:28:59 +1100 Subject: [PATCH 9/9] Wrapped aggregation selector constructor --- example/otel-collector/main.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index 3bad3305620..038da37cdcd 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -130,7 +130,7 @@ func main() { // Custom defined fixed bucket histogram aggregation definedBuckets := []float64{1.0, 2.0, 3.0} - histogramAggregatorSelector := simple.NewWithHistogramDistribution(histogram.WithExplicitBoundaries(definedBuckets)) + histogramAggregatorSelector := NewHistogramAggregationSelector(definedBuckets) meter.RegisterView(metric.NewView(hist.SyncImpl(), metric.Ungroup, nil, histogramAggregatorSelector)) // labels represent additional key-value descriptors that can be bound to a @@ -172,3 +172,7 @@ func handleErr(err error, message string) { log.Fatalf("%s: %v", message, err) } } + +func NewHistogramAggregationSelector(buckets []float64) metric.AggregatorSelector { + return simple.NewWithHistogramDistribution(histogram.WithExplicitBoundaries(buckets)) +}