Skip to content

Commit

Permalink
Fix race condition in otelcol component wrappers (#2027)
Browse files Browse the repository at this point in the history
* Fix race condition in otelcol processors

* fix issues

* Test the pausing scheduler

---------

Co-authored-by: William Dumont <[email protected]>
  • Loading branch information
thampiotr and wildum authored Nov 15, 2024
1 parent 8b5c04f commit f24c2f9
Show file tree
Hide file tree
Showing 8 changed files with 419 additions and 46 deletions.
23 changes: 12 additions & 11 deletions internal/component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,6 @@ import (
"errors"
"os"

"github.com/grafana/alloy/internal/build"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/util/zapadapter"
"github.com/prometheus/client_golang/prometheus"
otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
Expand All @@ -26,6 +17,16 @@ import (
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/metric"

"github.com/grafana/alloy/internal/build"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/util/zapadapter"
)

const (
Expand Down Expand Up @@ -94,7 +95,7 @@ var (
func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Connector, error) {
ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.New(ctx)
consumer := lazyconsumer.NewPaused(ctx)

// Create a lazy collector where metrics from the upstream component will be
// forwarded.
Expand All @@ -116,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,
}
if err := p.Update(args); err != nil {
Expand Down
23 changes: 12 additions & 11 deletions internal/component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,6 @@ import (
"errors"
"os"

"github.com/grafana/alloy/internal/build"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/component/otelcol/internal/views"
"github.com/grafana/alloy/internal/util/zapadapter"
"github.com/prometheus/client_golang/prometheus"
otelcomponent "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
Expand All @@ -26,6 +17,16 @@ import (
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/metric"

"github.com/grafana/alloy/internal/build"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector"
"github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer"
"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/component/otelcol/internal/views"
"github.com/grafana/alloy/internal/util/zapadapter"
)

// Arguments is an extension of component.Arguments which contains necessary
Expand Down Expand Up @@ -108,7 +109,7 @@ var (
func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignal) (*Exporter, error) {
ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.New(ctx)
consumer := lazyconsumer.NewPaused(ctx)

// Create a lazy collector where metrics from the upstream component will be
// forwarded.
Expand All @@ -130,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,

supportedSignals: supportedSignals,
Expand Down
60 changes: 60 additions & 0 deletions internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
type Consumer struct {
ctx context.Context

// pauseMut and pausedWg are used to implement Pause & Resume semantics. See Pause method for more info.
pauseMut sync.RWMutex
pausedWg *sync.WaitGroup

mut sync.RWMutex
metricsConsumer otelconsumer.Metrics
logsConsumer otelconsumer.Logs
Expand All @@ -36,6 +40,13 @@ func New(ctx context.Context) *Consumer {
return &Consumer{ctx: ctx}
}

// NewPaused is like New, but returns a Consumer that is paused by calling Pause method.
func NewPaused(ctx context.Context) *Consumer {
c := New(ctx)
c.Pause()
return c
}

// Capabilities implements otelconsumer.baseConsumer.
func (c *Consumer) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{
Expand All @@ -52,6 +63,8 @@ func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
return c.ctx.Err()
}

c.waitUntilResumed()

c.mut.RLock()
defer c.mut.RUnlock()

Expand All @@ -73,6 +86,8 @@ func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error
return c.ctx.Err()
}

c.waitUntilResumed()

c.mut.RLock()
defer c.mut.RUnlock()

Expand All @@ -94,6 +109,8 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return c.ctx.Err()
}

c.waitUntilResumed()

c.mut.RLock()
defer c.mut.RUnlock()

Expand All @@ -109,6 +126,15 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return c.logsConsumer.ConsumeLogs(ctx, ld)
}

func (c *Consumer) waitUntilResumed() {
c.pauseMut.RLock()
pausedWg := c.pausedWg
c.pauseMut.RUnlock()
if pausedWg != nil {
pausedWg.Wait()
}
}

// SetConsumers updates the internal consumers that Consumer will forward data
// to. It is valid for any combination of m, l, and t to be nil.
func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l otelconsumer.Logs) {
Expand All @@ -119,3 +145,37 @@ func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l
c.logsConsumer = l
c.tracesConsumer = t
}

// Pause will stop the consumer until Resume is called. While paused, the calls to Consume* methods will block.
// Pause can be called multiple times, but a single call to Resume will un-pause this consumer. Thread-safe.
func (c *Consumer) Pause() {
c.pauseMut.Lock()
defer c.pauseMut.Unlock()

if c.pausedWg != nil {
return // already paused
}

c.pausedWg = &sync.WaitGroup{}
c.pausedWg.Add(1)
}

// Resume will revert the Pause call and the consumer will continue to work. See Pause for more details.
func (c *Consumer) Resume() {
c.pauseMut.Lock()
defer c.pauseMut.Unlock()

if c.pausedWg == nil {
return // already resumed
}

c.pausedWg.Done() // release all waiting
c.pausedWg = nil
}

// IsPaused returns whether the consumer is currently paused. See Pause for details.
func (c *Consumer) IsPaused() bool {
c.pauseMut.RLock()
defer c.pauseMut.RUnlock()
return c.pausedWg != nil
}
161 changes: 161 additions & 0 deletions internal/component/otelcol/internal/lazyconsumer/lazyconsumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package lazyconsumer

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/goleak"

"github.com/grafana/alloy/internal/runtime/componenttest"
)

func Test_PauseAndResume(t *testing.T) {
c := New(componenttest.TestContext(t))
require.False(t, c.IsPaused())
c.Pause()
require.True(t, c.IsPaused())
c.Resume()
require.False(t, c.IsPaused())
}

func Test_NewPaused(t *testing.T) {
c := NewPaused(componenttest.TestContext(t))
require.True(t, c.IsPaused())
c.Resume()
require.False(t, c.IsPaused())
}

func Test_PauseResume_MultipleCalls(t *testing.T) {
c := New(componenttest.TestContext(t))
require.False(t, c.IsPaused())
c.Pause()
c.Pause()
c.Pause()
require.True(t, c.IsPaused())
c.Resume()
c.Resume()
c.Resume()
require.False(t, c.IsPaused())
}

func Test_ConsumeWaitsForResume(t *testing.T) {
goleak.VerifyNone(t, goleak.IgnoreCurrent())
c := NewPaused(componenttest.TestContext(t))
require.True(t, c.IsPaused())

method := map[string]func(){
"ConsumeTraces": func() {
_ = c.ConsumeTraces(nil, ptrace.NewTraces())
},
"ConsumeMetrics": func() {
_ = c.ConsumeMetrics(nil, pmetric.NewMetrics())
},
"ConsumeLogs": func() {
_ = c.ConsumeLogs(nil, plog.NewLogs())
},
}

for name, fn := range method {
t.Run(name, func(t *testing.T) {
c.Pause()
require.True(t, c.IsPaused())

started := make(chan struct{})
finished := make(chan struct{})

// Start goroutine that attempts to run Consume* method
go func() {
started <- struct{}{}
fn()
finished <- struct{}{}
}()

// Wait to be started
select {
case <-started:
case <-time.After(5 * time.Second):
t.Fatal("consumer goroutine never started")
}

// Wait for a bit to ensure the consumer is blocking on Consume* function
select {
case <-finished:
t.Fatal("consumer should not have finished yet - it's paused")
case <-time.After(100 * time.Millisecond):
}

// Resume the consumer and verify the Consume* function unblocked
c.Resume()
select {
case <-finished:
case <-time.After(5 * time.Second):
t.Fatal("consumer should have finished after resuming")
}

})
}
}

func Test_PauseResume_Multithreaded(t *testing.T) {
goleak.VerifyNone(t, goleak.IgnoreCurrent())
ctx, cancel := context.WithCancel(componenttest.TestContext(t))
runs := 500
routines := 5
allDone := sync.WaitGroup{}

c := NewPaused(componenttest.TestContext(t))
require.True(t, c.IsPaused())

// Run goroutines that constantly try to call Consume* methods
for i := 0; i < routines; i++ {
allDone.Add(1)
go func() {
for {
select {
case <-ctx.Done():
allDone.Done()
return
default:
_ = c.ConsumeLogs(ctx, plog.NewLogs())
_ = c.ConsumeMetrics(ctx, pmetric.NewMetrics())
_ = c.ConsumeTraces(ctx, ptrace.NewTraces())
}
}
}()
}

// Run goroutines that Pause and then Resume in parallel.
// In particular, this verifies we can call .Pause() and .Resume() on an already paused or already resumed consumer.
workChan := make(chan struct{}, routines)
for i := 0; i < routines; i++ {
allDone.Add(1)
go func() {
for {
select {
case <-workChan:
c.Pause()
c.Resume()
case <-ctx.Done():
allDone.Done()
return
}
}
}()
}

for i := 0; i < runs; i++ {
workChan <- struct{}{}
}
cancel()

allDone.Wait()

// Should not be paused as last call will always be c.Resume()
require.False(t, c.IsPaused())
}
Loading

0 comments on commit f24c2f9

Please sign in to comment.