diff --git a/.chloggen/opamp_fix_27891_restart_delay.yaml b/.chloggen/opamp_fix_27891_restart_delay.yaml new file mode 100644 index 000000000000..54c94f444048 --- /dev/null +++ b/.chloggen/opamp_fix_27891_restart_delay.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix restart delay when agent process exits unexpectedly. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27891] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/cmd/opampsupervisor/supervisor/commander/commander.go b/cmd/opampsupervisor/supervisor/commander/commander.go index 90901e0e7103..7457f23693da 100644 --- a/cmd/opampsupervisor/supervisor/commander/commander.go +++ b/cmd/opampsupervisor/supervisor/commander/commander.go @@ -26,6 +26,7 @@ type Commander struct { args []string cmd *exec.Cmd doneCh chan struct{} + exitCh chan struct{} running *atomic.Int64 } @@ -39,6 +40,9 @@ func NewCommander(logger *zap.Logger, cfg *config.Agent, args ...string) (*Comma cfg: cfg, args: args, running: &atomic.Int64{}, + // Buffer channels so we can send messages without blocking on listeners. + doneCh: make(chan struct{}, 1), + exitCh: make(chan struct{}, 1), }, nil } @@ -52,6 +56,21 @@ func (c *Commander) Start(ctx context.Context) error { return nil } + // Drain channels in case there are no listeners that + // drained messages from previous runs. + if len(c.doneCh) > 0 { + select { + case <-c.doneCh: + default: + } + } + if len(c.exitCh) > 0 { + select { + case <-c.exitCh: + default: + } + } + c.logger.Debug("Starting agent", zap.String("agent", c.cfg.Executable)) logFilePath := "agent.log" @@ -67,8 +86,6 @@ func (c *Commander) Start(ctx context.Context) error { c.cmd.Stdout = logFile c.cmd.Stderr = logFile - c.doneCh = make(chan struct{}) - if err := c.cmd.Start(); err != nil { return err } @@ -101,12 +118,13 @@ func (c *Commander) watch() { } c.running.Store(0) - close(c.doneCh) + c.doneCh <- struct{}{} + c.exitCh <- struct{}{} } -// Done returns a channel that will send a signal when the Agent process is finished. -func (c *Commander) Done() <-chan struct{} { - return c.doneCh +// Exited returns a channel that will send a signal when the Agent process exits. +func (c *Commander) Exited() <-chan struct{} { + return c.exitCh } // Pid returns Agent process PID if it is started or 0 if it is not. diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 87ad84ba543e..49c2864fb101 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -797,7 +797,7 @@ func (s *Supervisor) runAgentProcess() { s.stopAgentApplyConfig() s.startAgent() - case <-s.commander.Done(): + case <-s.commander.Exited(): if s.shuttingDown { return } @@ -817,7 +817,12 @@ func (s *Supervisor) runAgentProcess() { // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21079 // Wait 5 seconds before starting again. - restartTimer.Stop() + if !restartTimer.Stop() { + select { + case <-restartTimer.C: // Try to drain the channel + default: + } + } restartTimer.Reset(5 * time.Second) case <-restartTimer.C: