diff --git a/CHANGELOG.md b/CHANGELOG.md index 6860a271c29..e1a68c5a3e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm This addresses [GO-2022-0493](https://pkg.go.dev/vuln/GO-2022-0493). (#3235) - Update histogram default bounds to match the requirements of the latest specification. (#3222) +### Fixed + +- Use default view if instrument does not match any registered view of a reader. (#3224, #3237) + ## [0.32.1] Metric SDK (Alpha) - 2022-09-22 ### Changed diff --git a/sdk/metric/config.go b/sdk/metric/config.go index a1253334505..5b7537f63ed 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -126,10 +126,6 @@ func WithReader(r Reader, views ...view.View) Option { if cfg.readers == nil { cfg.readers = make(map[Reader][]view.View) } - if len(views) == 0 { - views = []view.View{{}} - } - cfg.readers[r] = views return cfg }) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 0bd52d63023..9eb86f593ac 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -188,31 +188,23 @@ func newInserter[N int64 | float64](p *pipeline) *inserter[N] { // Instrument inserts instrument inst with instUnit returning the Aggregators // that need to be updated with measurments for that instrument. func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]internal.Aggregator[N], error) { + var matched bool seen := map[instrumentID]struct{}{} var aggs []internal.Aggregator[N] errs := &multierror{wrapped: errCreatingAggregators} for _, v := range i.pipeline.views { inst, match := v.TransformInstrument(inst) + if !match { + continue + } + matched = true id := instrumentID{ scope: inst.Scope, name: inst.Name, description: inst.Description, } - - if _, ok := seen[id]; ok || !match { - continue - } - - if inst.Aggregation == nil { - inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) - } else if _, ok := inst.Aggregation.(aggregation.Default); ok { - inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) - } - - if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { - err = fmt.Errorf("creating aggregator with instrumentKind: %d, aggregation %v: %w", inst.Kind, inst.Aggregation, err) - errs.append(err) + if _, ok := seen[id]; ok { continue } @@ -233,16 +225,40 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in errs.append(err) } } - // TODO(#3224): handle when no views match. Default should be reader - // aggregation returned. + + if !matched { // Apply implicit default view if no explicit matched. + a, err := i.aggregator(inst) + if err != nil { + errs.append(err) + } + if a != nil { + aggs = append(aggs, a) + err = i.pipeline.addAggregator(inst.Scope, inst.Name, inst.Description, instUnit, a) + if err != nil { + errs.append(err) + } + } + } + return aggs, errs.errorOrNil() } // aggregator returns the Aggregator for an instrument configuration. If the // instrument defines an unknown aggregation, an error is returned. func (i *inserter[N]) aggregator(inst view.Instrument) (internal.Aggregator[N], error) { - // TODO (#3011): If filtering is done by the Aggregator it should be passed - // here. + switch inst.Aggregation.(type) { + case nil, aggregation.Default: + // Undefined, nil, means to use the default from the reader. + inst.Aggregation = i.pipeline.reader.aggregation(inst.Kind) + } + + if err := isAggregatorCompatible(inst.Kind, inst.Aggregation); err != nil { + return nil, fmt.Errorf( + "creating aggregator with instrumentKind: %d, aggregation %v: %w", + inst.Kind, inst.Aggregation, err, + ) + } + var ( temporality = i.pipeline.reader.temporality(inst.Kind) monotonic bool diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index ca83c9c3a9e..7a3321da0c1 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -25,7 +25,10 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) @@ -211,3 +214,61 @@ func TestPipelineConcurrency(t *testing.T) { } wg.Wait() } + +func TestDefaultViewImplicit(t *testing.T) { + t.Run("Int64", testDefaultViewImplicit[int64]()) + t.Run("Float64", testDefaultViewImplicit[float64]()) +} + +func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { + inst := view.Instrument{ + Scope: instrumentation.Scope{Name: "testing/lib"}, + Name: "requests", + Description: "count of requests received", + Kind: view.SyncCounter, + Aggregation: aggregation.Sum{}, + } + return func(t *testing.T) { + reader := NewManualReader() + v, err := view.New(view.MatchInstrumentName("foo"), view.WithRename("bar")) + require.NoError(t, err) + + tests := []struct { + name string + pipe *pipeline + }{ + { + name: "NoView", + pipe: newPipeline(nil, reader, nil), + }, + { + name: "NoMatchingView", + pipe: newPipeline(nil, reader, []view.View{v}), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + i := newInserter[N](test.pipe) + got, err := i.Instrument(inst, unit.Dimensionless) + require.NoError(t, err) + assert.Len(t, got, 1, "default view not applied") + + out, err := test.pipe.produce(context.Background()) + require.NoError(t, err) + require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline") + sm := out.ScopeMetrics[0] + require.Len(t, sm.Metrics, 1, "metrics not produced from default view") + metricdatatest.AssertEqual(t, metricdata.Metrics{ + Name: inst.Name, + Description: inst.Description, + Unit: unit.Dimensionless, + Data: metricdata.Sum[N]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, sm.Metrics[0], metricdatatest.IgnoreTimestamp()) + }) + } + } +}