Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sampling config for instrumentation #982

Merged
merged 19 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http

## [Unreleased]

### Added

- `Sampler` interface that can be passed to `Instrumentation` via the new `WithSampler` option.
This configuration allows customization of what sampler is used by the `Instrumentation`. ([#982](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/982))
- The `OTEL_TRACES_SAMPLER` and `OTEL_TRACES_SAMPLER_ARG` environment variables are now supported when the `WithEnv` option is used. ([#982](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/982))

### Changed

- The `WithSampler` option function now accepts the new `Sampler` interface instead of `trace.Sampler`. ([#982](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/982))

## [v0.14.0-alpha] - 2024-07-15

### Added
Expand Down
40 changes: 32 additions & 8 deletions instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"

"go.opentelemetry.io/auto/internal/pkg/instrumentation"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe/sampling"
"go.opentelemetry.io/auto/internal/pkg/opentelemetry"
"go.opentelemetry.io/auto/internal/pkg/process"
)
Expand Down Expand Up @@ -125,7 +126,7 @@ func NewInstrumentation(ctx context.Context, opts ...InstrumentationOption) (*In
return nil, err
}

mngr, err := instrumentation.NewManager(logger, ctrl, c.globalImpl, c.loadIndicator)
mngr, err := instrumentation.NewManager(logger, ctrl, c.globalImpl, c.loadIndicator, c.samplingConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -173,14 +174,14 @@ type InstrumentationOption interface {
}

type instConfig struct {
sampler trace.Sampler
traceExp trace.SpanExporter
target process.TargetArgs
serviceName string
additionalResAttrs []attribute.KeyValue
globalImpl bool
loadIndicator chan struct{}
logLevel LogLevel
samplingConfig *sampling.Config
}

func newInstConfig(ctx context.Context, opts []InstrumentationOption) (instConfig, error) {
Expand All @@ -207,8 +208,8 @@ func newInstConfig(ctx context.Context, opts []InstrumentationOption) (instConfi
err = errors.Join(err, e)
}

if c.sampler == nil {
c.sampler = trace.AlwaysSample()
if c.samplingConfig == nil {
c.samplingConfig = sampling.DefaultConfig()
}

if c.logLevel == logLevelUndefined {
Expand Down Expand Up @@ -239,7 +240,9 @@ func (c instConfig) validate() error {

func (c instConfig) tracerProvider(bi *buildinfo.BuildInfo) *trace.TracerProvider {
return trace.NewTracerProvider(
trace.WithSampler(c.sampler),
// the actual sampling is done in the eBPF probes.
// this is just to make sure that we export all spans we get from the probes
trace.WithSampler(trace.AlwaysSample()),
trace.WithResource(c.res(bi)),
trace.WithBatcher(c.traceExp),
trace.WithIDGenerator(opentelemetry.NewEBPFSourceIDGenerator()),
Expand Down Expand Up @@ -355,9 +358,11 @@ var lookupEnv = os.LookupEnv
// - OTEL_TRACES_EXPORTER: sets the trace exporter
// - OTEL_GO_AUTO_GLOBAL: enables the OpenTelemetry global implementation
// - OTEL_LOG_LEVEL: sets the log level
// - OTEL_TRACES_SAMPLER: sets the trace sampler
// - OTEL_TRACES_SAMPLER_ARG: optionally sets the trace sampler argument
//
// This option may conflict with [WithTarget], [WithPID], [WithTraceExporter],
// [WithServiceName], [WithGlobal] and [WithLogLevel] if their respective environment variable is defined.
// [WithServiceName], [WithGlobal], [WithLogLevel] and [WithSampler] if their respective environment variable is defined.
// If more than one of these options are used, the last one provided to an
// [Instrumentation] will be used.
//
Expand Down Expand Up @@ -401,6 +406,17 @@ func WithEnv() InstrumentationOption {

err = errors.Join(err, e)
}
if s, e := newSamplerFromEnv(); e != nil {
err = errors.Join(err, e)
} else if s != nil {
e := s.validate()
if e == nil {
if cfg, e := s.convert(); e == nil {
c.samplingConfig = cfg
}
RonFed marked this conversation as resolved.
Show resolved Hide resolved
}
err = errors.Join(err, e)
}
return c, err
})
}
Expand Down Expand Up @@ -454,9 +470,17 @@ func WithTraceExporter(exp trace.SpanExporter) InstrumentationOption {

// WithSampler returns an [InstrumentationOption] that will configure
// an [Instrumentation] to use the provided sampler to sample OpenTelemetry traces.
func WithSampler(sampler trace.Sampler) InstrumentationOption {
func WithSampler(sampler Sampler) InstrumentationOption {
return fnOpt(func(_ context.Context, c instConfig) (instConfig, error) {
c.sampler = sampler
err := sampler.validate()
if err != nil {
return c, err
}
cfg, err := sampler.convert()
if err != nil {
return c, err
}
c.samplingConfig = cfg
return c, nil
})
}
Expand Down
83 changes: 83 additions & 0 deletions instrumentation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"

"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe/sampling"
)

func TestWithServiceName(t *testing.T) {
Expand Down Expand Up @@ -198,6 +200,87 @@ func TestWithLogLevel(t *testing.T) {
})
}

func TestWithSampler(t *testing.T) {
t.Run("Default sampler", func(t *testing.T) {
c, err := newInstConfig(context.Background(), []InstrumentationOption{})
require.NoError(t, err)
sc := c.samplingConfig
assert.Equal(t, sc.Samplers, sampling.DefaultConfig().Samplers)
assert.Equal(t, sc.ActiveSampler, sampling.ParentBasedID)
conf, ok := sc.Samplers[sampling.ParentBasedID]
assert.True(t, ok)
assert.Equal(t, conf.SamplerType, sampling.SamplerParentBased)
pbConfig, ok := conf.Config.(sampling.ParentBasedConfig)
assert.True(t, ok)
assert.Equal(t, pbConfig, sampling.DefaultParentBasedSampler())
})

t.Run("Env config", func(t *testing.T) {
mockEnv(t, map[string]string{
tracesSamplerKey: samplerNameParentBasedTraceIDRatio,
tracesSamplerArgKey: "0.42",
})

c, err := newInstConfig(context.Background(), []InstrumentationOption{WithEnv()})
require.NoError(t, err)
sc := c.samplingConfig
assert.Equal(t, sc.ActiveSampler, sampling.ParentBasedID)
parentBasedConfig, ok := sc.Samplers[sampling.ParentBasedID]
assert.True(t, ok)
assert.Equal(t, parentBasedConfig.SamplerType, sampling.SamplerParentBased)
pbConfig, ok := parentBasedConfig.Config.(sampling.ParentBasedConfig)
assert.True(t, ok)
assert.Equal(t, pbConfig.Root, sampling.TraceIDRatioID)
tidRatio, ok := sc.Samplers[sampling.TraceIDRatioID]
assert.True(t, ok)
assert.Equal(t, tidRatio.SamplerType, sampling.SamplerTraceIDRatio)
config, ok := tidRatio.Config.(sampling.TraceIDRatioConfig)
assert.True(t, ok)
expected, _ := sampling.NewTraceIDRatioConfig(0.42)
assert.Equal(t, expected, config)
})

t.Run("Invalid Env config", func(t *testing.T) {
mockEnv(t, map[string]string{
tracesSamplerKey: "invalid",
tracesSamplerArgKey: "0.42",
})

_, err := newInstConfig(context.Background(), []InstrumentationOption{WithEnv()})
require.Error(t, err)
require.Contains(t, err.Error(), "unknown sampler name")
})

t.Run("WithSampler", func(t *testing.T) {
c, err := newInstConfig(context.Background(), []InstrumentationOption{
WithSampler(ParentBased{
Root: TraceIDRatio{Fraction: 0.42},
}),
})
require.NoError(t, err)
sc := c.samplingConfig
assert.Equal(t, sc.ActiveSampler, sampling.ParentBasedID)
parentBasedConfig, ok := sc.Samplers[sampling.ParentBasedID]
assert.True(t, ok)
assert.Equal(t, parentBasedConfig.SamplerType, sampling.SamplerParentBased)
pbConfig, ok := parentBasedConfig.Config.(sampling.ParentBasedConfig)
assert.True(t, ok)
assert.Equal(t, pbConfig.Root, sampling.TraceIDRatioID)
assert.Equal(t, pbConfig.RemoteSampled, sampling.AlwaysOnID)
assert.Equal(t, pbConfig.RemoteNotSampled, sampling.AlwaysOffID)
assert.Equal(t, pbConfig.LocalSampled, sampling.AlwaysOnID)
assert.Equal(t, pbConfig.LocalNotSampled, sampling.AlwaysOffID)

tidRatio, ok := sc.Samplers[sampling.TraceIDRatioID]
assert.True(t, ok)
assert.Equal(t, tidRatio.SamplerType, sampling.SamplerTraceIDRatio)
config, ok := tidRatio.Config.(sampling.TraceIDRatioConfig)
assert.True(t, ok)
expected, _ := sampling.NewTraceIDRatioConfig(0.42)
assert.Equal(t, expected, config)
})
}

func mockEnv(t *testing.T, env map[string]string) {
orig := lookupEnv
t.Cleanup(func() { lookupEnv = orig })
Expand Down
7 changes: 5 additions & 2 deletions internal/pkg/instrumentation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
httpServer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/net/http/server"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/bpffs"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe/sampling"
"go.opentelemetry.io/auto/internal/pkg/opentelemetry"
"go.opentelemetry.io/auto/internal/pkg/process"
)
Expand All @@ -38,10 +39,11 @@ type Manager struct {
wg sync.WaitGroup
closingErrors chan error
loadedIndicator chan struct{}
sc *sampling.Config
}

// NewManager returns a new [Manager].
func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, globalImpl bool, loadIndicator chan struct{}) (*Manager, error) {
func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, globalImpl bool, loadIndicator chan struct{}, samplingConfig *sampling.Config) (*Manager, error) {
logger = logger.WithName("Manager")
m := &Manager{
logger: logger,
Expand All @@ -52,6 +54,7 @@ func NewManager(logger logr.Logger, otelController *opentelemetry.Controller, gl
globalImpl: globalImpl,
closingErrors: make(chan error, 1),
loadedIndicator: loadIndicator,
sc: samplingConfig,
}

err := m.registerProbes()
Expand Down Expand Up @@ -195,7 +198,7 @@ func (m *Manager) load(target *process.TargetDetails) error {
// Load probes
for name, i := range m.probes {
m.logger.V(0).Info("loading probe", "name", name)
err := i.Load(exe, target)
err := i.Load(exe, target, m.sc)
if err != nil {
m.logger.Error(err, "error while loading probes, cleaning up", "name", name)
return errors.Join(err, m.cleanup(target))
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/instrumentation/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe/sampling"
"go.opentelemetry.io/auto/internal/pkg/process"
"go.opentelemetry.io/auto/internal/pkg/process/binary"
)
Expand Down Expand Up @@ -180,7 +181,7 @@ func fakeManager(t *testing.T) *Manager {
logger := stdr.New(log.New(os.Stderr, "", log.LstdFlags))
logger = logger.WithName("Instrumentation")

m, err := NewManager(logger, nil, true, nil)
m, err := NewManager(logger, nil, true, nil, &sampling.Config{})
assert.NoError(t, err)
assert.NotNil(t, m)

Expand Down
10 changes: 8 additions & 2 deletions internal/pkg/instrumentation/probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"go.opentelemetry.io/auto/internal/pkg/inject"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/bpffs"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe/sampling"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/utils"
"go.opentelemetry.io/auto/internal/pkg/process"
"go.opentelemetry.io/auto/internal/pkg/structfield"
Expand All @@ -33,7 +34,7 @@ type Probe interface {
Manifest() Manifest

// Load loads all instrumentation offsets.
Load(*link.Executable, *process.TargetDetails) error
Load(*link.Executable, *process.TargetDetails, *sampling.Config) error

// Run runs the events processing loop.
Run(eventsChan chan<- *Event)
Expand Down Expand Up @@ -93,7 +94,7 @@ func (i *Base[BPFObj, BPFEvent]) Manifest() Manifest {
}

// Load loads all instrumentation offsets.
func (i *Base[BPFObj, BPFEvent]) Load(exec *link.Executable, td *process.TargetDetails) error {
func (i *Base[BPFObj, BPFEvent]) Load(exec *link.Executable, td *process.TargetDetails, sc *sampling.Config) error {
spec, err := i.SpecFn()
if err != nil {
return err
Expand All @@ -118,6 +119,11 @@ func (i *Base[BPFObj, BPFEvent]) Load(exec *link.Executable, td *process.TargetD
if err != nil {
return err
}

// TODO: Initialize sampling manager based on the sampling configuration and the eBPF collection.
// The manager will be responsible for writing to eBPF maps - configuring the sampling.
// In addition the sampling manager will be responsible for handling updates for the configuration.

i.closers = append(i.closers, i.reader)

return nil
Expand Down
Loading