Skip to content

Commit

Permalink
pare back
Browse files Browse the repository at this point in the history
  • Loading branch information
Joshua MacDonald committed Feb 15, 2023
1 parent ee21be0 commit 02ef200
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 149 deletions.
51 changes: 1 addition & 50 deletions lightstep/sdk/metric/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ import (
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/number"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument"
"go.opentelemetry.io/otel"
"go.uber.org/multierr"
)

// Sentinel errors for Aggregator interface.
var (
ErrNegativeInput = fmt.Errorf("negative value is out of range for this instrument")
ErrNaNInput = fmt.Errorf("NaN value is an invalid input")
ErrInfInput = fmt.Errorf("±Inf value is an invalid input")
ErrInvalidLimit = fmt.Errorf("limit is invalid")
)

// RangeTest is a common routine for testing for valid input values.
Expand Down Expand Up @@ -90,7 +88,6 @@ func (jc JSONConfig) ToConfig() Config {
// Config supports the configuration for all aggregators in a single struct.
type Config struct {
Histogram histostruct.Config
Limits LimitsConfig
}

// Valid returns true for valid configurations.
Expand All @@ -102,57 +99,11 @@ func (c Config) Valid() bool {
// Valid returns a valid Configuration along with an error if there
// were invalid settings. Note that the empty state is considered valid and a correct
func (c Config) Validate() (Config, error) {
var ret error
var err error
if c.Histogram, err = c.Histogram.Validate(); err != nil {
ret = multierr.Append(ret, err)
}

if c.Limits, err = c.Limits.Validate(); err != nil {
ret = multierr.Append(ret, err)
}
c.Histogram, err = c.Histogram.Validate()
return c, err
}

// LimitsConfig specifies configurable instrument limits.
type LimitsConfig struct {
// maxTimeseries is the maximum number of timeseries that can
// be written before triggering a builtin circuit breaker.
// When this number of concurrent attribute sets is reached
// the SDK will ...
MaxTimeseries uint
}

// DefaultLimits is the specified default limit for LimitsConfig.MaxTimeseries.
const DefaultMaxTimeseries = 2000

// Validate fills in defaults and returns whether the limits are invalid.
func (c LimitsConfig) Validate() (LimitsConfig, error) {
if c.MaxTimeseries == 0 {
c.MaxTimeseries = DefaultMaxTimeseries
}
if c.MaxTimeseries < 2 {
return c, fmt.Errorf("%w: %d", ErrInvalidLimit, c.MaxTimeseries)
}
return c, nil
}

// Combine returns the smaller limits. Since limits are applied to
// Instruments, not Views, the smaller of the limits takes effect
// and overrides larger limits requested by different views.
func (c LimitsConfig) Combine(o LimitsConfig) LimitsConfig {
if c.MaxTimeseries == 0 {
return o
}
if o.MaxTimeseries == 0 {
return c
}
if c.MaxTimeseries < o.MaxTimeseries {
return c
}
return o
}

// Methods implements a specific aggregation behavior for a specific
// type of aggregator Storage. Methods are parameterized by the type
// of the number (int64, float64), the Storage (generally a `Storage`
Expand Down
105 changes: 60 additions & 45 deletions lightstep/sdk/metric/internal/syncstate/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ type Observer struct {
// for synchronous aggregation.
compiled viewstate.Instrument

// maxTimeseries is copied to avoid several repeated function
// calls.
maxTimeseries uint

// lock protects current.
lock sync.Mutex

Expand All @@ -84,20 +80,19 @@ func New(desc sdkinstrument.Descriptor, _ interface{}, compiled pipeline.Registe
// When no readers enable the instrument, no need for an instrument.
return nil
}
// Note that viewstate.Combine is used to eliminate
// the per-pipeline distinction that is useful in the
// asyncstate package. Here, in the common case there
// will be one pipeline and one view, such that
// viewstate.Combine produces a single concrete
// viewstate.Instrument. Only when there are multiple
// views or multiple pipelines will the combination
// produce a viewstate.multiInstrument here.
inst := viewstate.Combine(desc, nonnil...)
return &Observer{
descriptor: desc,
current: map[uint64]*record{},
compiled: inst,
maxTimeseries: inst.Limits().MaxTimeseries,
descriptor: desc,
current: map[uint64]*record{},

// Note that viewstate.Combine is used to eliminate
// the per-pipeline distinction that is useful in the
// asyncstate package. Here, in the common case there
// will be one pipeline and one view, such that
// viewstate.Combine produces a single concrete
// viewstate.Instrument. Only when there are multiple
// views or multiple pipelines will the combination
// produce a viewstate.multiInstrument here.
compiled: viewstate.Combine(desc, nonnil...),
}
}

Expand Down Expand Up @@ -352,43 +347,63 @@ func attributesEqual(a, b []attribute.KeyValue) bool {
return true
}

// acquireRead acquires the read lock and searches for a `*record`.
// This returns the effective attributes and fingerprint, considering
// the cardinality limit.
// acquireRead acquires the lock and searches for a `*record`.
// This returns the overflow attributes and fingerprint in case the
// the cardinality limit is reached. The caller should exchange their
// fp and attrs for the ones returned by this call.
func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) (uint64, []attribute.KeyValue, *record) {
inst.lock.Lock()
defer inst.lock.Unlock()

// This loop can be taken twice.
for overflow := false; ; overflow = true {
rec := inst.current[fp]
overflow := false
fp, attrs, rec := acquireReadLocked(inst, fp, attrs, &overflow)

// Note: we could (optionally) allow collisions and not scan this list.
// The copied `attributeList` can be avoided in this case, as well.
for rec != nil && !attributesEqual(attrs, rec.attributeList) {
rec = rec.next
}
if rec != nil {
return fp, attrs, rec
}
// The overflow signal indicates another call is needed w/ the
// same logic but updated fp and attrs.
if !overflow {
// Otherwise, this is the first appearance of an overflow.
return fp, attrs, nil
}
// In which case fp and attrs are now the overflow attributes.
return acquireReadLocked(inst, fp, attrs, &overflow)
}

// Existing record case.
if rec != nil && rec.refMapped.ref() {
// At this moment it is guaranteed that the
// record is in the map and will not be removed.
return fp, attrs, rec
}
func acquireReadLocked(inst *Observer, fp uint64, attrs []attribute.KeyValue, overflow *bool) (uint64, []attribute.KeyValue, *record) {
rec := inst.current[fp]

// Check for overflow after checking for the original
// attribute set. Note this means we are performing
// two map lookups for overflowing attributes and only
// one lookup if the attribute set was preexisting.
if !overflow && uint(len(inst.current)) >= inst.maxTimeseries {
// Use the overflow attributes, repeat.
attrs = pipeline.OverflowAttributes
fp = overflowAttributesFingerprint
continue
}
// Note: we could (optionally) allow collisions and not scan this list.
// The copied `attributeList` can be avoided in this case, as well.
for rec != nil && !attributesEqual(attrs, rec.attributeList) {
rec = rec.next
}

return fp, attrs, nil
// Existing record case.
if rec != nil && rec.refMapped.ref() {
// At this moment it is guaranteed that the
// record is in the map and will not be removed.
return fp, attrs, rec
}

// Check for overflow after checking for the original
// attribute set. Note this means we are performing
// two map lookups for overflowing attributes and only
// one lookup if the attribute set was preexisting.
//
// Note! This 10000 is hard-coded until merging with
// https://github.com/lightstep/otel-launcher-go/pull/384,
// when the cardinality limit will become part of the
// performance settings.
if !*overflow && uint(len(inst.current)) >= 10000 {
// Use the overflow attributes, repeat.
attrs = pipeline.OverflowAttributes
fp = overflowAttributesFingerprint
*overflow = true
}

return fp, attrs, nil
}

// acquireRecord gets or creates a `*record` corresponding to `attrs`,
Expand Down
10 changes: 0 additions & 10 deletions lightstep/sdk/metric/internal/syncstate/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,6 @@ func TestSyncGaugeDeltaInstrument(t *testing.T) {
deltaSelector,
view.WithClause(
view.WithKeys([]attribute.Key{"A", "C"}),
view.WithAggregatorConfig(aggregator.Config{
Limits: aggregator.LimitsConfig{
MaxTimeseries: 10000,
},
}),
),
))

Expand Down Expand Up @@ -906,10 +901,5 @@ func TestDuplicateFingerprint(t *testing.T) {
),
),
)
}

func TestOverflowBehavior(t *testing.T) {
const defLimit = aggregator.DefaultMaxTimeseries

// TODO
}
5 changes: 0 additions & 5 deletions lightstep/sdk/metric/internal/viewstate/base_instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ type instrumentBase[N number.Any, Storage, Auxiliary any, Methods aggregator.Met
keysFilter *attribute.Filter
}

// Limits reports configured limits for the instrument.
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) Limits() aggregator.LimitsConfig {
return metric.acfg.Limits
}

// Size reports the size of the data map.
func (metric *instrumentBase[N, Storage, Auxiliary, Methods]) Size() int {
metric.instLock.Lock()
Expand Down
41 changes: 10 additions & 31 deletions lightstep/sdk/metric/internal/viewstate/viewstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,27 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
)

var zeroValidConfig = func() aggregator.Config {
var zero aggregator.Config
zero, _ = zero.Validate()
return zero
}()

// Compiler implements Views for a single Meter. A single Compiler
// controls the namespace of metric instruments output and reports
// conflicting definitions for the same name.
//
// Information flows through the Compiler as follows:
//
// When new instruments are created:
// - The Compiler.Compile() method returns an Instrument value and possible
// duplicate or semantic conflict error.
// - The Compiler.Compile() method returns an Instrument value and possible
// duplicate or semantic conflict error.
//
// When instruments are used:
// - The Instrument.NewAccumulator() method returns an Accumulator value for each attribute.Set used
// - The Accumulator.Update() aggregates one value for each measurement.
//
// During collection:
// - The Accumulator.SnapshotAndProcess() method captures the current value
// and conveys it to the output storage
// - The Compiler.Collectors() interface returns one Collector per output
// Metric in the Meter (duplicate definitions included).
// - The Collector.Collect() method outputs one Point for each attribute.Set
// in the result.
// - The Accumulator.SnapshotAndProcess() method captures the current value
// and conveys it to the output storage
// - The Compiler.Collectors() interface returns one Collector per output
// Metric in the Meter (duplicate definitions included).
// - The Collector.Collect() method outputs one Point for each attribute.Set
// in the result.
type Compiler struct {
// views is the configuration of this compiler.
views *view.Views
Expand Down Expand Up @@ -96,9 +90,6 @@ type Instrument interface {
// called since the last collection and to ensure that each
// of them has SnapshotAndProcess() called.
NewAccumulator(kvs attribute.Set) Accumulator

// Limits indicates the maximum allowed cardinality.
Limits() aggregator.LimitsConfig
}

// Updater captures single measurements, for N an int64 or float64.
Expand Down Expand Up @@ -186,10 +177,6 @@ type singleBehavior struct {

// New returns a compiler for library given configured views.
func New(library instrumentation.Library, views *view.Views) *Compiler {
views, err := view.Validate(views)
if err != nil {
otel.Handle(err)
}
return &Compiler{
library: library,
views: views,
Expand Down Expand Up @@ -581,14 +568,6 @@ func (mi multiInstrument[N]) NewAccumulator(kvs attribute.Set) Accumulator {
return multiAccumulator[N](accs)
}

func (mi multiInstrument[N]) Limits() aggregator.LimitsConfig {
var empty aggregator.LimitsConfig
for _, item := range mi {
empty = empty.Combine(item.Limits())
}
return mi[0].Limits()
}

// Uses a int(0)-value attribute to identify distinct key sets.
func keysToSet(keys []attribute.Key) *attribute.Set {
attrs := make([]attribute.KeyValue, len(keys))
Expand Down Expand Up @@ -625,9 +604,9 @@ func equalConfigs(a, b aggregator.Config) bool {
}

// pickAggConfig returns the aggregator configuration prescribed by a view clause
// if it is not equal to the default validated value, otherwise use the default value.
// if it is not empty, otherwise the default value.
func pickAggConfig(def, vcfg aggregator.Config) aggregator.Config {
if vcfg != zeroValidConfig {
if vcfg != (aggregator.Config{}) {
return vcfg
}
return def
Expand Down
9 changes: 1 addition & 8 deletions lightstep/sdk/metric/view/views_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,7 @@ func TestClauseProperties(t *testing.T) {
require.Equal(t, []attribute.Key(nil), views.Clauses[2].Keys())
require.Equal(t, []attribute.Key{}, views.Clauses[3].Keys())
require.Equal(t, aggregation.DropKind, views.Clauses[4].Aggregation())
require.Equal(t, aggregator.Config{
Histogram: histogram.NewConfig(histogram.WithMaxSize(177)),
Limits: aggregator.LimitsConfig{
MaxTimeseries: aggregator.DefaultMaxTimeseries,
},
},
views.Clauses[5].AggregatorConfig(),
)
require.Equal(t, aggregator.Config{Histogram: histogram.NewConfig(histogram.WithMaxSize(177))}, views.Clauses[5].AggregatorConfig())
}

func TestNameAndRegexp(t *testing.T) {
Expand Down

0 comments on commit 02ef200

Please sign in to comment.