Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block rpc handling until state store is caught up #5911

Merged
merged 1 commit into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
return err
}

s.setConsistentReadReady()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we need to wait till the end of this method? This method is called after the barrier write, so any reason not to toggle this at the beginning of this method rather than wait for all the other housekeeping that happens in this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was being defensive for fear of an RPC handling accessing/mutating brokers before they are enabled. I'd hope that establishing leadership is a quick call and happens rarely such that optimizing to find exact ideal place to set consistent readiness isn't worth it. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah agreed that its a minor optimization, lets leave it as is


return nil
}

Expand Down Expand Up @@ -714,6 +716,8 @@ func (s *Server) iterateJobSummaryMetrics(summary *structs.JobSummary) {
func (s *Server) revokeLeadership() error {
defer metrics.MeasureSince([]string{"nomad", "leader", "revoke_leadership"}, time.Now())

s.resetConsistentReadReady()

// Clear the leader token since we are no longer the leader.
s.setLeaderAcl("")

Expand Down
19 changes: 19 additions & 0 deletions nomad/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,25 @@ func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
require.Nil(t, s1.revokeLeadership())
}

func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) {
s1 := TestServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

testutil.WaitForResult(func() (bool, error) {
return s1.isReadyForConsistentReads(), nil
}, func(err error) {
require.Fail(t, "should have finished establish leader loop")
})

require.Nil(t, s1.revokeLeadership())
require.False(t, s1.isReadyForConsistentReads())

ch := make(chan struct{})
require.Nil(t, s1.establishLeadership(ch))
require.True(t, s1.isReadyForConsistentReads())
}

// Test doing an inplace upgrade on a server from raft protocol 2 to 3
// This verifies that removing the server and adding it back with a uuid works
// even if the server's address stays the same.
Expand Down
8 changes: 6 additions & 2 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ CHECK_LEADER:
isLeader, remoteServer := r.getLeader()

// Handle the case we are the leader
if isLeader {
if isLeader && r.Server.isReadyForConsistentReads() {
return false, nil
}

Expand All @@ -457,7 +457,11 @@ CHECK_LEADER:
}
}

// No leader found and hold time exceeded
// hold time exceeeded without being ready to respond
if isLeader {
return true, structs.ErrNotReadyForConsistentReads
}

return true, structs.ErrNoLeader
}

Expand Down
41 changes: 41 additions & 0 deletions nomad/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,47 @@ func TestRPC_forwardLeader(t *testing.T) {
}
}

func TestRPC_WaitForConsistentReads(t *testing.T) {
t.Parallel()
s1 := TestServer(t, func(c *Config) {
c.RPCHoldTimeout = 20 * time.Millisecond
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

isLeader, _ := s1.getLeader()
require.True(t, isLeader)
require.True(t, s1.isReadyForConsistentReads())

s1.resetConsistentReadReady()
require.False(t, s1.isReadyForConsistentReads())

codec := rpcClient(t, s1)

get := &structs.JobListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: "default",
},
}

// check timeout while waiting for consistency
var resp structs.JobListResponse
err := msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp)
require.Error(t, err)
require.Contains(t, err.Error(), structs.ErrNotReadyForConsistentReads.Error())

// check we wait and block
go func() {
time.Sleep(5 * time.Millisecond)
s1.setConsistentReadReady()
}()

err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp)
require.NoError(t, err)

}

func TestRPC_forwardRegion(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
Expand Down
18 changes: 18 additions & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ type Server struct {
// join/leave from the region.
reconcileCh chan serf.Member

// used to track when the server is ready to serve consistent reads, updated atomically
readyForConsistentReads int32

// eventCh is used to receive events from the serf cluster
eventCh chan serf.Event

Expand Down Expand Up @@ -1400,6 +1403,21 @@ func (s *Server) getLeaderAcl() string {
return s.leaderAcl
}

// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)
}

// Atomically reset readiness state flag on leadership revoke
func (s *Server) resetConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 0)
}

// Returns true if this server is ready to serve consistent reads
func (s *Server) isReadyForConsistentReads() bool {
return atomic.LoadInt32(&s.readyForConsistentReads) == 1
}

// Regions returns the known regions in the cluster.
func (s *Server) Regions() []string {
s.peerLock.RLock()
Expand Down
34 changes: 18 additions & 16 deletions nomad/structs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
)

const (
errNoLeader = "No cluster leader"
errNoRegionPath = "No path to region"
errTokenNotFound = "ACL token not found"
errPermissionDenied = "Permission denied"
errNoNodeConn = "No path to node"
errUnknownMethod = "Unknown rpc method"
errUnknownNomadVersion = "Unable to determine Nomad version"
errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later"
errNoLeader = "No cluster leader"
errNotReadyForConsistentReads = "Not ready to serve consistent reads"
errNoRegionPath = "No path to region"
errTokenNotFound = "ACL token not found"
errPermissionDenied = "Permission denied"
errNoNodeConn = "No path to node"
errUnknownMethod = "Unknown rpc method"
errUnknownNomadVersion = "Unable to determine Nomad version"
errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later"

// Prefix based errors that are used to check if the error is of a given
// type. These errors should be created with the associated constructor.
Expand All @@ -26,14 +27,15 @@ const (
)

var (
ErrNoLeader = errors.New(errNoLeader)
ErrNoRegionPath = errors.New(errNoRegionPath)
ErrTokenNotFound = errors.New(errTokenNotFound)
ErrPermissionDenied = errors.New(errPermissionDenied)
ErrNoNodeConn = errors.New(errNoNodeConn)
ErrUnknownMethod = errors.New(errUnknownMethod)
ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion)
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
ErrNoLeader = errors.New(errNoLeader)
ErrNotReadyForConsistentReads = errors.New(errNotReadyForConsistentReads)
ErrNoRegionPath = errors.New(errNoRegionPath)
ErrTokenNotFound = errors.New(errTokenNotFound)
ErrPermissionDenied = errors.New(errPermissionDenied)
ErrNoNodeConn = errors.New(errNoNodeConn)
ErrUnknownMethod = errors.New(errUnknownMethod)
ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion)
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
)

// IsErrNoLeader returns whether the error is due to there being no leader.
Expand Down