Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make otel scheduler sync #2262

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/component/otelcol/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error {
})

// Schedule the components to run once our component is running.
a.sched.Schedule(host, components...)
a.sched.Schedule(a.ctx, host, components...)
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions internal/component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn
factory: f,
consumer: consumer,

sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
sched: scheduler.New(opts.Logger),
collector: collector,
}
if err := p.Update(args); err != nil {
Expand Down Expand Up @@ -215,8 +215,10 @@ func (p *Connector) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
p.consumer.Pause()
p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector)
p.sched.Schedule(p.ctx, host, components...)
p.consumer.Resume()
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions internal/component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support
factory: f,
consumer: consumer,

sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
sched: scheduler.New(opts.Logger),
collector: collector,

supportedSignals: supportedSignals,
Expand Down Expand Up @@ -243,8 +243,10 @@ func (e *Exporter) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
e.sched.Schedule(host, components...)
e.consumer.Pause()
e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)
e.sched.Schedule(e.ctx, host, components...)
e.consumer.Resume()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
e.sched.Schedule(host, components...)
e.sched.Schedule(e.ctx, host, components...)
return nil
}

Expand Down
83 changes: 12 additions & 71 deletions internal/component/otelcol/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,13 @@ type Scheduler struct {
schedMut sync.Mutex
schedComponents []otelcomponent.Component // Most recently created components
host otelcomponent.Host

// newComponentsCh is written to when schedComponents gets updated.
newComponentsCh chan struct{}

// onPause is called when scheduler is making changes to running components.
onPause func()
// onResume is called when scheduler is done making changes to running components.
onResume func()
}

// New creates a new unstarted Scheduler. Call Run to start it, and call
// Schedule to schedule components to run.
func New(l log.Logger) *Scheduler {
return &Scheduler{
log: l,
newComponentsCh: make(chan struct{}, 1),
onPause: func() {},
onResume: func() {},
}
}

// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to
// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running
// components via Schedule method will trigger a call to onPause and then onResume. When scheduler is shutting down, it
// will call onResume as a last step.
func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler {
return &Scheduler{
log: l,
newComponentsCh: make(chan struct{}, 1),
onPause: onPause,
onResume: onResume,
log: l,
}
}

Expand All @@ -77,63 +53,28 @@ func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Sched
// Schedule completely overrides the set of previously running components;
// components which have been removed since the last call to Schedule will be
// stopped.
func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) {
func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) {
cs.schedMut.Lock()
defer cs.schedMut.Unlock()

cs.schedComponents = cc
cs.host = h
// Stop the old components before running new scheduled ones.
cs.stopComponents(ctx, cs.schedComponents...)

select {
case cs.newComponentsCh <- struct{}{}:
// Queued new message.
default:
// A message is already queued for refreshing running components so we
// don't have to do anything here.
}
level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc))
level.Debug(cs.log).Log("msg", "scheduling otelcol components", "count", len(cc))

It's quite an ambiguous log line, but if you think there is value in keeping it I'm happy to have it as a debug log.

cs.schedComponents = cs.startComponents(ctx, h, cc...)
ptodev marked this conversation as resolved.
Show resolved Hide resolved
}

// Run starts the Scheduler. Run will watch for schedule components to appear
// and run them, terminating previously running components if they exist.
// Run starts the Scheduler and stops the components when the context is cancelled.
func (cs *Scheduler) Run(ctx context.Context) error {
firstRun := true
var components []otelcomponent.Component

// Make sure we terminate all of our running components on shutdown.
defer func() {
if !firstRun { // always handle the callbacks correctly
cs.onPause()
}
cs.stopComponents(context.Background(), components...)
cs.onResume()
cs.schedMut.Lock()
defer cs.schedMut.Unlock()
cs.stopComponents(context.Background(), cs.schedComponents...)
}()

// Wait for a write to cs.newComponentsCh. The initial list of components is
// always empty so there's nothing to do until cs.newComponentsCh is written
// to.
for {
select {
case <-ctx.Done():
return nil
case <-cs.newComponentsCh:
if !firstRun {
cs.onPause() // do not pause on first run
} else {
firstRun = false
}
// Stop the old components before running new scheduled ones.
cs.stopComponents(ctx, components...)

cs.schedMut.Lock()
components = cs.schedComponents
host := cs.host
cs.schedMut.Unlock()

level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components))
components = cs.startComponents(ctx, host, components...)
cs.onResume()
}
}
<-ctx.Done()
return nil
}

func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) {
Expand Down
63 changes: 4 additions & 59 deletions internal/component/otelcol/internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
otelcomponent "go.opentelemetry.io/collector/component"
"go.uber.org/atomic"

"github.com/grafana/alloy/internal/component/otelcol/internal/scheduler"
"github.com/grafana/alloy/internal/runtime/componenttest"
Expand All @@ -32,7 +30,7 @@ func TestScheduler(t *testing.T) {
// Schedule our component, which should notify the started trigger once it is
// running.
component, started, _ := newTriggerComponent()
cs.Schedule(h, component)
cs.Schedule(context.Background(), h, component)
require.NoError(t, started.Wait(5*time.Second), "component did not start")
})

Expand All @@ -52,68 +50,15 @@ func TestScheduler(t *testing.T) {
// Schedule our component, which should notify the started and stopped
// trigger once it starts and stops respectively.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)
cs.Schedule(context.Background(), h, component)

// Wait for the component to start, and then unschedule all components, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cs.Schedule(h)
cs.Schedule(context.Background(), h)
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
})

t.Run("Pause callbacks are called", func(t *testing.T) {
var (
pauseCalls = &atomic.Int32{}
resumeCalls = &atomic.Int32{}
l = util.TestLogger(t)
cs = scheduler.NewWithPauseCallbacks(
l,
func() { pauseCalls.Inc() },
func() { resumeCalls.Inc() },
)
h = scheduler.NewHost(l)
)
ctx, cancel := context.WithCancel(context.Background())

// Run our scheduler in the background.
go func() {
err := cs.Run(ctx)
require.NoError(t, err)
}()

// Schedule our component, which should notify the started and stopped
// trigger once it starts and stops respectively.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)

toInt := func(a *atomic.Int32) int { return int(a.Load()) }

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 0, toInt(pauseCalls), "pause callbacks should not be called on first run")
assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on first run")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")

// Wait for the component to start, and then unschedule all components, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cs.Schedule(h)

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 1, toInt(pauseCalls), "pause callback should be called on second run")
assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on second run")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")

require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")

// Stop the scheduler
cancel()

require.EventuallyWithT(t, func(t *assert.CollectT) {
assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown")
assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown")
}, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly")
})

t.Run("Running components get stopped on shutdown", func(t *testing.T) {
var (
l = util.TestLogger(t)
Expand All @@ -133,7 +78,7 @@ func TestScheduler(t *testing.T) {
// Schedule our component which will notify our trigger when Shutdown gets
// called.
component, started, stopped := newTriggerComponent()
cs.Schedule(h, component)
cs.Schedule(ctx, h, component)

// Wait for the component to start, and then stop our scheduler, which
// should cause our running component to terminate.
Expand Down
7 changes: 5 additions & 2 deletions internal/component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc
factory: f,
consumer: consumer,

sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
sched: scheduler.New(opts.Logger),
collector: collector,

liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),
Expand Down Expand Up @@ -238,8 +238,11 @@ func (p *Processor) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
p.consumer.Pause()
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
p.sched.Schedule(p.ctx, host, components...)
p.consumer.Resume()

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
r.sched.Schedule(host, components...)
r.sched.Schedule(r.ctx, host, components...)
return nil
}

Expand Down
Loading