Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cardinality limits to the Lightstep metrics SDK #385

Merged
merged 23 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## Unreleased

## [1.13.4](https://github.com/lightstep/otel-launcher-go/releases/tag/v1.13.3) - 2023-03-01)
- Support cardinality limits. Synchronous instruments support an
instrument-level cardinality limit; Synchronous and Asynchronous
aggregators support per-view cardinality limits. Performance settings
determine the view-configured defaults. [#385](https://github.com/lightstep/otel-launcher-go/pull/385)

## [1.13.4](https://github.com/lightstep/otel-launcher-go/releases/tag/v1.13.3) - 2023-03-02)

- Minor performance improvement, one less allocation under the lock
when fingerprint collisions are being checked. [#407](https://github.com/lightstep/otel-launcher-go/pull/407)
Expand Down
24 changes: 24 additions & 0 deletions lightstep/sdk/metric/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,27 @@ Setting this field to 1 means records will be removed from memory
after one inactive collection cycle.

Setting this field to 0 causes the default value 10 to be used.

#### InstrumentCardinalityLimit

Synchronous instruments are implemented using a map of intermediate
state. When this map grows to `InstrumentCardinalityLimit`, new
attribute sets will be replaced by the overflow attribute set, which
is `{ otel.metric.overflow=true }`. This limit is applied to all
instruments regardless of view configuration before attribute filters
are applied.

For instruments configured with Delta temporality, where it is
possible for the map to shrink, note that the size of this map
includes records maintained due to `InactiveCollectionPeriods`. The
inactivity period should be taken into account when setting
`InstrumentCardinalityLimit` to avoid overflow.

#### AggregatorCardinalityLimit

All views maintain a configurable cardinality limit, calculated after
attribute filters are applied.

When the aggregator's output grows to `AggregatorCardinalityLimit`,
new attribute sets will be replaced by the overflow attribute set,
which is `{ otel.metric.overflow=true }`.
20 changes: 12 additions & 8 deletions lightstep/sdk/metric/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,18 @@ type JSONHistogramConfig struct {

// JSONConfig supports the configuration for all aggregators in a single struct.
type JSONConfig struct {
Histogram JSONHistogramConfig `json:"histogram"`
}

// ToConfig returns a Config from the fixed-JSON represented.
func (jc JSONConfig) ToConfig() Config {
return Config{
Histogram: histostruct.NewConfig(histostruct.WithMaxSize(jc.Histogram.MaxSize)),
}
Histogram JSONHistogramConfig `json:"histogram"`
CardinalityLimit uint32 `json:"cardinality_limit"`
}

// Config supports the configuration for all aggregators in a single struct.
type Config struct {
// Histogram configuration, specifically.
Histogram histostruct.Config

// CardinalityLimit limits the number of instances of this
// aggregator in a given view.
CardinalityLimit uint32
}

// Valid returns true for valid configurations.
Expand All @@ -101,6 +100,11 @@ func (c Config) Valid() bool {
func (c Config) Validate() (Config, error) {
var err error
c.Histogram, err = c.Histogram.Validate()

if c.CardinalityLimit == 0 {
c.CardinalityLimit = sdkinstrument.DefaultAggregatorCardinalityLimit
}

return c, err
}

Expand Down
13 changes: 4 additions & 9 deletions lightstep/sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package metric // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/me
import (
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
)

Expand All @@ -30,9 +29,9 @@ type config struct {
// the i'th reader uses the i'th entry in views.
readers []Reader

// views is a slice of *Views instances corresponding with readers.
// the i'th views applies to the i'th reader.
views []*view.Views
// vopts is a slice of []view.Option corresponding with readers.
// the i'th view option list applies to the i'th reader.
vopts [][]view.Option

// performance settings
performance sdkinstrument.Performance
Expand Down Expand Up @@ -63,12 +62,8 @@ func WithResource(res *resource.Resource) Option {
// a new MeterProvider
func WithReader(r Reader, opts ...view.Option) Option {
return optionFunction(func(cfg config) config {
v, err := view.Validate(view.New(r.String(), opts...))
if err != nil {
otel.Handle(err)
}
cfg.readers = append(cfg.readers, r)
cfg.views = append(cfg.views, v)
cfg.vopts = append(cfg.vopts, opts)
return cfg
})
}
Expand Down
7 changes: 4 additions & 3 deletions lightstep/sdk/metric/internal/asyncstate/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ func New(desc sdkinstrument.Descriptor, _ sdkinstrument.Performance, opaque inte
// disabled the instrument. This ensures that certain error
// checks still work (wrong meter, wrong callback, etc).
//
// Note: performance settings are not used because async
// instruments do not use fingerprinting so IgnoreCollisions is
// meaningless.
// Note: performance settings are not used.
// 1. There is no fingerprinting, so IgnoreCollisions is meaningless.
// 2. InstrumentCardinalityLimit is not enforceable, because of duplicate
// suppression -- better left to the aggregator.
return &Observer{
opaque: opaque,
descriptor: desc,
Expand Down
8 changes: 4 additions & 4 deletions lightstep/sdk/metric/internal/asyncstate/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ func (tsdk *testSDK) compile(desc sdkinstrument.Descriptor) pipeline.Register[vi
func testAsync(name string, opts ...view.Option) *testSDK {
return &testSDK{
compilers: []*viewstate.Compiler{
viewstate.New(testLibrary, view.New(name, opts...)),
viewstate.New(testLibrary, view.New(name, opts...)),
viewstate.New(testLibrary, view.New(name, ignorePerf, opts...)),
viewstate.New(testLibrary, view.New(name, ignorePerf, opts...)),
},
}
}

func testAsync2(name string, opts1, opts2 []view.Option) *testSDK {
return &testSDK{
compilers: []*viewstate.Compiler{
viewstate.New(testLibrary, view.New(name, opts1...)),
viewstate.New(testLibrary, view.New(name, opts2...)),
viewstate.New(testLibrary, view.New(name, ignorePerf, opts1...)),
viewstate.New(testLibrary, view.New(name, ignorePerf, opts2...)),
},
}
}
Expand Down
26 changes: 26 additions & 0 deletions lightstep/sdk/metric/internal/pipeline/overflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/internal/pipeline"

import "go.opentelemetry.io/otel/attribute"

// OverflowAttributes is the specified list of attributes to use when
// configured mechanisms overflow a cardinality limit.
var OverflowAttributes = []attribute.KeyValue{
attribute.Bool("otel.metric.overflow", true),
}

// OverflowAttributeSet is the set corresponding with OverflowAttributes.
var OverflowAttributeSet = attribute.NewSet(OverflowAttributes...)
48 changes: 42 additions & 6 deletions lightstep/sdk/metric/internal/syncstate/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"go.opentelemetry.io/otel/metric/instrument"
)

var overflowAttributesFingerprint = fingerprintAttributes(pipeline.OverflowAttributes)

// Instrument maintains a mapping from attribute.Set to an internal
// record type for a single API-level instrument. This type is
// organized so that a single attribute.Set lookup is performed
Expand Down Expand Up @@ -129,7 +131,7 @@ func (inst *Observer) SnapshotAndProcess() {
}
}

// When records are kept, delete the map entry.
// When no records are kept, delete the map entry.
if head == nil {
delete(inst.current, key)
continue
Expand Down Expand Up @@ -429,11 +431,31 @@ func attributesEqual(a, b []attribute.KeyValue) bool {
return true
}

// acquireRead acquires the read lock and searches for a `*record`.
func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) *record {
// acquireRead acquires the lock and searches for a `*record`.
// This returns the overflow attributes and fingerprint in case the
// the cardinality limit is reached. The caller should exchange their
// fp and attrs for the ones returned by this call.
func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) (uint64, []attribute.KeyValue, *record) {
inst.lock.RLock()
defer inst.lock.RUnlock()

overflow := false
fp, attrs, rec := acquireReadLocked(inst, fp, attrs, &overflow)

if rec != nil {
return fp, attrs, rec
}
// The overflow signal indicates another call is needed w/ the
// same logic but updated fp and attrs.
if !overflow {
// Otherwise, this is the first appearance of an overflow.
return fp, attrs, nil
}
// In which case fp and attrs are now the overflow attributes.
return acquireReadLocked(inst, fp, attrs, &overflow)
}

func acquireReadLocked(inst *Observer, fp uint64, attrs []attribute.KeyValue, overflow *bool) (uint64, []attribute.KeyValue, *record) {
rec := inst.current[fp]

// Potentially test for hash collisions.
Expand All @@ -447,10 +469,21 @@ func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) *record
if rec != nil && rec.refMapped.ref() {
// At this moment it is guaranteed that the
// record is in the map and will not be removed.
return rec
return fp, attrs, rec
}

// Check for overflow after checking for the original
// attribute set. Note this means we are performing
// two map lookups for overflowing attributes and only
// one lookup if the attribute set was preexisting.
if !*overflow && uint32(len(inst.current)) >= inst.performance.InstrumentCardinalityLimit-1 {
// Use the overflow attributes, repeat.
attrs = pipeline.OverflowAttributes
fp = overflowAttributesFingerprint
*overflow = true
}

return nil
return fp, attrs, nil
}

// acquireUninitialized gets or creates a `*record` corresponding to
Expand All @@ -459,7 +492,10 @@ func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) *record
func acquireUninitialized[N number.Any](inst *Observer, attrs []attribute.KeyValue) *record {
fp := fingerprintAttributes(attrs)

if rec := acquireRead(inst, fp, attrs); rec != nil {
// acquireRead may replace fp and attrs when there is overflow.
var rec *record
fp, attrs, rec = acquireRead(inst, fp, attrs)
if rec != nil {
return rec
}

Expand Down
Loading