From 48d27fee43a5c6fdb93c8bba08e88caf2b5afc17 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Wed, 8 Jun 2022 21:06:39 +0000 Subject: [PATCH 1/3] Introduce Temporality and InstrumentKind Because Temporality is the responsibility of the Reader additional methods are added to the Reader interface. And New options are created to configure the temporality selector. --- sdk/metric/config_test.go | 10 +++-- sdk/metric/instrumentkind.go | 34 +++++++++++++++++ sdk/metric/manual_reader.go | 35 +++++++++++++++++- sdk/metric/manual_reader_test.go | 2 +- sdk/metric/periodic_reader.go | 25 +++++++++---- sdk/metric/reader.go | 10 +++++ sdk/metric/temporality.go | 63 ++++++++++++++++++++++++++++++++ 7 files changed, 164 insertions(+), 15 deletions(-) create mode 100644 sdk/metric/instrumentkind.go create mode 100644 sdk/metric/temporality.go diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index 22917ba4fa0..c2b91e8f559 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -30,15 +30,17 @@ import ( ) type reader struct { - producer producer - collectFunc func(context.Context) (export.Metrics, error) - forceFlushFunc func(context.Context) error - shutdownFunc func(context.Context) error + producer producer + temporalityFunc func(InstrumentKind) Temporality + collectFunc func(context.Context) (export.Metrics, error) + forceFlushFunc func(context.Context) error + shutdownFunc func(context.Context) error } var _ Reader = (*reader)(nil) func (r *reader) register(p producer) { r.producer = p } +func (r *reader) temporality(kind InstrumentKind) Temporality { return r.temporalityFunc(kind) } func (r *reader) Collect(ctx context.Context) (export.Metrics, error) { return r.collectFunc(ctx) } func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) } func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) } diff --git a/sdk/metric/instrumentkind.go b/sdk/metric/instrumentkind.go new file mode 100644 index 00000000000..e34dfa23ab7 --- /dev/null +++ b/sdk/metric/instrumentkind.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +// InstrumentKind describes the a kind of instrument. +type InstrumentKind uint8 + +// The following represent the different kind of instruments that can be +// created by the SDK. +const ( + //nolint:deadcode,varcheck + undefinedInstrument InstrumentKind = iota + SyncCounter + SyncUpDownCounter + SyncHistogram + AsyncCounter + AsyncUpDownCounter + AsyncGauge +) diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 485f09faa8e..97a11dfdfcd 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -32,14 +32,19 @@ import ( type manualReader struct { producer atomic.Value shutdownOnce sync.Once + + temporalitySelector func(InstrumentKind) Temporality } // Compile time check the manualReader implements Reader. var _ Reader = &manualReader{} // NewManualReader returns a Reader which is directly called to collect metrics. -func NewManualReader() Reader { - return &manualReader{} +func NewManualReader(opts ...ManualReaderOption) Reader { + cfg := newManualReaderConfig(opts) + return &manualReader{ + temporalitySelector: cfg.temporalitySelector, + } } // register stores the Producer which enables the caller to read @@ -52,6 +57,11 @@ func (mr *manualReader) register(p producer) { } } +// temporality reports the Temporality for the instrument kind provided. +func (mr *manualReader) temporality(kind InstrumentKind) Temporality { + return mr.temporalitySelector(kind) +} + // ForceFlush is a no-op, it always returns nil. func (mr *manualReader) ForceFlush(context.Context) error { return nil @@ -89,3 +99,24 @@ func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) { } return ph.produce(ctx) } + +// manualReaderConfig contains configuration options for a ManualReader. +type manualReaderConfig struct { + temporalitySelector func(InstrumentKind) Temporality +} + +// newManualReaderConfig returns a manualReaderConfig configured with options. +func newManualReaderConfig(opts []ManualReaderOption) manualReaderConfig { + cfg := manualReaderConfig{ + temporalitySelector: defaultTemporalitySelector, + } + for _, opt := range opts { + cfg = opt.applyManual(cfg) + } + return cfg +} + +// ManualReaderOption applies a configuration option value to a ManualReader. +type ManualReaderOption interface { + applyManual(manualReaderConfig) manualReaderConfig +} diff --git a/sdk/metric/manual_reader_test.go b/sdk/metric/manual_reader_test.go index 6ea83824a1c..582ca64f2ae 100644 --- a/sdk/metric/manual_reader_test.go +++ b/sdk/metric/manual_reader_test.go @@ -24,7 +24,7 @@ import ( ) func TestManualReader(t *testing.T) { - suite.Run(t, &readerTestSuite{Factory: NewManualReader}) + suite.Run(t, &readerTestSuite{Factory: func() Reader { return NewManualReader() }}) } func BenchmarkManualReader(b *testing.B) { diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 75585fe9178..22d81cad627 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -37,33 +37,35 @@ const ( // periodicReaderConfig contains configuration options for a PeriodicReader. type periodicReaderConfig struct { - interval time.Duration - timeout time.Duration + interval time.Duration + timeout time.Duration + temporalitySelector func(InstrumentKind) Temporality } // newPeriodicReaderConfig returns a periodicReaderConfig configured with // options. func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig { c := periodicReaderConfig{ - interval: defaultInterval, - timeout: defaultTimeout, + interval: defaultInterval, + timeout: defaultTimeout, + temporalitySelector: defaultTemporalitySelector, } for _, o := range options { - c = o.apply(c) + c = o.applyPeriodic(c) } return c } // PeriodicReaderOption applies a configuration option value to a PeriodicReader. type PeriodicReaderOption interface { - apply(periodicReaderConfig) periodicReaderConfig + applyPeriodic(periodicReaderConfig) periodicReaderConfig } // periodicReaderOptionFunc applies a set of options to a periodicReaderConfig. type periodicReaderOptionFunc func(periodicReaderConfig) periodicReaderConfig -// apply returns a periodicReaderConfig with option(s) applied. -func (o periodicReaderOptionFunc) apply(conf periodicReaderConfig) periodicReaderConfig { +// applyPeriodic returns a periodicReaderConfig with option(s) applied. +func (o periodicReaderOptionFunc) applyPeriodic(conf periodicReaderConfig) periodicReaderConfig { return o(conf) } @@ -132,6 +134,8 @@ type periodicReader struct { timeout time.Duration exporter Exporter + temporalitySelector func(InstrumentKind) Temporality + wg sync.WaitGroup cancel context.CancelFunc shutdownOnce sync.Once @@ -173,6 +177,11 @@ func (r *periodicReader) register(p producer) { } } +// temporality reports the Temporality for the instrument kind provided. +func (r *periodicReader) temporality(kind InstrumentKind) Temporality { + return r.temporalitySelector(kind) +} + // Collect gathers and returns all metric data related to the Reader from // the SDK. The returned metric data is not exported to the configured // exporter, it is left to the caller to handle that if desired. diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index 7964e08d383..dc5fbc24822 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -55,6 +55,9 @@ type Reader interface { // and send aggregated metric measurements. register(producer) + // temporality reports the Temporality for the instrument kind provided. + temporality(InstrumentKind) Temporality + // Collect gathers and returns all metric data related to the Reader from // the SDK. An error is returned if this is called after Shutdown. Collect(context.Context) (export.Metrics, error) @@ -101,3 +104,10 @@ type shutdownProducer struct{} func (p shutdownProducer) produce(context.Context) (export.Metrics, error) { return export.Metrics{}, ErrReaderShutdown } + +// ReaderOption applies a configuration option value to either a ManualReader or +// a PeriodicReader. +type ReaderOption interface { + ManualReaderOption + PeriodicReaderOption +} diff --git a/sdk/metric/temporality.go b/sdk/metric/temporality.go new file mode 100644 index 00000000000..6a9d901d53c --- /dev/null +++ b/sdk/metric/temporality.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.17 +// +build go1.17 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +// Temporality defines the window that an aggregation was calculated over. +type Temporality uint8 + +const ( + // undefinedTemporality represents an unset Temporality. + //nolint:deadcode,unused,varcheck + undefinedTemporality Temporality = iota + + // CumulativeTemporality defines a measurement interval that continues to + // expand forward in time from a starting point. New measurements are + // added to all previous measurements since a start time. + CumulativeTemporality + + // DeltaTemporality defines a measurement interval that resets each cycle. + // Measurements from one cycle are recorded independently, measurements + // from other cycles do not affect them. + DeltaTemporality +) + +// WithTemporality uses the selector to determine the Temporality measurements +// from instrument should be recorded with. +func WithTemporality(selector func(instrument InstrumentKind) Temporality) ReaderOption { + return temporalitySelectorOption{selector: selector} +} + +type temporalitySelectorOption struct { + selector func(instrument InstrumentKind) Temporality +} + +// applyManual returns a manualReaderConfig with option applied. +func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig { + mrc.temporalitySelector = t.selector + return mrc +} + +// applyPeriodic returns a periodicReaderConfig with option applied. +func (t temporalitySelectorOption) applyPeriodic(prc periodicReaderConfig) periodicReaderConfig { + prc.temporalitySelector = t.selector + return prc +} + +func defaultTemporalitySelector(_ InstrumentKind) Temporality { + return CumulativeTemporality +} From abea9683c75dc3793c4120f9a626b1c200b47e16 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Fri, 10 Jun 2022 13:37:51 +0000 Subject: [PATCH 2/3] Addresses comments, and adds tests. --- sdk/metric/instrumentkind.go | 16 ++++++++++-- sdk/metric/manual_reader_test.go | 41 ++++++++++++++++++++++++++++++ sdk/metric/periodic_reader.go | 2 ++ sdk/metric/periodic_reader_test.go | 37 +++++++++++++++++++++++++++ sdk/metric/reader.go | 28 ++++++++++++++++++++ sdk/metric/temporality.go | 26 ------------------- 6 files changed, 122 insertions(+), 28 deletions(-) diff --git a/sdk/metric/instrumentkind.go b/sdk/metric/instrumentkind.go index e34dfa23ab7..d8126583d24 100644 --- a/sdk/metric/instrumentkind.go +++ b/sdk/metric/instrumentkind.go @@ -20,15 +20,27 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" // InstrumentKind describes the a kind of instrument. type InstrumentKind uint8 -// The following represent the different kind of instruments that can be -// created by the SDK. +// These are all the instrument kinds supported by the SDK. const ( + // undefinedInstrument is an uninitialized instrument kind, should not be used. //nolint:deadcode,varcheck undefinedInstrument InstrumentKind = iota + // SyncCounter is an instrument kind that records increasing values + // synchronously in application code. SyncCounter + // SyncUpDownCounter is an instrument kind that records increasing and + // decreasing values synchronously in application code. SyncUpDownCounter + // SyncHistogram is an instrument kind that records a distribution of + // values synchronously in application code. SyncHistogram + // AsyncCounter is an instrument kind that records increasing values in an + // asynchronous callback. AsyncCounter + // AsyncUpDownCounter is an instrument kind that records increasing and + // decreasing values in an asynchronous callback. AsyncUpDownCounter + // AsyncGauge is an instrument kind that records current values in an + // asynchronous callback. AsyncGauge ) diff --git a/sdk/metric/manual_reader_test.go b/sdk/metric/manual_reader_test.go index 582ca64f2ae..61b9ec74291 100644 --- a/sdk/metric/manual_reader_test.go +++ b/sdk/metric/manual_reader_test.go @@ -20,6 +20,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric/reader" import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) @@ -30,3 +31,43 @@ func TestManualReader(t *testing.T) { func BenchmarkManualReader(b *testing.B) { b.Run("Collect", benchReaderCollectFunc(NewManualReader())) } + +var deltaTemporalitySelector = func(InstrumentKind) Temporality { return DeltaTemporality } +var cumulativeTemporalitySelector = func(InstrumentKind) Temporality { return CumulativeTemporality } + +func TestManualReaderTemporality(t *testing.T) { + tests := []struct { + name string + options []ManualReaderOption + // Currently only testing constant temporality. This should be expanded + // if we put more advanced selection in the SDK + wantTemporality Temporality + }{ + { + name: "default", + wantTemporality: CumulativeTemporality, + }, + { + name: "delta", + options: []ManualReaderOption{ + WithTemporality(deltaTemporalitySelector), + }, + wantTemporality: DeltaTemporality, + }, + { + name: "repeats overwrite", + options: []ManualReaderOption{ + WithTemporality(deltaTemporalitySelector), + WithTemporality(cumulativeTemporalitySelector), + }, + wantTemporality: CumulativeTemporality, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rdr := NewManualReader(tt.options...) + assert.Equal(t, tt.wantTemporality, rdr.temporality(undefinedInstrument)) + }) + } +} diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 22d81cad627..83ef273a341 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -115,6 +115,8 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade timeout: conf.timeout, exporter: exporter, cancel: cancel, + + temporalitySelector: conf.temporalitySelector, } r.wg.Add(1) diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 1dc42fb254e..ae5e40f2a20 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -184,3 +184,40 @@ func BenchmarkPeriodicReader(b *testing.B) { NewPeriodicReader(new(fnExporter)), )) } + +func TestPeriodiclReaderTemporality(t *testing.T) { + tests := []struct { + name string + options []PeriodicReaderOption + // Currently only testing constant temporality. This should be expanded + // if we put more advanced selection in the SDK + wantTemporality Temporality + }{ + { + name: "default", + wantTemporality: CumulativeTemporality, + }, + { + name: "delta", + options: []PeriodicReaderOption{ + WithTemporality(deltaTemporalitySelector), + }, + wantTemporality: DeltaTemporality, + }, + { + name: "repeats overwrite", + options: []PeriodicReaderOption{ + WithTemporality(deltaTemporalitySelector), + WithTemporality(cumulativeTemporalitySelector), + }, + wantTemporality: CumulativeTemporality, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rdr := NewPeriodicReader(new(fnExporter), tt.options...) + assert.Equal(t, tt.wantTemporality, rdr.temporality(undefinedInstrument)) + }) + } +} diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index dc5fbc24822..2af077bb4bf 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -111,3 +111,31 @@ type ReaderOption interface { ManualReaderOption PeriodicReaderOption } + +// WithTemporality uses the selector to determine the Temporality measurements +// from instrument should be recorded with. +func WithTemporality(selector func(instrument InstrumentKind) Temporality) ReaderOption { + return temporalitySelectorOption{selector: selector} +} + +type temporalitySelectorOption struct { + selector func(instrument InstrumentKind) Temporality +} + +// applyManual returns a manualReaderConfig with option applied. +func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig { + mrc.temporalitySelector = t.selector + return mrc +} + +// applyPeriodic returns a periodicReaderConfig with option applied. +func (t temporalitySelectorOption) applyPeriodic(prc periodicReaderConfig) periodicReaderConfig { + prc.temporalitySelector = t.selector + return prc +} + +// defaultTemporalitySelector returns the default Temporality measurements +// from instrument should be recorded with: cumulative. +func defaultTemporalitySelector(InstrumentKind) Temporality { + return CumulativeTemporality +} diff --git a/sdk/metric/temporality.go b/sdk/metric/temporality.go index 6a9d901d53c..289b151606e 100644 --- a/sdk/metric/temporality.go +++ b/sdk/metric/temporality.go @@ -35,29 +35,3 @@ const ( // from other cycles do not affect them. DeltaTemporality ) - -// WithTemporality uses the selector to determine the Temporality measurements -// from instrument should be recorded with. -func WithTemporality(selector func(instrument InstrumentKind) Temporality) ReaderOption { - return temporalitySelectorOption{selector: selector} -} - -type temporalitySelectorOption struct { - selector func(instrument InstrumentKind) Temporality -} - -// applyManual returns a manualReaderConfig with option applied. -func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig { - mrc.temporalitySelector = t.selector - return mrc -} - -// applyPeriodic returns a periodicReaderConfig with option applied. -func (t temporalitySelectorOption) applyPeriodic(prc periodicReaderConfig) periodicReaderConfig { - prc.temporalitySelector = t.selector - return prc -} - -func defaultTemporalitySelector(_ InstrumentKind) Temporality { - return CumulativeTemporality -} From ecb0ce602467653ad3115fdd0ae4b8f1202cab31 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Fri, 10 Jun 2022 13:39:42 +0000 Subject: [PATCH 3/3] Fix addition PR comment --- sdk/metric/instrumentkind.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/instrumentkind.go b/sdk/metric/instrumentkind.go index d8126583d24..8174eee5ef3 100644 --- a/sdk/metric/instrumentkind.go +++ b/sdk/metric/instrumentkind.go @@ -17,7 +17,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" -// InstrumentKind describes the a kind of instrument. +// InstrumentKind describes the kind of instrument a Meter can create. type InstrumentKind uint8 // These are all the instrument kinds supported by the SDK.