Skip to content

Commit

Permalink
Use contexts for cancellation (sensu#2827)
Browse files Browse the repository at this point in the history
Use contexts for cancellation through the agent application. This
means threading a context through components that are also used in
the backend, so the backend has been nominally modified as well.

Note that cancellation problems still exist in the backend and will
require a larger, focused effort to resolve.

The Agent Stop() method has been removed, and Agent Run() now
blocks until the agent context is cancelled.

The asset manager now accepts a context, and will stop trying to
retrieve assets when the context is cancelled. This prevents
problems with asset managers never stopping, when assets cannot be
retrieved.

The agent's connection manager now stops trying to reconnect when
a context cancellation is observed.

Agents now shut down cleanly when SIGTERM and SIGINT are received.
All in-progress HTTP connections are given 1s to shut down and are
then terminated forcibly.

Agents still block on SIGTERM and SIGINT, but the longer waits for
shutdown should now be resolved.

A second minor issue has also been resolved, where agents would
always try to reconnect to the same backend.

Signed-off-by: Eric Chlebek <[email protected]>
  • Loading branch information
echlebek authored Mar 25, 2019
1 parent aa5ebd2 commit 656b5ef
Show file tree
Hide file tree
Showing 32 changed files with 322 additions and 267 deletions.
6 changes: 6 additions & 0 deletions CHANGELOGS/unreleased/2756-agent-cancellation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### Fixed
- Fixed a bug where agents would sometimes refuse to terminate on SIGTERM and
SIGINT.
- Fixed a bug where agents would always try to reconnect to the same backend,
even when multiple backends were specified. Agents will now try to connect to
other backends, in pseudorandom fashion.
110 changes: 48 additions & 62 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"strings"
Expand Down Expand Up @@ -44,11 +45,9 @@ type Agent struct {
api *http.Server
assetGetter asset.Getter
backendSelector BackendSelector
cancel context.CancelFunc
config *Config
connected bool
connectedMu sync.RWMutex
context context.Context
entity *v2.Entity
executor command.Executor
handler *handler.MessageHandler
Expand All @@ -57,7 +56,6 @@ type Agent struct {
inProgressMu *sync.Mutex
statsdServer *statsd.Server
sendq chan *transport.Message
stopping chan struct{}
systemInfo *v2.System
systemInfoMu sync.RWMutex
wg sync.WaitGroup
Expand All @@ -67,19 +65,14 @@ type Agent struct {
// NewAgent creates a new Agent. It returns non-nil error if there is any error
// when creating the config.CacheDir.
func NewAgent(config *Config) (*Agent, error) {
ctx := context.TODO()
ctx, cancel := context.WithCancel(ctx)
agent := &Agent{
backendSelector: &RandomBackendSelector{Backends: config.BackendURLs},
cancel: cancel,
context: ctx,
connected: false,
config: config,
executor: command.NewExecutor(),
handler: handler.NewMessageHandler(),
inProgress: make(map[string]*v2.CheckConfig),
inProgressMu: &sync.Mutex{},
stopping: make(chan struct{}),
sendq: make(chan *transport.Message, 10),
systemInfo: &v2.System{},
}
Expand Down Expand Up @@ -119,7 +112,7 @@ func (a *Agent) refreshSystemInfo() error {
return nil
}

func (a *Agent) refreshSystemInfoPeriodically() {
func (a *Agent) refreshSystemInfoPeriodically(ctx context.Context) {
defer logger.Debug("shutting down system info collector")
ticker := time.NewTicker(time.Duration(DefaultSystemInfoRefreshInterval) * time.Second)
defer ticker.Stop()
Expand All @@ -130,7 +123,7 @@ func (a *Agent) refreshSystemInfoPeriodically() {
if err := a.refreshSystemInfo(); err != nil {
logger.WithError(err).Error("failed to refresh system info")
}
case <-a.stopping:
case <-ctx.Done():
return
}
}
Expand All @@ -157,7 +150,12 @@ func (a *Agent) buildTransportHeaderMap() http.Header {
// 7. Start refreshing system info periodically.
// 8. Start sending periodic keepalives.
// 9. Start the API server, shutdown the agent if doing so fails.
func (a *Agent) Run() error {
func (a *Agent) Run(ctx context.Context) error {
defer func() {
if err := a.apiQueue.Close(); err != nil {
logger.WithError(err).Error("error closing API queue")
}
}()
userCredentials := fmt.Sprintf("%s:%s", a.config.User, a.config.Password)
userCredentials = base64.StdEncoding.EncodeToString([]byte(userCredentials))
a.header = a.buildTransportHeaderMap()
Expand All @@ -171,54 +169,55 @@ func (a *Agent) Run() error {
return fmt.Errorf("bad keepalive timeout: %d (minimum value is 5 seconds)", timeout)
}

assetManager := asset.NewManager(a.config.CacheDir, a.getAgentEntity(), a.stopping, &a.wg)
assetManager := asset.NewManager(a.config.CacheDir, a.getAgentEntity(), &a.wg)
var err error
a.assetGetter, err = assetManager.StartAssetManager()
a.assetGetter, err = assetManager.StartAssetManager(ctx)
if err != nil {
return err
}

// Start the statsd listener only if the agent configuration has it enabled
if !a.config.StatsdServer.Disable {
a.StartStatsd()
a.StartStatsd(ctx)
}

go a.connectionManager()
go a.refreshSystemInfoPeriodically()
go a.handleAPIQueue(a.context)
go a.connectionManager(ctx)
go a.refreshSystemInfoPeriodically(ctx)
go a.handleAPIQueue(ctx)

a.wg.Wait()
return nil
}

func (a *Agent) connectionManager() {
func (a *Agent) connectionManager(ctx context.Context) {
defer logger.Debug("shutting down connection manager")
for {
select {
case <-a.stopping:
return
default:
}

a.connectedMu.Lock()
a.connected = false
a.connectedMu.Unlock()

conn := connectWithBackoff(a.backendSelector.Select(), a.config.TLS, a.header)
conn, err := connectWithBackoff(ctx, a.backendSelector, a.config.TLS, a.header)
if err != nil {
if err == ctx.Err() {
return
}
log.Fatal(err)
}

a.connectedMu.Lock()
a.connected = true
a.connectedMu.Unlock()

done := make(chan struct{})

go a.receiveLoop(conn, done)
a.sendLoop(conn, done)
ctx, cancel := context.WithCancel(ctx)
go a.receiveLoop(ctx, cancel, conn)
if err := a.sendLoop(ctx, cancel, conn); err != nil && err != ctx.Err() {
logger.WithError(err).Error("error sending messages")
}
}
}

func (a *Agent) receiveLoop(conn transport.Transport, done chan struct{}) {
defer close(done)

func (a *Agent) receiveLoop(ctx context.Context, cancel context.CancelFunc, conn transport.Transport) {
defer cancel()
for {
m, err := conn.Receive()
if err != nil {
Expand All @@ -231,15 +230,16 @@ func (a *Agent) receiveLoop(conn transport.Transport, done chan struct{}) {
"type": msg.Type,
"payload": string(msg.Payload),
}).Info("message received")
err := a.handler.Handle(msg.Type, msg.Payload)
err := a.handler.Handle(ctx, msg.Type, msg.Payload)
if err != nil {
logger.WithError(err).Error("error handling message")
}
}(m)
}
}

func (a *Agent) sendLoop(conn transport.Transport, done chan struct{}) error {
func (a *Agent) sendLoop(ctx context.Context, cancel context.CancelFunc, conn transport.Transport) error {
defer cancel()
keepalive := time.NewTicker(time.Duration(a.config.KeepaliveInterval) * time.Second)
defer keepalive.Stop()
logger.Info("sending keepalive")
Expand All @@ -249,9 +249,7 @@ func (a *Agent) sendLoop(conn transport.Transport, done chan struct{}) error {
}
for {
select {
case <-done:
return nil
case <-a.stopping:
case <-ctx.Done():
if err := conn.Close(); err != nil {
logger.WithError(err).Error("error closing websocket connection")
return err
Expand Down Expand Up @@ -310,7 +308,7 @@ func (a *Agent) Connected() bool {

// StartAPI starts the Agent HTTP API. After attempting to start the API, if the
// HTTP server encounters a fatal error, it will shutdown the rest of the agent.
func (a *Agent) StartAPI() {
func (a *Agent) StartAPI(ctx context.Context) {
// Prepare the HTTP API server
a.api = newServer(a)

Expand All @@ -331,7 +329,8 @@ func (a *Agent) StartAPI() {
// This is _only_ for the purpose of making Stop() a blocking call.
// The goroutine running the HTTP Server has to return before Stop()
// can return, so we use this to signal that goroutine to shutdown.
<-a.stopping
defer a.wg.Done()
<-ctx.Done()
logger.Info("API shutting down")

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
Expand All @@ -340,51 +339,40 @@ func (a *Agent) StartAPI() {
if err := a.api.Shutdown(ctx); err != nil {
logger.WithError(err).Error("error shutting down the API server")
}

a.wg.Done()
}()
}

// StartSocketListeners starts the agent's TCP and UDP socket listeners.
func (a *Agent) StartSocketListeners() {
if _, _, err := a.createListenSockets(); err != nil {
func (a *Agent) StartSocketListeners(ctx context.Context) {
if _, _, err := a.createListenSockets(ctx); err != nil {
logger.WithError(err).Error("unable to start socket listeners")
}
}

// Stop shuts down the agent. It will block until all listening goroutines
// have returned.
func (a *Agent) Stop() {
a.cancel()
close(a.stopping)
if err := a.apiQueue.Close(); err != nil {
logger.WithError(err).Error("error closing API queue")
}
a.wg.Wait()
}

// StartStatsd starts up a StatsD listener on the agent, logs an error for any
// failures.
func (a *Agent) StartStatsd() {
func (a *Agent) StartStatsd(ctx context.Context) {
logger.Info("starting statsd server on address: ", a.statsdServer.MetricsAddr)

go func() {
if err := a.statsdServer.Run(a.context); err != nil {
if err := a.statsdServer.Run(ctx); err != nil && err != ctx.Err() {
logger.WithError(err).Errorf("error with statsd server on address: %s, statsd listener will not run", a.statsdServer.MetricsAddr)
}
}()
}

func connectWithBackoff(url string, tlsOpts *v2.TLSOptions, header http.Header) transport.Transport {
func connectWithBackoff(ctx context.Context, selector BackendSelector, tlsOpts *v2.TLSOptions, header http.Header) (transport.Transport, error) {
var conn transport.Transport

backoff := retry.ExponentialBackoff{
InitialDelayInterval: 10 * time.Millisecond,
MaxDelayInterval: 10 * time.Second,
Multiplier: 10,
Ctx: ctx,
}

if err := backoff.Retry(func(retry int) (bool, error) {
err := backoff.Retry(func(retry int) (bool, error) {
url := selector.Select()
c, err := transport.Connect(url, tlsOpts, header)
if err != nil {
logger.WithError(err).Error("reconnection attempt failed")
Expand All @@ -396,9 +384,7 @@ func connectWithBackoff(url string, tlsOpts *v2.TLSOptions, header http.Header)
conn = c

return true, nil
}); err != nil {
logger.WithError(err).Fatal("could not connect to transport")
}
})

return conn
return conn, err
}
Loading

0 comments on commit 656b5ef

Please sign in to comment.