Skip to content

Commit

Permalink
nomad: fix race in Bootstrapped access (#14120)
Browse files Browse the repository at this point in the history
* nomad: fix race in Bootstrapped access
  • Loading branch information
schmichael authored Aug 16, 2022
1 parent 45cb95b commit 094a455
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 34 deletions.
5 changes: 0 additions & 5 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ func DefaultRPCAddr() *net.TCPAddr {

// Config is used to parameterize the server
type Config struct {
// Bootstrapped indicates if Server has bootstrapped or not.
// Its value must be 0 (not bootstrapped) or 1 (bootstrapped).
// All operations on Bootstrapped must be handled via `atomic.*Int32()` calls
Bootstrapped int32

// BootstrapExpect mode is used to automatically bring up a
// collection of Nomad servers. This can be used to automatically
// bring up a collection of nodes.
Expand Down
9 changes: 4 additions & 5 deletions nomad/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package nomad

import (
"strings"
"sync/atomic"
"time"

log "github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -83,7 +82,7 @@ func (s *Server) nodeJoin(me serf.MemberEvent) {
s.peerLock.Unlock()

// If we still expecting to bootstrap, may need to handle this
if s.config.BootstrapExpect != 0 && atomic.LoadInt32(&s.config.Bootstrapped) == 0 {
if s.config.BootstrapExpect != 0 && !s.bootstrapped.Load() {
s.maybeBootstrap()
}
}
Expand Down Expand Up @@ -117,7 +116,7 @@ func (s *Server) maybeBootstrap() {
// Bootstrap can only be done if there are no committed logs,
// remove our expectations of bootstrapping
if index != 0 {
atomic.StoreInt32(&s.config.Bootstrapped, 1)
s.bootstrapped.Store(true)
return
}

Expand Down Expand Up @@ -188,7 +187,7 @@ func (s *Server) maybeBootstrap() {
if len(peers) > 0 {
s.logger.Info("disabling bootstrap mode because existing Raft peers being reported by peer",
"peer_name", server.Name, "peer_address", server.Addr)
atomic.StoreInt32(&s.config.Bootstrapped, 1)
s.bootstrapped.Store(true)
return
}
}
Expand Down Expand Up @@ -230,7 +229,7 @@ func (s *Server) maybeBootstrap() {
}

// Bootstrapping complete, or failed for some reason, don't enter this again
atomic.StoreInt32(&s.config.Bootstrapped, 1)
s.bootstrapped.Store(true)
}

// nodeFailed is used to handle fail events on the serf cluster
Expand Down
3 changes: 1 addition & 2 deletions nomad/serf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"path"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -413,7 +412,7 @@ func TestNomad_NonBootstraping_ShouldntBootstap(t *testing.T) {
s1.maybeBootstrap()
time.Sleep(100 * time.Millisecond)

bootstrapped := atomic.LoadInt32(&s1.config.Bootstrapped)
bootstrapped := s1.bootstrapped.Load()
require.Zero(t, bootstrapped, "expecting non-bootstrapped servers")

p, _ := s1.numPeers()
Expand Down
49 changes: 27 additions & 22 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,16 @@ type Server struct {
// and automatic clustering within regions.
serf *serf.Serf

// bootstrapped indicates if Server has bootstrapped or not.
bootstrapped *atomic.Bool

// reconcileCh is used to pass events from the serf handler
// into the leader manager. Mostly used to handle when servers
// join/leave from the region.
reconcileCh chan serf.Member

// used to track when the server is ready to serve consistent reads, updated atomically
readyForConsistentReads int32
readyForConsistentReads *atomic.Bool

// eventCh is used to receive events from the serf cluster
eventCh chan serf.Event
Expand Down Expand Up @@ -341,24 +344,26 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr

// Create the server
s := &Server{
config: config,
consulCatalog: consulCatalog,
connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
tlsWrap: tlsWrap,
rpcServer: rpc.NewServer(),
streamingRpcs: structs.NewStreamingRpcRegistry(),
nodeConns: make(map[string][]*nodeConnState),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reassertLeaderCh: make(chan chan error),
reconcileCh: make(chan serf.Member, 32),
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
workersEventCh: make(chan interface{}, 1),
config: config,
consulCatalog: consulCatalog,
connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
tlsWrap: tlsWrap,
rpcServer: rpc.NewServer(),
streamingRpcs: structs.NewStreamingRpcRegistry(),
nodeConns: make(map[string][]*nodeConnState),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
bootstrapped: &atomic.Bool{},
reassertLeaderCh: make(chan chan error),
reconcileCh: make(chan serf.Member, 32),
readyForConsistentReads: &atomic.Bool{},
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
workersEventCh: make(chan interface{}, 1),
}

s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -1894,17 +1899,17 @@ func (s *Server) getLeaderAcl() string {

// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)
s.readyForConsistentReads.Store(true)
}

// Atomically reset readiness state flag on leadership revoke
func (s *Server) resetConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 0)
s.readyForConsistentReads.Store(false)
}

// Returns true if this server is ready to serve consistent reads
func (s *Server) isReadyForConsistentReads() bool {
return atomic.LoadInt32(&s.readyForConsistentReads) == 1
return s.readyForConsistentReads.Load()
}

// Regions returns the known regions in the cluster.
Expand Down

0 comments on commit 094a455

Please sign in to comment.