Fix server shutdown not waiting for worker run completion (#19560)

(manual backport of be8575a)

* Move group into a separate helper module for reuse

* Add shutdownCh to worker

The shutdown channel is used to signal that the worker has stopped.

* Make server shutdown block on workers' shutdownCh

* Fix waiting for eval broker state change blocking indefinitely

There was a race condition in the GenericNotifier between the Run and
WaitForChange functions: WaitForChange could block trying to write to a
full unsubscribeCh while Run, having already stopped, would never read
from it.

This commit fixes the race by unblocking WaitForChange once the notifier
has been stopped (a sketch of the pattern follows this list).

* Bound the amount of time server shutdown waits on worker completion

* Fix lostcancel linter error

* Fix worker test using unexpected worker constructor

* Add changelog
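
A minimal sketch of the unblocking pattern described in the race-condition
note above, distilled from the helper/broker/notify.go diff further down
(illustrative, not a verbatim excerpt):

```go
// Before: this send could block forever once Run had returned,
// since nothing would ever read from unsubscribeCh:
//
//	g.unsubscribeCh <- updateCh

// After: abandon the send as soon as the notifier's context is done.
select {
case <-g.ctx.Done(): // notifier stopped; no reader exists
case g.unsubscribeCh <- updateCh:
}
```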

---------

Co-authored-by: Marvin Chin <[email protected]>
2 people authored and shoenig committed Jan 5, 2024
1 parent eb51977 commit 5c23032
Showing 11 changed files with 129 additions and 71 deletions.
3 changes: 3 additions & 0 deletions .changelog/19560.txt
@@ -0,0 +1,3 @@
```release-note:bug
server: Fix server not waiting for workers to submit nacks for dequeued evaluations before shutting down
```
32 changes: 3 additions & 29 deletions client/client.go
@@ -48,6 +48,7 @@ import (
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/envoy"
+"github.com/hashicorp/nomad/helper/group"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/pool"
hstats "github.com/hashicorp/nomad/helper/stats"
@@ -255,7 +256,7 @@ type Client struct {

// shutdownGroup are goroutines that exit when shutdownCh is closed.
// Shutdown() blocks on Wait() after closing shutdownCh.
-shutdownGroup group
+shutdownGroup group.Group

// tokensClient is Nomad Client's custom Consul client for requesting Consul
// Service Identity tokens through Nomad Server.
@@ -840,7 +841,7 @@ func (c *Client) Shutdown() error {
// Stop Garbage collector
c.garbageCollector.Stop()

-arGroup := group{}
+arGroup := group.Group{}
if c.GetConfig().DevMode {
// In DevMode destroy all the running allocations.
for _, ar := range c.getAllocRunners() {
@@ -3354,33 +3355,6 @@ func (c *Client) GetTaskEventHandler(allocID, taskName string) drivermanager.EventHandler {
return nil
}

-// group wraps a func() in a goroutine and provides a way to block until it
-// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
-type group struct {
-wg sync.WaitGroup
-}
-
-// Go starts f in a goroutine and must be called before Wait.
-func (g *group) Go(f func()) {
-g.wg.Add(1)
-go func() {
-defer g.wg.Done()
-f()
-}()
-}
-
-func (g *group) AddCh(ch <-chan struct{}) {
-g.Go(func() {
-<-ch
-})
-}
-
-// Wait for all goroutines to exit. Must be called after all calls to Go
-// complete.
-func (g *group) Wait() {
-g.wg.Wait()
-}
-
// pendingClientUpdates are the set of allocation updates that the client is
// waiting to send
type pendingClientUpdates struct {
23 changes: 18 additions & 5 deletions helper/broker/notify.go
@@ -4,6 +4,7 @@
package broker

import (
"context"
"time"

"github.com/hashicorp/nomad/helper"
@@ -21,15 +22,18 @@ type GenericNotifier struct {
// subscription membership mapping.
subscribeCh chan chan interface{}
unsubscribeCh chan chan interface{}

+ctx context.Context
}

// NewGenericNotifier returns a generic notifier which can be used by a process
// to notify many subscribers when a specific update is triggered.
-func NewGenericNotifier() *GenericNotifier {
+func NewGenericNotifier(ctx context.Context) *GenericNotifier {
return &GenericNotifier{
publishCh: make(chan interface{}, 1),
subscribeCh: make(chan chan interface{}, 1),
unsubscribeCh: make(chan chan interface{}, 1),
+ctx: ctx,
}
}

@@ -46,7 +50,7 @@ func (g *GenericNotifier) Notify(msg interface{}) {
// Run is a long-lived process which handles updating subscribers as well as
// ensuring any update is sent to them. The notifier's context is used to
// coordinate shutdown.
-func (g *GenericNotifier) Run(stopCh <-chan struct{}) {
+func (g *GenericNotifier) Run() {

// Store our subscribers inline with a map. This map can only be accessed
// via a single channel update at a time, meaning we can manage without
@@ -55,7 +59,7 @@ func (g *GenericNotifier) Run(stopCh <-chan struct{}) {

for {
select {
-case <-stopCh:
+case <-g.ctx.Done():
return
case msgCh := <-g.subscribeCh:
subscribers[msgCh] = struct{}{}
@@ -83,7 +87,11 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
// Create a channel and subscribe to any update. This channel is buffered
// to ensure we do not block the main broker process.
updateCh := make(chan interface{}, 1)
-g.subscribeCh <- updateCh
+select {
+case <-g.ctx.Done():
+return "shutting down"
+case g.subscribeCh <- updateCh:
+}

// Create a timeout timer, using the helper to ensure this routine doesn't
// panic and that the stop call is clear.
@@ -93,14 +101,19 @@
// subscriber once it has been notified of a change, or reached its wait
// timeout.
defer func() {
-g.unsubscribeCh <- updateCh
+select {
+case <-g.ctx.Done():
+case g.unsubscribeCh <- updateCh:
+}
close(updateCh)
timeoutStop()
}()

// Enter the main loop which listens for an update or timeout and returns
// this information to the subscriber.
select {
+case <-g.ctx.Done():
+return "shutting down"
case <-timeoutTimer.C:
return "wait timed out after " + timeout.String()
case update := <-updateCh:
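
For context, a sketch of how a caller drives the notifier under the new
context-based API (a hypothetical standalone program mirroring the test
below; not part of this commit):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/broker"
)

func main() {
	// Cancelling the context stops Run and unblocks any WaitForChange callers.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	notifier := broker.NewGenericNotifier(ctx)
	go notifier.Run() // the stopCh parameter is gone; the context coordinates shutdown

	// Publisher side: broadcast an update to all current subscribers.
	go notifier.Notify("eval broker enabled")

	// Subscriber side: returns the update, a timeout message, or
	// "shutting down" if the context is cancelled first.
	fmt.Println(notifier.WaitForChange(time.Second))
}
```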
9 changes: 5 additions & 4 deletions helper/broker/notify_test.go
@@ -4,6 +4,7 @@
package broker

import (
"context"
"sync"
"testing"
"time"
@@ -16,11 +17,11 @@ func TestGenericNotifier(t *testing.T) {
ci.Parallel(t)

// Create the new notifier.
-stopChan := make(chan struct{})
-defer close(stopChan)
+ctx, cancelFn := context.WithCancel(context.Background())
+defer cancelFn()

-notifier := NewGenericNotifier()
-go notifier.Run(stopChan)
+notifier := NewGenericNotifier(ctx)
+go notifier.Run()

// Ensure we have buffered channels.
require.Equal(t, 1, cap(notifier.publishCh))
50 changes: 50 additions & 0 deletions helper/group/group.go
@@ -0,0 +1,50 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package group

import (
"context"
"sync"
)

// Group wraps a func() in a goroutine and provides a way to block until it
// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
type Group struct {
wg sync.WaitGroup
}

// Go starts f in a goroutine and must be called before Wait.
func (g *Group) Go(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}

// AddCh waits, in a new goroutine, for ch to be closed. Like Go, it must be
// called before Wait.
func (g *Group) AddCh(ch <-chan struct{}) {
g.Go(func() {
<-ch
})
}

// Wait for all goroutines to exit. Must be called after all calls to Go
// complete.
func (g *Group) Wait() {
g.wg.Wait()
}

// WaitWithContext waits for all goroutines to exit, or for the context to
// be done. Must be called after all calls to Go complete.
func (g *Group) WaitWithContext(ctx context.Context) {
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
g.Wait()
}()
select {
case <-doneCh:
case <-ctx.Done():
}
}
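
A usage sketch for the new helper (a hypothetical caller, not part of this
commit):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/group"
)

func main() {
	var g group.Group

	// Track a goroutine directly.
	g.Go(func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("worker finished")
	})

	// Track completion signalled by closing a channel, as the server
	// does with each worker's ShutdownCh.
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		time.Sleep(10 * time.Millisecond)
	}()
	g.AddCh(doneCh)

	// Wait for everything, but give up after a grace period so a stuck
	// goroutine cannot block shutdown forever.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	g.WaitWithContext(ctx)
}
```

Note that WaitWithContext bounds the wait rather than guaranteeing
completion: if the context expires first, the goroutines are simply
abandoned, which is the trade-off the server later makes with
workerShutdownGracePeriod.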
3 changes: 2 additions & 1 deletion helper/raftutil/fsm.go
@@ -4,6 +4,7 @@
package raftutil

import (
"context"
"fmt"
"io"
"path/filepath"
@@ -79,7 +80,7 @@ func dummyFSM(logger hclog.Logger) (nomadFSM, error) {
// use dummy non-enabled FSM dependencies
periodicDispatch := nomad.NewPeriodicDispatch(logger, nil)
blockedEvals := nomad.NewBlockedEvals(nil, logger)
-evalBroker, err := nomad.NewEvalBroker(1, 1, 1, 1)
+evalBroker, err := nomad.NewEvalBroker(context.Background(), 1, 1, 1, 1)
if err != nil {
return nil, err
}
4 changes: 2 additions & 2 deletions nomad/eval_broker.go
@@ -137,15 +137,15 @@ type PendingEvaluations []*structs.Evaluation
// initialNackDelay is the delay before making a Nacked evaluation available
// again for the first Nack and subsequentNackDelay is the compounding delay
// after the first Nack.
-func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
+func NewEvalBroker(ctx context.Context, timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
if timeout < 0 {
return nil, fmt.Errorf("timeout cannot be negative")
}
b := &EvalBroker{
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
-enabledNotifier: broker.NewGenericNotifier(),
+enabledNotifier: broker.NewGenericNotifier(ctx),
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
3 changes: 2 additions & 1 deletion nomad/eval_broker_test.go
@@ -5,6 +5,7 @@ package nomad

import (
"container/heap"
"context"
"encoding/json"
"errors"
"fmt"
@@ -54,7 +55,7 @@ func testBroker(t *testing.T, timeout time.Duration) *EvalBroker {
}

func testBrokerFromConfig(t *testing.T, c *Config) *EvalBroker {
-b, err := NewEvalBroker(c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
+b, err := NewEvalBroker(context.Background(), c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
if err != nil {
t.Fatalf("err: %v", err)
}
47 changes: 33 additions & 14 deletions nomad/server.go
@@ -35,6 +35,7 @@ import (
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/codec"
+"github.com/hashicorp/nomad/helper/group"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/helper/tlsutil"
@@ -83,6 +84,10 @@ const (
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time.Second

+// workerShutdownGracePeriod is the maximum time we will wait for workers to
+// stop gracefully when the server shuts down.
+workerShutdownGracePeriod = 5 * time.Second

// defaultConsulDiscoveryInterval is how often to poll Consul for new
// servers if there is no leader.
defaultConsulDiscoveryInterval time.Duration = 3 * time.Second
@@ -265,6 +270,10 @@ type Server struct {
// aclCache is used to maintain the parsed ACL objects
aclCache *structs.ACLCache[*acl.ACL]

+// workerShutdownGroup tracks the running worker goroutines so that Shutdown()
+// can wait on their completion.
+workerShutdownGroup group.Group

// oidcProviderCache maintains a cache of OIDC providers. This is useful as
// the provider performs background HTTP requests. When the Nomad server is
// shutting down, the oidcProviderCache.Shutdown() function must be called.
@@ -299,16 +308,6 @@
// configuration, potentially returning an error
func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {

-// Create an eval broker
-evalBroker, err := NewEvalBroker(
-config.EvalNackTimeout,
-config.EvalNackInitialReenqueueDelay,
-config.EvalNackSubsequentReenqueueDelay,
-config.EvalDeliveryLimit)
-if err != nil {
-return nil, err
-}
-
// Configure TLS
tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, true, true)
if err != nil {
@@ -347,9 +346,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {
reconcileCh: make(chan serf.Member, 32),
readyForConsistentReads: &atomic.Bool{},
eventCh: make(chan serf.Event, 256),
-evalBroker: evalBroker,
reapCancelableEvalsCh: make(chan struct{}),
-blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
workersEventCh: make(chan interface{}, 1),
@@ -358,6 +355,21 @@
s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
s.shutdownCh = s.shutdownCtx.Done()

+// Create an eval broker
+evalBroker, err := NewEvalBroker(
+s.shutdownCtx,
+config.EvalNackTimeout,
+config.EvalNackInitialReenqueueDelay,
+config.EvalNackSubsequentReenqueueDelay,
+config.EvalDeliveryLimit)
+if err != nil {
+return nil, err
+}
+s.evalBroker = evalBroker
+
+// Create the blocked evals
+s.blockedEvals = NewBlockedEvals(s.evalBroker, s.logger)

// Create the RPC handler
s.rpcHandler = newRpcHandler(s)

@@ -456,7 +468,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {

// Start the eval broker notification system so any subscribers can get
// updates when the process's SetEnabled is triggered.
-go s.evalBroker.enabledNotifier.Run(s.shutdownCh)
+go s.evalBroker.enabledNotifier.Run()

// Setup the node drainer.
s.setupNodeDrainer()
@@ -673,6 +685,13 @@ func (s *Server) Shutdown() error {
s.shutdown = true
s.shutdownCancel()

+s.workerLock.Lock()
+defer s.workerLock.Unlock()
+s.stopOldWorkers(s.workers)
+workerShutdownTimeoutCtx, cancelWorkerShutdownTimeoutCtx := context.WithTimeout(context.Background(), workerShutdownGracePeriod)
+defer cancelWorkerShutdownTimeoutCtx()
+s.workerShutdownGroup.WaitWithContext(workerShutdownTimeoutCtx)

if s.serf != nil {
s.serf.Shutdown()
}
@@ -1743,7 +1762,7 @@ func (s *Server) setupWorkersLocked(ctx context.Context, poolArgs SchedulerWorkerPoolArgs) error {
return err
} else {
s.logger.Debug("started scheduling worker", "id", w.ID(), "index", i+1, "of", s.config.NumSchedulers)

+s.workerShutdownGroup.AddCh(w.ShutdownCh())
s.workers = append(s.workers, w)
}
}
(The remaining two changed files, presumably the worker changes, did not render here.)
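
The commit message above implies the shape of those worker changes. A
hypothetical sketch of the worker run loop (names are illustrative; this is
not the actual Nomad implementation):

```go
// run is the worker's main loop. Closing shutdownCh is what the server's
// workerShutdownGroup waits on in Shutdown().
func (w *Worker) run() {
	defer close(w.shutdownCh)
	for {
		// Dequeue blocks until an evaluation is available or the
		// worker is stopped.
		eval, token, ok := w.dequeueEvaluation()
		if !ok {
			return
		}
		if w.isShuttingDown() {
			// Nack the dequeued eval so another scheduler can pick
			// it up, then exit; the server waits (bounded by
			// workerShutdownGracePeriod) for this to complete.
			w.sendNack(eval, token)
			return
		}
		w.invokeScheduler(eval, token)
	}
}
```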
