From 122a517d63c97d663f472f70e0b3de83a7fa218c Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:32:51 +0000 Subject: [PATCH] Fix race condition in otelcol processors --- .../internal/lazyconsumer/lazyconsumer.go | 32 +++++++++++++++++ .../otelcol/internal/scheduler/scheduler.go | 34 +++++++++++++++++-- .../component/otelcol/processor/processor.go | 25 +++++++------- 3 files changed, 77 insertions(+), 14 deletions(-) diff --git a/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go b/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go index 3d2e653423..180b0753f5 100644 --- a/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go +++ b/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go @@ -17,6 +17,10 @@ import ( type Consumer struct { ctx context.Context + // pauseMut is used to implement Pause & Resume semantics. When a write lock is held - this consumer is paused. + // See Pause method for more info. + pauseMut sync.RWMutex + mut sync.RWMutex metricsConsumer otelconsumer.Metrics logsConsumer otelconsumer.Logs @@ -36,6 +40,13 @@ func New(ctx context.Context) *Consumer { return &Consumer{ctx: ctx} } +// NewPaused is like New, but returns a Consumer that is paused by calling Pause method. +func NewPaused(ctx context.Context) *Consumer { + c := New(ctx) + c.Pause() + return c +} + // Capabilities implements otelconsumer.baseConsumer. func (c *Consumer) Capabilities() otelconsumer.Capabilities { return otelconsumer.Capabilities{ @@ -52,6 +63,9 @@ func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { return c.ctx.Err() } + c.pauseMut.RLock() // wait until resumed + defer c.pauseMut.RUnlock() + c.mut.RLock() defer c.mut.RUnlock() @@ -73,6 +87,9 @@ func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error return c.ctx.Err() } + c.pauseMut.RLock() // wait until resumed + defer c.pauseMut.RUnlock() + c.mut.RLock() defer c.mut.RUnlock() @@ -94,6 +111,9 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return c.ctx.Err() } + c.pauseMut.RLock() // wait until resumed + defer c.pauseMut.RUnlock() + c.mut.RLock() defer c.mut.RUnlock() @@ -119,3 +139,15 @@ func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l c.logsConsumer = l c.tracesConsumer = t } + +// Pause will stop the consumer until Resume is called. While paused, the calls to Consume* methods will block. +// After calling Pause once, it must not be called again until Resume is called. +func (c *Consumer) Pause() { + c.pauseMut.Lock() +} + +// Resume will revert the Pause call and the consumer will continue to work. Resume must not be called if Pause wasn't +// called before. See Pause for more details. +func (c *Consumer) Resume() { + c.pauseMut.Unlock() +} diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 022c09dadf..2ee6efaf6e 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -9,10 +9,11 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/runtime/logging/level" otelcomponent "go.opentelemetry.io/collector/component" "go.uber.org/multierr" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/logging/level" ) // Scheduler implements manages a set of OpenTelemetry Collector components. @@ -39,6 +40,11 @@ type Scheduler struct { // newComponentsCh is written to when schedComponents gets updated. newComponentsCh chan struct{} + + // onPause is called when scheduler is making changes to running components. + onPause func() + // onResume is called when scheduler is done making changes to running components. + onResume func() } // New creates a new unstarted Scheduler. Call Run to start it, and call @@ -47,6 +53,20 @@ func New(l log.Logger) *Scheduler { return &Scheduler{ log: l, newComponentsCh: make(chan struct{}, 1), + onPause: func() {}, + onResume: func() {}, + } +} + +// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to +// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running +// components via Schedule method will trigger a call to onPause and then onResume. +func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { + return &Scheduler{ + log: l, + newComponentsCh: make(chan struct{}, 1), + onPause: onPause, + onResume: onResume, } } @@ -75,11 +95,16 @@ func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Componen // Run starts the Scheduler. Run will watch for schedule components to appear // and run them, terminating previously running components if they exist. func (cs *Scheduler) Run(ctx context.Context) error { + firstRun := true var components []otelcomponent.Component // Make sure we terminate all of our running components on shutdown. defer func() { + if !firstRun { // always handle the callbacks correctly + cs.onPause() + } cs.stopComponents(context.Background(), components...) + // We don't resume, as the scheduler is exiting. }() // Wait for a write to cs.newComponentsCh. The initial list of components is @@ -90,6 +115,10 @@ func (cs *Scheduler) Run(ctx context.Context) error { case <-ctx.Done(): return nil case <-cs.newComponentsCh: + if !firstRun { // do not pause on first run + cs.onPause() + firstRun = false + } // Stop the old components before running new scheduled ones. cs.stopComponents(ctx, components...) @@ -100,6 +129,7 @@ func (cs *Scheduler) Run(ctx context.Context) error { level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components)) components = cs.startComponents(ctx, host, components...) + cs.onResume() } } } diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 5aaa4750b3..5072d65233 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -7,6 +7,17 @@ import ( "errors" "os" + "github.com/prometheus/client_golang/prometheus" + otelcomponent "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + otelextension "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/pipeline" + otelprocessor "go.opentelemetry.io/collector/processor" + sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus" + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/metric" + "github.com/grafana/alloy/internal/build" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/otelcol" @@ -18,16 +29,6 @@ import ( "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" "github.com/grafana/alloy/internal/service/livedebugging" "github.com/grafana/alloy/internal/util/zapadapter" - "github.com/prometheus/client_golang/prometheus" - otelcomponent "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - otelextension "go.opentelemetry.io/collector/extension" - "go.opentelemetry.io/collector/pipeline" - otelprocessor "go.opentelemetry.io/collector/processor" - sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus" - otelmetric "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/sdk/metric" ) // Arguments is an extension of component.Arguments which contains necessary @@ -94,7 +95,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc ctx, cancel := context.WithCancel(context.Background()) - consumer := lazyconsumer.New(ctx) + consumer := lazyconsumer.NewPaused(ctx) // Create a lazy collector where metrics from the upstream component will be // forwarded. @@ -116,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),