Skip to content

Commit

Permalink
Refactor Pipeline (#3233)
Browse files Browse the repository at this point in the history
* Add views field to pipeline

Redundant maps tracking readers to views and readers to pipelines exist
in the pipelineRegistry. Unify these maps by tracing views in pipelines.

* Rename newPipelineRegistries->newPipelineRegistry

* Add Reader as field to pipeline

* Replace createAggregators with resolver facilitator

* Replace create agg funcs with inserter facilitator

* Correct documentation

* Replace pipelineRegistry with []pipeline type

* Rename newPipelineRegistry->newPipelines

* Fix pipeline_registry_test

* Flatten isMonotonic into only use

* Update FIXME into TODO

* Rename instrument provider resolver field to resolve

* Fix comment English

* Fix drop aggregator detection
  • Loading branch information
MrAlias authored Sep 28, 2022
1 parent 12e16d4 commit aca054b
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 171 deletions.
40 changes: 20 additions & 20 deletions sdk/metric/instrument_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
)

type asyncInt64Provider struct {
scope instrumentation.Scope
registry *pipelineRegistry
scope instrumentation.Scope
resolve *resolver[int64]
}

var _ asyncint64.InstrumentProvider = asyncInt64Provider{}
Expand All @@ -37,7 +37,7 @@ var _ asyncint64.InstrumentProvider = asyncInt64Provider{}
func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[int64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -56,7 +56,7 @@ func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asy
func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[int64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -74,7 +74,7 @@ func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option
func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[int64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -89,8 +89,8 @@ func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (async
}

type asyncFloat64Provider struct {
scope instrumentation.Scope
registry *pipelineRegistry
scope instrumentation.Scope
resolve *resolver[float64]
}

var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{}
Expand All @@ -99,7 +99,7 @@ var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{}
func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[float64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -117,7 +117,7 @@ func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (a
func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[float64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -135,7 +135,7 @@ func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Opti
func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[float64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -150,8 +150,8 @@ func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asy
}

type syncInt64Provider struct {
scope instrumentation.Scope
registry *pipelineRegistry
scope instrumentation.Scope
resolve *resolver[int64]
}

var _ syncint64.InstrumentProvider = syncInt64Provider{}
Expand All @@ -160,7 +160,7 @@ var _ syncint64.InstrumentProvider = syncInt64Provider{}
func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (syncint64.Counter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[int64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -178,7 +178,7 @@ func (p syncInt64Provider) Counter(name string, opts ...instrument.Option) (sync
func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncint64.UpDownCounter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[int64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -196,7 +196,7 @@ func (p syncInt64Provider) UpDownCounter(name string, opts ...instrument.Option)
func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (syncint64.Histogram, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[int64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -211,8 +211,8 @@ func (p syncInt64Provider) Histogram(name string, opts ...instrument.Option) (sy
}

type syncFloat64Provider struct {
scope instrumentation.Scope
registry *pipelineRegistry
scope instrumentation.Scope
resolve *resolver[float64]
}

var _ syncfloat64.InstrumentProvider = syncFloat64Provider{}
Expand All @@ -221,7 +221,7 @@ var _ syncfloat64.InstrumentProvider = syncFloat64Provider{}
func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (syncfloat64.Counter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[float64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -239,7 +239,7 @@ func (p syncFloat64Provider) Counter(name string, opts ...instrument.Option) (sy
func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (syncfloat64.UpDownCounter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[float64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand All @@ -257,7 +257,7 @@ func (p syncFloat64Provider) UpDownCounter(name string, opts ...instrument.Optio
func (p syncFloat64Provider) Histogram(name string, opts ...instrument.Option) (syncfloat64.Histogram, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := createAggregators[float64](p.registry, view.Instrument{
aggs, err := p.resolve.Aggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Expand Down
22 changes: 11 additions & 11 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type meterRegistry struct {

meters map[instrumentation.Scope]*meter

registry *pipelineRegistry
pipes pipelines
}

// Get returns a registered meter matching the instrumentation scope if it
Expand All @@ -56,8 +56,8 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {

if r.meters == nil {
m := &meter{
Scope: s,
registry: r.registry,
Scope: s,
pipes: r.pipes,
}
r.meters = map[instrumentation.Scope]*meter{s: m}
return m
Expand All @@ -69,8 +69,8 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
}

m = &meter{
Scope: s,
registry: r.registry,
Scope: s,
pipes: r.pipes,
}
r.meters[s] = m
return m
Expand All @@ -83,35 +83,35 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
type meter struct {
instrumentation.Scope

registry *pipelineRegistry
pipes pipelines
}

// Compile-time check meter implements metric.Meter.
var _ metric.Meter = (*meter)(nil)

// AsyncInt64 returns the asynchronous integer instrument provider.
func (m *meter) AsyncInt64() asyncint64.InstrumentProvider {
return asyncInt64Provider{scope: m.Scope, registry: m.registry}
return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
}

// AsyncFloat64 returns the asynchronous floating-point instrument provider.
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
return asyncFloat64Provider{scope: m.Scope, registry: m.registry}
return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
}

// RegisterCallback registers the function f to be called when any of the
// insts Collect method is called.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error {
m.registry.registerCallback(f)
m.pipes.registerCallback(f)
return nil
}

// SyncInt64 returns the synchronous integer instrument provider.
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
return syncInt64Provider{scope: m.Scope, registry: m.registry}
return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
}

// SyncFloat64 returns the synchronous floating-point instrument provider.
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
return syncFloat64Provider{scope: m.Scope, registry: m.registry}
return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
}
Loading

0 comments on commit aca054b

Please sign in to comment.