diff --git a/.changelog/19560.txt b/.changelog/19560.txt
new file mode 100644
index 00000000000..03a48506b96
--- /dev/null
+++ b/.changelog/19560.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+server: Fix server not waiting for workers to submit nacks for dequeued evaluations before shutting down
+```
diff --git a/client/client.go b/client/client.go
index 18f5532a568..ec1004df706 100644
--- a/client/client.go
+++ b/client/client.go
@@ -48,6 +48,7 @@ import (
 	"github.com/hashicorp/nomad/command/agent/consul"
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/envoy"
+	"github.com/hashicorp/nomad/helper/group"
 	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/helper/pool"
 	hstats "github.com/hashicorp/nomad/helper/stats"
@@ -255,7 +256,7 @@ type Client struct {

 	// shutdownGroup are goroutines that exit when shutdownCh is closed.
 	// Shutdown() blocks on Wait() after closing shutdownCh.
-	shutdownGroup group
+	shutdownGroup group.Group

 	// tokensClient is Nomad Client's custom Consul client for requesting Consul
 	// Service Identity tokens through Nomad Server.
@@ -840,7 +841,7 @@ func (c *Client) Shutdown() error {
 	// Stop Garbage collector
 	c.garbageCollector.Stop()

-	arGroup := group{}
+	arGroup := group.Group{}
 	if c.GetConfig().DevMode {
 		// In DevMode destroy all the running allocations.
 		for _, ar := range c.getAllocRunners() {
@@ -3354,33 +3355,6 @@ func (c *Client) GetTaskEventHandler(allocID, taskName string) drivermanager.EventHandler {
 	return nil
 }

-// group wraps a func() in a goroutine and provides a way to block until it
-// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
-type group struct {
-	wg sync.WaitGroup
-}
-
-// Go starts f in a goroutine and must be called before Wait.
-func (g *group) Go(f func()) {
-	g.wg.Add(1)
-	go func() {
-		defer g.wg.Done()
-		f()
-	}()
-}
-
-func (g *group) AddCh(ch <-chan struct{}) {
-	g.Go(func() {
-		<-ch
-	})
-}
-
-// Wait for all goroutines to exit. Must be called after all calls to Go
-// complete.
-func (g *group) Wait() {
-	g.wg.Wait()
-}
-
 // pendingClientUpdates are the set of allocation updates that the client is
 // waiting to send
 type pendingClientUpdates struct {
diff --git a/helper/broker/notify.go b/helper/broker/notify.go
index da2fda4c0e0..7a3f400e696 100644
--- a/helper/broker/notify.go
+++ b/helper/broker/notify.go
@@ -4,6 +4,7 @@
 package broker

 import (
+	"context"
 	"time"

 	"github.com/hashicorp/nomad/helper"
@@ -21,15 +22,18 @@ type GenericNotifier struct {
 	// subscription membership mapping.
 	subscribeCh   chan chan interface{}
 	unsubscribeCh chan chan interface{}
+
+	ctx context.Context
 }

 // NewGenericNotifier returns a generic notifier which can be used by a process
 // to notify many subscribers when a specific update is triggered.
-func NewGenericNotifier() *GenericNotifier {
+func NewGenericNotifier(ctx context.Context) *GenericNotifier {
 	return &GenericNotifier{
 		publishCh:     make(chan interface{}, 1),
 		subscribeCh:   make(chan chan interface{}, 1),
 		unsubscribeCh: make(chan chan interface{}, 1),
+		ctx:           ctx,
 	}
 }

@@ -46,7 +50,7 @@ func (g *GenericNotifier) Notify(msg interface{}) {
 // Run is a long-lived process which handles updating subscribers as well as
 // ensuring any update is sent to them. The passed stopCh is used to coordinate
 // shutdown.
-func (g *GenericNotifier) Run(stopCh <-chan struct{}) {
+func (g *GenericNotifier) Run() {

 	// Store our subscribers inline with a map. This map can only be accessed
 	// via a single channel update at a time, meaning we can manage without
@@ -55,7 +59,7 @@ func (g *GenericNotifier) Run(stopCh <-chan struct{}) {

 	for {
 		select {
-		case <-stopCh:
+		case <-g.ctx.Done():
 			return
 		case msgCh := <-g.subscribeCh:
 			subscribers[msgCh] = struct{}{}
@@ -83,7 +87,11 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
 	// Create a channel and subscribe to any update. This channel is buffered
 	// to ensure we do not block the main broker process.
 	updateCh := make(chan interface{}, 1)
-	g.subscribeCh <- updateCh
+	select {
+	case <-g.ctx.Done():
+		return "shutting down"
+	case g.subscribeCh <- updateCh:
+	}

 	// Create a timeout timer and use the helper to ensure this routine doesn't
 	// panic and making the stop call clear.
@@ -93,7 +101,10 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
 	// subscriber once it has been notified of a change, or reached its wait
 	// timeout.
 	defer func() {
-		g.unsubscribeCh <- updateCh
+		select {
+		case <-g.ctx.Done():
+		case g.unsubscribeCh <- updateCh:
+		}
 		close(updateCh)
 		timeoutStop()
 	}()
@@ -101,6 +112,8 @@ func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} {
 	// Enter the main loop which listens for an update or timeout and returns
 	// this information to the subscriber.
 	select {
+	case <-g.ctx.Done():
+		return "shutting down"
 	case <-timeoutTimer.C:
 		return "wait timed out after " + timeout.String()
 	case update := <-updateCh:
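Taken together, the notify.go changes swap the stop channel for a context supplied at construction, so a server-wide cancel can unblock subscribers parked in WaitForChange. Here is a minimal standalone sketch of the reworked API, not part of the patch; the published message value is illustrative, and the short sleep only gives Run time to register the subscription:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/broker"
)

func main() {
	// Cancelling this context stops Run and unblocks any subscriber that is
	// parked in WaitForChange, which is the crux of the shutdown fix.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	notifier := broker.NewGenericNotifier(ctx)
	go notifier.Run()

	resultCh := make(chan interface{}, 1)
	go func() {
		// Blocks until Notify is called, the timeout elapses, or ctx is done.
		resultCh <- notifier.WaitForChange(time.Second)
	}()

	time.Sleep(10 * time.Millisecond) // let the subscription register with Run
	notifier.Notify("update happened") // illustrative message value

	fmt.Println(<-resultCh)
}
```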
diff --git a/helper/broker/notify_test.go b/helper/broker/notify_test.go
index 4d80c8fa1be..69b55d8082a 100644
--- a/helper/broker/notify_test.go
+++ b/helper/broker/notify_test.go
@@ -4,6 +4,7 @@
 package broker

 import (
+	"context"
 	"sync"
 	"testing"
 	"time"
@@ -16,11 +17,11 @@ func TestGenericNotifier(t *testing.T) {
 	ci.Parallel(t)

 	// Create the new notifier.
-	stopChan := make(chan struct{})
-	defer close(stopChan)
+	ctx, cancelFn := context.WithCancel(context.Background())
+	defer cancelFn()

-	notifier := NewGenericNotifier()
-	go notifier.Run(stopChan)
+	notifier := NewGenericNotifier(ctx)
+	go notifier.Run()

 	// Ensure we have buffered channels.
 	require.Equal(t, 1, cap(notifier.publishCh))
diff --git a/helper/group/group.go b/helper/group/group.go
new file mode 100644
index 00000000000..8803aa31224
--- /dev/null
+++ b/helper/group/group.go
@@ -0,0 +1,50 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0
+
+package group
+
+import (
+	"context"
+	"sync"
+)
+
+// group wraps a func() in a goroutine and provides a way to block until it
+// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
+type Group struct {
+	wg sync.WaitGroup
+}
+
+// Go starts f in a goroutine and must be called before Wait.
+func (g *Group) Go(f func()) {
+	g.wg.Add(1)
+	go func() {
+		defer g.wg.Done()
+		f()
+	}()
+}
+
+func (g *Group) AddCh(ch <-chan struct{}) {
+	g.Go(func() {
+		<-ch
+	})
+}
+
+// Wait for all goroutines to exit. Must be called after all calls to Go
+// complete.
+func (g *Group) Wait() {
+	g.wg.Wait()
+}
+
+// Wait for all goroutines to exit, or for the context to finish.
+// Must be called after all calls to Go complete.
+func (g *Group) WaitWithContext(ctx context.Context) {
+	doneCh := make(chan struct{})
+	go func() {
+		defer close(doneCh)
+		g.Wait()
+	}()
+	select {
+	case <-doneCh:
+	case <-ctx.Done():
+	}
+}
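The group helper extracted above (previously private to the client) is what the server changes below use to bound how long Shutdown() blocks on worker exit. A minimal standalone sketch of the Group API under that reading, with sleeps standing in for real work:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/group"
)

func main() {
	var g group.Group

	// Track a function directly.
	g.Go(func() { time.Sleep(50 * time.Millisecond) })

	// Or track anything that signals completion by closing a channel, which
	// is how the server consumes Worker.ShutdownCh() further down.
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		time.Sleep(10 * time.Second) // a worker that is slow to stop
	}()
	g.AddCh(doneCh)

	// Wait for everything, but give up after a grace period rather than
	// hanging shutdown forever on the slow goroutine.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	g.WaitWithContext(ctx)
	fmt.Println("done waiting (grace period may have expired)")
}
```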
diff --git a/helper/raftutil/fsm.go b/helper/raftutil/fsm.go
index 249f46b6cb5..f39016c8969 100644
--- a/helper/raftutil/fsm.go
+++ b/helper/raftutil/fsm.go
@@ -4,6 +4,7 @@
 package raftutil

 import (
+	"context"
 	"fmt"
 	"io"
 	"path/filepath"
@@ -79,7 +80,7 @@ func dummyFSM(logger hclog.Logger) (nomadFSM, error) {
 	// use dummy non-enabled FSM dependencies
 	periodicDispatch := nomad.NewPeriodicDispatch(logger, nil)
 	blockedEvals := nomad.NewBlockedEvals(nil, logger)
-	evalBroker, err := nomad.NewEvalBroker(1, 1, 1, 1)
+	evalBroker, err := nomad.NewEvalBroker(context.Background(), 1, 1, 1, 1)
 	if err != nil {
 		return nil, err
 	}
diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go
index bee38f9cc9d..249d9a1e4a1 100644
--- a/nomad/eval_broker.go
+++ b/nomad/eval_broker.go
@@ -137,7 +137,7 @@ type PendingEvaluations []*structs.Evaluation
 // initialNackDelay is the delay before making a Nacked evaluation available
 // again for the first Nack and subsequentNackDelay is the compounding delay
 // after the first Nack.
-func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
+func NewEvalBroker(ctx context.Context, timeout, initialNackDelay, subsequentNackDelay time.Duration, deliveryLimit int) (*EvalBroker, error) {
 	if timeout < 0 {
 		return nil, fmt.Errorf("timeout cannot be negative")
 	}
@@ -145,7 +145,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
 		nackTimeout:     timeout,
 		deliveryLimit:   deliveryLimit,
 		enabled:         false,
-		enabledNotifier: broker.NewGenericNotifier(),
+		enabledNotifier: broker.NewGenericNotifier(ctx),
 		stats:           new(BrokerStats),
 		evals:           make(map[string]int),
 		jobEvals:        make(map[structs.NamespacedID]string),
diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go
index 53a0bd5815e..5245aa39220 100644
--- a/nomad/eval_broker_test.go
+++ b/nomad/eval_broker_test.go
@@ -5,6 +5,7 @@ package nomad

 import (
 	"container/heap"
+	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -54,7 +55,7 @@ func testBroker(t *testing.T, timeout time.Duration) *EvalBroker {
 }

 func testBrokerFromConfig(t *testing.T, c *Config) *EvalBroker {
-	b, err := NewEvalBroker(c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
+	b, err := NewEvalBroker(context.Background(), c.EvalNackTimeout, c.EvalNackInitialReenqueueDelay, c.EvalNackSubsequentReenqueueDelay, 3)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
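The constructor plumbing is mechanical, but the effect is that the context handed to NewEvalBroker now owns the lifetime of the broker's internal enabled notifier. A small sketch of constructing a broker against a cancellable context, mirroring the test helper above; the durations and delivery limit here are arbitrary:

```go
package main

import (
	"context"
	"time"

	"github.com/hashicorp/nomad/nomad"
)

func main() {
	// The server passes its shutdownCtx here; cancelling it unblocks any
	// worker waiting on the broker's enabled notification.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Arguments: ctx, Nack timeout, initial and subsequent Nack re-enqueue
	// delays, and the delivery limit.
	evalBroker, err := nomad.NewEvalBroker(ctx, 5*time.Second, time.Second, 2*time.Second, 3)
	if err != nil {
		panic(err)
	}
	evalBroker.SetEnabled(true)
	defer evalBroker.SetEnabled(false)
}
```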
diff --git a/nomad/server.go b/nomad/server.go
index 477a6bd40d8..bbc897426f9 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -35,6 +35,7 @@ import (
 	"github.com/hashicorp/nomad/command/agent/consul"
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/helper/codec"
+	"github.com/hashicorp/nomad/helper/group"
 	"github.com/hashicorp/nomad/helper/pool"
 	"github.com/hashicorp/nomad/helper/stats"
 	"github.com/hashicorp/nomad/helper/tlsutil"
@@ -83,6 +84,10 @@ const (
 	// to replicate to gracefully leave the cluster.
 	raftRemoveGracePeriod = 5 * time.Second

+	// workerShutdownGracePeriod is the maximum time we will wait for workers to stop
+	// gracefully when the server shuts down
+	workerShutdownGracePeriod = 5 * time.Second
+
 	// defaultConsulDiscoveryInterval is how often to poll Consul for new
 	// servers if there is no leader.
 	defaultConsulDiscoveryInterval time.Duration = 3 * time.Second
@@ -265,6 +270,10 @@ type Server struct {
 	// aclCache is used to maintain the parsed ACL objects
 	aclCache *structs.ACLCache[*acl.ACL]

+	// workerShutdownGroup tracks the running worker goroutines so that Shutdown()
+	// can wait on their completion
+	workerShutdownGroup group.Group
+
 	// oidcProviderCache maintains a cache of OIDC providers. This is useful as
 	// the provider performs background HTTP requests. When the Nomad server is
 	// shutting down, the oidcProviderCache.Shutdown() function must be called.
@@ -299,16 +308,6 @@ type Server struct {
 // configuration, potentially returning an error
 func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {

-	// Create an eval broker
-	evalBroker, err := NewEvalBroker(
-		config.EvalNackTimeout,
-		config.EvalNackInitialReenqueueDelay,
-		config.EvalNackSubsequentReenqueueDelay,
-		config.EvalDeliveryLimit)
-	if err != nil {
-		return nil, err
-	}
-
 	// Configure TLS
 	tlsConf, err := tlsutil.NewTLSConfiguration(config.TLSConfig, true, true)
 	if err != nil {
@@ -347,9 +346,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {
 		reconcileCh:             make(chan serf.Member, 32),
 		readyForConsistentReads: &atomic.Bool{},
 		eventCh:                 make(chan serf.Event, 256),
-		evalBroker:              evalBroker,
 		reapCancelableEvalsCh:   make(chan struct{}),
-		blockedEvals:            NewBlockedEvals(evalBroker, logger),
 		rpcTLS:                  incomingTLS,
 		aclCache:                aclCache,
 		workersEventCh:          make(chan interface{}, 1),
@@ -358,6 +355,21 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {
 	s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
 	s.shutdownCh = s.shutdownCtx.Done()

+	// Create an eval broker
+	evalBroker, err := NewEvalBroker(
+		s.shutdownCtx,
+		config.EvalNackTimeout,
+		config.EvalNackInitialReenqueueDelay,
+		config.EvalNackSubsequentReenqueueDelay,
+		config.EvalDeliveryLimit)
+	if err != nil {
+		return nil, err
+	}
+	s.evalBroker = evalBroker
+
+	// Create the blocked evals
+	s.blockedEvals = NewBlockedEvals(s.evalBroker, s.logger)
+
 	// Create the RPC handler
 	s.rpcHandler = newRpcHandler(s)
@@ -456,7 +468,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntries consul.ConfigAPI, consulACLs consul.ACLsAPI) (*Server, error) {
 	// Start the eval broker notification system so any subscribers can get
 	// updates when the processes SetEnabled is triggered.
-	go s.evalBroker.enabledNotifier.Run(s.shutdownCh)
+	go s.evalBroker.enabledNotifier.Run()

 	// Setup the node drainer.
 	s.setupNodeDrainer()
@@ -673,6 +685,13 @@ func (s *Server) Shutdown() error {
 	s.shutdown = true
 	s.shutdownCancel()

+	s.workerLock.Lock()
+	defer s.workerLock.Unlock()
+	s.stopOldWorkers(s.workers)
+	workerShutdownTimeoutCtx, cancelWorkerShutdownTimeoutCtx := context.WithTimeout(context.Background(), workerShutdownGracePeriod)
+	defer cancelWorkerShutdownTimeoutCtx()
+	s.workerShutdownGroup.WaitWithContext(workerShutdownTimeoutCtx)
+
 	if s.serf != nil {
 		s.serf.Shutdown()
 	}
@@ -1743,7 +1762,7 @@ func (s *Server) setupWorkersLocked(ctx context.Context, poolArgs SchedulerWorkerPoolArgs) error {
 			return err
 		} else {
 			s.logger.Debug("started scheduling worker", "id", w.ID(), "index", i+1, "of", s.config.NumSchedulers)
-
+			s.workerShutdownGroup.AddCh(w.ShutdownCh())
 			s.workers = append(s.workers, w)
 		}
 	}
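The heart of the fix is the new block in Shutdown(): stop the workers, then wait, bounded by workerShutdownGracePeriod, for each worker's run loop to exit before tearing down Serf and the rest of the server, which gives a worker holding a dequeued evaluation time to submit its nack. A condensed sketch of that ordering, with a hypothetical stopAll callback standing in for s.stopOldWorkers and an inline stand-in for a real worker:

```go
package main

import (
	"context"
	"time"

	"github.com/hashicorp/nomad/helper/group"
)

// shutdownWorkers sketches the Shutdown() ordering above: signal the workers
// to stop, then wait up to a grace period for their run loops to exit.
func shutdownWorkers(g *group.Group, stopAll func()) {
	// Each stopped worker eventually closes its shutdown channel, which the
	// group is already tracking via AddCh.
	stopAll()

	// Bound the wait so one stuck worker cannot hang server shutdown.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	g.WaitWithContext(ctx)
}

func main() {
	var g group.Group
	workerDone := make(chan struct{})
	g.AddCh(workerDone)

	// The "worker": finishes its in-flight work, then signals completion.
	stopAll := func() {
		go func() {
			time.Sleep(time.Millisecond)
			close(workerDone)
		}()
	}
	shutdownWorkers(&g, stopAll)
}
```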
diff --git a/nomad/worker.go b/nomad/worker.go
index ad66f454924..b49ebf4a2dc 100644
--- a/nomad/worker.go
+++ b/nomad/worker.go
@@ -98,6 +98,9 @@ type Worker struct {
 	workloadStatus SchedulerWorkerStatus
 	statusLock     sync.RWMutex

+	// shutdownCh is closed when the run function has exited
+	shutdownCh chan struct{}
+
 	pauseFlag bool
 	pauseLock sync.Mutex
 	pauseCond *sync.Cond
@@ -134,6 +137,7 @@ func newWorker(ctx context.Context, srv *Server, args SchedulerWorkerPoolArgs) *Worker {
 		srv:               srv,
 		start:             time.Now(),
 		status:            WorkerStarting,
+		shutdownCh:        make(chan struct{}),
 		enabledSchedulers: make([]string, len(args.EnabledSchedulers)),
 		failureBackoff:    time.Duration(0),
 	}
@@ -393,6 +397,7 @@ func (w *Worker) workerShuttingDown() bool {
 func (w *Worker) run(raftSyncLimit time.Duration) {
 	defer func() {
 		w.markStopped()
+		close(w.shutdownCh)
 	}()
 	w.setStatuses(WorkerStarted, WorkloadRunning)
 	w.logger.Debug("running")
@@ -894,3 +899,7 @@ func (w *Worker) backoffErr(base, limit time.Duration) bool {
 func (w *Worker) backoffReset() {
 	w.failures = 0
 }
+
+func (w *Worker) ShutdownCh() <-chan struct{} {
+	return w.shutdownCh
+}
diff --git a/nomad/worker_test.go b/nomad/worker_test.go
index a3c389b992c..777f694b147 100644
--- a/nomad/worker_test.go
+++ b/nomad/worker_test.go
@@ -57,20 +57,6 @@ func init() {
 	}
 }

-// NewTestWorker returns the worker without calling it's run method.
-func NewTestWorker(shutdownCtx context.Context, srv *Server) *Worker {
-	w := &Worker{
-		srv:               srv,
-		start:             time.Now(),
-		id:                uuid.Generate(),
-		enabledSchedulers: srv.config.EnabledSchedulers,
-	}
-	w.logger = srv.logger.ResetNamed("worker").With("worker_id", w.id)
-	w.pauseCond = sync.NewCond(&w.pauseLock)
-	w.ctx, w.cancelFn = context.WithCancel(shutdownCtx)
-	return w
-}
-
 func TestWorker_dequeueEvaluation(t *testing.T) {
 	ci.Parallel(t)

@@ -364,7 +350,8 @@ func TestWorker_runBackoff(t *testing.T) {
 	workerCtx, workerCancel := context.WithCancel(srv.shutdownCtx)
 	defer workerCancel()

-	w := NewTestWorker(workerCtx, srv)
+	poolArgs := getSchedulerWorkerPoolArgsFromConfigLocked(srv.config).Copy()
+	w := newWorker(workerCtx, srv, poolArgs)

 	doneCh := make(chan struct{})

 	go func() {
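On the worker side the contract is deliberately small: the channel is created in newWorker, closed exactly once in run's defer, and exposed read-only through ShutdownCh. A generic sketch of this closed-channel completion signal, using a hypothetical Task type rather than the real Worker:

```go
package main

import (
	"fmt"
	"time"
)

// Task mirrors the Worker pattern above: a private channel closed when the
// run loop exits, exposed only as a receive-only completion signal.
type Task struct {
	shutdownCh chan struct{}
}

func NewTask() *Task {
	t := &Task{shutdownCh: make(chan struct{})}
	go t.run()
	return t
}

func (t *Task) run() {
	// Closing in a defer guarantees the signal fires however run exits.
	defer close(t.shutdownCh)
	time.Sleep(20 * time.Millisecond) // stand-in for the scheduling loop
}

// ShutdownCh returns a channel that is closed once run has exited.
func (t *Task) ShutdownCh() <-chan struct{} {
	return t.shutdownCh
}

func main() {
	task := NewTask()
	select {
	case <-task.ShutdownCh():
		fmt.Println("task exited cleanly")
	case <-time.After(time.Second):
		fmt.Println("gave up waiting")
	}
}
```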