Skip to content

Commit

Permalink
Create simple Supervisor example
Browse files Browse the repository at this point in the history
- The Supervisor can manage OpenTelemetry Collector.
- Demonstrates basic features: applying config, configuring Collector to collect its own metrics.

TODO:
- Find a way to fetch Collector version instead of hard-coding it.
- Set instance id in the Collector config file to make sure OpAMP and Collector
  metrics use the same instance id.
  (Related open issue open-telemetry/opentelemetry-collector#5398)
- Re-think callbacks to avoid unnecessary restarts
  (See open-telemetry#77)
- Add a way for Supervisor to understand why the Collector process exited unexpectedly.
  • Loading branch information
tigrannajaryan committed May 25, 2022
1 parent 26dad6c commit b971ffe
Show file tree
Hide file tree
Showing 9 changed files with 735 additions and 1 deletion.
1 change: 1 addition & 0 deletions internal/examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
require (
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-logr/logr v1.2.1 // indirect
github.com/go-logr/stdr v1.2.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
Expand Down
3 changes: 3 additions & 0 deletions internal/examples/supervisor/bin/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
agent.log
effective.yaml
supervisor
5 changes: 5 additions & 0 deletions internal/examples/supervisor/bin/supervisor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
server:
  endpoint: ws://127.0.0.1:4320/v1/opamp

agent:
  executable: ../../../../../opentelemetry-collector-contrib/bin/otelcontribcol_darwin_amd64
24 changes: 24 additions & 0 deletions internal/examples/supervisor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"log"
"os"
"os/signal"

"github.com/open-telemetry/opamp-go/internal/examples/supervisor/supervisor"
)

// main creates the Supervisor and then blocks until the process receives an
// interrupt signal (e.g. Ctrl+C), at which point it shuts the Supervisor down.
// If the Supervisor cannot be created the error is logged and the process exits
// with a non-zero status.
func main() {
	logger := &supervisor.Logger{Logger: log.Default()}

	// Named sup (not supervisor) so that the imported package is not shadowed.
	sup, err := supervisor.NewSupervisor(logger)
	if err != nil {
		// Pass the message as an argument rather than as the format string:
		// an error text containing '%' would otherwise be misformatted.
		logger.Errorf("%s", err.Error())
		os.Exit(-1)
	}

	// Buffered so that a signal delivered before we start receiving is not lost.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt

	sup.Shutdown()
}
158 changes: 158 additions & 0 deletions internal/examples/supervisor/supervisor/commander/commander.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package commander

import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"syscall"
"time"

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/internal/examples/supervisor/supervisor/config"
)

// Commander can start/stop/restart the Agent executable and also watch for a signal
// for the Agent process to finish.
type Commander struct {
	// Set once by NewCommander.
	logger types.Logger
	cfg    *config.Agent
	args   []string // command-line arguments passed to the executable

	// Replaced on every call to Start.
	cmd    *exec.Cmd
	doneCh chan struct{} // receives one value when the process exits; exposed via Done()
	waitCh chan struct{} // closed when the process exits; Stop() blocks on it
}

// NewCommander creates a Commander for the Agent executable described by cfg.
// args are the command-line arguments passed to the executable on every start.
//
// It returns an error if cfg is nil or does not specify an executable. The nil
// check matters because the agent section of the Supervisor config is a pointer
// and is nil when that section is absent.
func NewCommander(logger types.Logger, cfg *config.Agent, args ...string) (*Commander, error) {
	if cfg == nil || cfg.Executable == "" {
		return nil, errors.New("agent.executable config option must be specified")
	}

	return &Commander{
		logger: logger,
		cfg:    cfg,
		args:   args,
	}, nil
}

// Start the Agent and begin watching the process.
// Agent's stdout and stderr are written to the file "agent.log" in the current
// working directory; the file is truncated on every start.
//
// The process is created with exec.CommandContext, so it is killed when ctx
// becomes done. Returns an error if the log file cannot be created or the
// process cannot be started.
func (c *Commander) Start(ctx context.Context) error {
	c.logger.Debugf("Starting agent %s", c.cfg.Executable)

	logFilePath := "agent.log"
	logFile, err := os.Create(logFilePath)
	if err != nil {
		return fmt.Errorf("cannot create %s: %w", logFilePath, err)
	}
	// The child process gets its own copy of the descriptor when it is started,
	// so our handle is no longer needed once this function returns. Without
	// this, every (re)start leaks one open file.
	defer logFile.Close()

	c.cmd = exec.CommandContext(ctx, c.cfg.Executable, c.args...)

	// Capture standard output and standard error.
	c.cmd.Stdout = logFile
	c.cmd.Stderr = logFile

	// doneCh is buffered so that watch() can signal completion even when nobody
	// is receiving from Done().
	c.doneCh = make(chan struct{}, 1)
	c.waitCh = make(chan struct{})

	if err := c.cmd.Start(); err != nil {
		return err
	}

	c.logger.Debugf("Agent process started, PID=%d", c.cmd.Process.Pid)

	go c.watch()

	return nil
}

// Restart stops the Agent process (a no-op if it was never started) and then
// starts it again. If stopping fails, that error is returned and no start is
// attempted; otherwise the result of Start is returned.
func (c *Commander) Restart(ctx context.Context) error {
	err := c.Stop(ctx)
	if err == nil {
		err = c.Start(ctx)
	}
	return err
}

// watch blocks until the Agent process started by Start exits, then sends one
// value on the Done() channel and closes the channel Stop() waits on.
// It is run in its own goroutine by Start.
func (c *Commander) watch() {
	// Snapshot the per-run state before blocking. Start replaces these fields,
	// and a caller reacting to the Done() signal may call Start again before
	// this goroutine gets to close waitCh; reading the fields after Wait could
	// then close the channel that belongs to the new process.
	cmd, doneCh, waitCh := c.cmd, c.doneCh, c.waitCh

	if err := cmd.Wait(); err != nil {
		// Not necessarily a failure: a process terminated by Stop() also
		// reports an error here. Logged so unexpected exits can be diagnosed.
		c.logger.Debugf("Agent process PID=%d exited: %v", cmd.Process.Pid, err)
	}

	doneCh <- struct{}{}
	close(waitCh)
}

// Done returns a channel that receives a single value when the Agent process
// finishes. A new channel is created by every call to Start, so the returned
// channel is nil until Start has been called.
func (c *Commander) Done() <-chan struct{} {
	var finished <-chan struct{} = c.doneCh
	return finished
}

// Pid returns the PID of the Agent process, or 0 if the process has not been
// started.
func (c *Commander) Pid() int {
	if cmd := c.cmd; cmd != nil && cmd.Process != nil {
		return cmd.Process.Pid
	}
	return 0
}

// ExitCode returns the exit code of the Agent process if it has exited, or 0
// if no process state is available yet (the process was never started or has
// not finished). For a finished process the value comes from
// os.ProcessState.ExitCode.
func (c *Commander) ExitCode() int {
	if cmd := c.cmd; cmd != nil && cmd.ProcessState != nil {
		return cmd.ProcessState.ExitCode()
	}
	return 0
}

// Stop the Agent process. Sends SIGTERM to the process and waits for up to 10
// seconds (or until ctx is done, whichever comes first); if the process has not
// finished by then it is killed forcibly by sending SIGKILL.
// Returns after the process is terminated.
//
// Stop is a no-op if the process was never started, and succeeds if the process
// has already exited on its own. An error is returned only if a signal cannot
// be delivered to a process that is still running.
func (c *Commander) Stop(ctx context.Context) error {
	if c.cmd == nil || c.cmd.Process == nil {
		// Not started, nothing to do.
		return nil
	}

	process := c.cmd.Process
	pid := process.Pid

	c.logger.Debugf("Stopping agent process, PID=%v", pid)

	// Gracefully signal process to stop.
	if err := process.Signal(syscall.SIGTERM); err != nil {
		if errors.Is(err, os.ErrProcessDone) {
			// The process already exited (e.g. it crashed). That is the state
			// the caller asked for, so do not fail; in particular Restart must
			// still be able to start a new process. Just let watch() finish.
			<-c.waitCh
			return nil
		}
		return err
	}

	// Give the process a bounded amount of time to exit on its own.
	timer := time.NewTimer(10 * time.Second)
	defer timer.Stop()

	select {
	case <-c.waitCh:
		// Process is successfully finished.
		c.logger.Debugf("Agent process PID=%v successfully stopped.", pid)
		return nil
	case <-ctx.Done():
	case <-timer.C:
	}

	// Time is out. Kill the process.
	c.logger.Debugf(
		"Agent process PID=%d is not responding to SIGTERM. Sending SIGKILL to kill forcedly.",
		pid)
	if err := process.Signal(syscall.SIGKILL); err != nil && !errors.Is(err, os.ErrProcessDone) {
		// The process is still running and cannot be killed. Report the error
		// instead of waiting for an exit that may never come.
		return err
	}

	// Wait for process to terminate.
	<-c.waitCh

	return nil
}
15 changes: 15 additions & 0 deletions internal/examples/supervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package config

// Supervisor is the Supervisor config file format.
// Both sections are pointers and therefore nil when not populated.
type Supervisor struct {
	// Server holds the OpAMP server settings.
	Server *OpAMPServer

	// Agent holds the settings of the Agent managed by the Supervisor.
	Agent *Agent
}

// OpAMPServer is the "server" section of the Supervisor config.
type OpAMPServer struct {
	// Endpoint is the address of the OpAMP server.
	Endpoint string
}

// Agent is the "agent" section of the Supervisor config.
type Agent struct {
	// Executable is the path of the Agent executable to run.
	Executable string
}
15 changes: 15 additions & 0 deletions internal/examples/supervisor/supervisor/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package supervisor

import "log"

// Logger adapts a standard library *log.Logger to the Debugf/Errorf style of
// logging used by the Supervisor.
type Logger struct {
	// Logger is the underlying logger that receives every message.
	Logger *log.Logger
}

// Debugf formats the message like fmt.Printf and writes it to the underlying
// logger.
func (l *Logger) Debugf(format string, v ...interface{}) {
	l.Logger.Printf(format, v...)
}

// Errorf formats the message like fmt.Printf and writes it to the underlying
// logger. The output is not distinguished from that of Debugf.
func (l *Logger) Errorf(format string, v ...interface{}) {
	l.Logger.Printf(format, v...)
}
Loading

0 comments on commit b971ffe

Please sign in to comment.