Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nomad: fix race in Bootstrapped access #14120

Merged
merged 2 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ func DefaultRPCAddr() *net.TCPAddr {

// Config is used to parameterize the server
type Config struct {
// Bootstrapped indicates if Server has bootstrapped or not.
// Its value must be 0 (not bootstrapped) or 1 (bootstrapped).
// All operations on Bootstrapped must be handled via `atomic.*Int32()` calls
Bootstrapped int32

// BootstrapExpect mode is used to automatically bring up a
// collection of Nomad servers. This can be used to automatically
// bring up a collection of nodes.
Expand Down
9 changes: 4 additions & 5 deletions nomad/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package nomad

import (
"strings"
"sync/atomic"
"time"

log "github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -83,7 +82,7 @@ func (s *Server) nodeJoin(me serf.MemberEvent) {
s.peerLock.Unlock()

// If we still expecting to bootstrap, may need to handle this
if s.config.BootstrapExpect != 0 && atomic.LoadInt32(&s.config.Bootstrapped) == 0 {
if s.config.BootstrapExpect != 0 && !s.bootstrapped.Load() {
s.maybeBootstrap()
}
}
Expand Down Expand Up @@ -117,7 +116,7 @@ func (s *Server) maybeBootstrap() {
// Bootstrap can only be done if there are no committed logs,
// remove our expectations of bootstrapping
if index != 0 {
atomic.StoreInt32(&s.config.Bootstrapped, 1)
s.bootstrapped.Store(true)
return
}

Expand Down Expand Up @@ -188,7 +187,7 @@ func (s *Server) maybeBootstrap() {
if len(peers) > 0 {
s.logger.Info("disabling bootstrap mode because existing Raft peers being reported by peer",
"peer_name", server.Name, "peer_address", server.Addr)
atomic.StoreInt32(&s.config.Bootstrapped, 1)
s.bootstrapped.Store(true)
return
}
}
Expand Down Expand Up @@ -230,7 +229,7 @@ func (s *Server) maybeBootstrap() {
}

// Bootstrapping complete, or failed for some reason, don't enter this again
atomic.StoreInt32(&s.config.Bootstrapped, 1)
s.bootstrapped.Store(true)
}

// nodeFailed is used to handle fail events on the serf cluster
Expand Down
3 changes: 1 addition & 2 deletions nomad/serf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"path"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -413,7 +412,7 @@ func TestNomad_NonBootstraping_ShouldntBootstap(t *testing.T) {
s1.maybeBootstrap()
time.Sleep(100 * time.Millisecond)

bootstrapped := atomic.LoadInt32(&s1.config.Bootstrapped)
bootstrapped := s1.bootstrapped.Load()
require.Zero(t, bootstrapped, "expecting non-bootstrapped servers")

p, _ := s1.numPeers()
Expand Down
49 changes: 27 additions & 22 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,16 @@ type Server struct {
// and automatic clustering within regions.
serf *serf.Serf

// bootstrapped indicates if Server has bootstrapped or not.
bootstrapped *atomic.Bool

// reconcileCh is used to pass events from the serf handler
// into the leader manager. Mostly used to handle when servers
// join/leave from the region.
reconcileCh chan serf.Member

// used to track when the server is ready to serve consistent reads, updated atomically
readyForConsistentReads int32
readyForConsistentReads *atomic.Bool

// eventCh is used to receive events from the serf cluster
eventCh chan serf.Event
Expand Down Expand Up @@ -341,24 +344,26 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr

// Create the server
s := &Server{
config: config,
consulCatalog: consulCatalog,
connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
tlsWrap: tlsWrap,
rpcServer: rpc.NewServer(),
streamingRpcs: structs.NewStreamingRpcRegistry(),
nodeConns: make(map[string][]*nodeConnState),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reassertLeaderCh: make(chan chan error),
reconcileCh: make(chan serf.Member, 32),
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
workersEventCh: make(chan interface{}, 1),
config: config,
consulCatalog: consulCatalog,
connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
tlsWrap: tlsWrap,
rpcServer: rpc.NewServer(),
streamingRpcs: structs.NewStreamingRpcRegistry(),
nodeConns: make(map[string][]*nodeConnState),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
bootstrapped: &atomic.Bool{},
reassertLeaderCh: make(chan chan error),
reconcileCh: make(chan serf.Member, 32),
readyForConsistentReads: &atomic.Bool{},
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
workersEventCh: make(chan interface{}, 1),
}

s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -1894,17 +1899,17 @@ func (s *Server) getLeaderAcl() string {

// setConsistentReadReady atomically raises the readiness flag once leadership
// is obtained, indicating the server is past its barrier write and may serve
// consistent reads. Uses the typed atomic.Bool field (this PR's replacement
// for the old int32 + sync/atomic calls), which removes the race on the
// previous Config.Bootstrapped-style plain field access.
func (s *Server) setConsistentReadReady() {
	s.readyForConsistentReads.Store(true)
}

// resetConsistentReadReady atomically clears the readiness flag when
// leadership is revoked, so consistent reads are refused until a new barrier
// write completes. Counterpart to setConsistentReadReady.
func (s *Server) resetConsistentReadReady() {
	s.readyForConsistentReads.Store(false)
}

// isReadyForConsistentReads reports whether this server has completed its
// post-election barrier write and can serve consistent reads. The typed
// atomic.Bool load needs no `== 1` comparison, unlike the old int32 form.
func (s *Server) isReadyForConsistentReads() bool {
	return s.readyForConsistentReads.Load()
}

// Regions returns the known regions in the cluster.
Expand Down