Handle Nomad leadership flapping
Fixes a deadlock in leadership handling when leadership flaps.

Raft propagates leadership transitions to Nomad through a NotifyCh channel.
Raft blocks when writing to this channel, so the channel must be buffered or
aggressively consumed[1]. Otherwise, Raft blocks indefinitely in `raft.runLeader`
until the channel is consumed[2] and never moves on to the follower-related
logic (in `raft.runFollower`).
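
For illustration, a minimal sketch of the two mitigations named in [1] (the `raft.Config.NotifyCh` field is the library's real API; `setupNotify`, `handleLeadership`, and the package name are hypothetical):

```go
package leaderexample

import "github.com/hashicorp/raft"

// handleLeadership stands in for leadership logic; in this pattern it must
// never block on work that itself needs Raft to make progress.
func handleLeadership(isLeader bool) {}

// setupNotify wires a leadership notification channel that Raft can always
// write to: a small buffer absorbs one pending transition, and a dedicated
// goroutine aggressively drains everything else.
func setupNotify(cfg *raft.Config) {
	notifyCh := make(chan bool, 1)
	cfg.NotifyCh = notifyCh

	go func() {
		for isLeader := range notifyCh {
			handleLeadership(isLeader)
		}
	}()
}
```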

While Raft's `runLeader` defer function is blocked, Raft cannot process any other
operations. For example, the `run{Leader|Follower}` methods consume
`raft.applyCh`, so while the `runLeader` defer is blocked, all Raft log
applications and configuration lookups block indefinitely.

Sadly, `leaderLoop` and `establishLeadership` make a few Raft calls!
`establishLeadership` attempts to auto-create the autopilot/scheduler config [3], and
`leaderLoop` attempts to check the Raft configuration [4].  All of these calls occur
without a timeout.

Thus, if leadership flaps quickly while `leaderLoop`/`establishLeadership` is
running and hits any of these Raft calls, the Raft handlers _deadlock_ forever.
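
A toy model of the circular wait, not using the raft library at all (every name here is a hypothetical stand-in): the "Raft" goroutine cannot finish handing off the leadership change because the "leader loop" is blocked on a call that only "Raft" can serve. Running it trips Go's deadlock detector, which is the point:

```go
package main

import "fmt"

func main() {
	notifyCh := make(chan bool)  // stands in for RaftConfig.NotifyCh
	applyCh := make(chan string) // stands in for Raft's internal work queue

	// "Raft": after a leadership change it must deliver the notification
	// (like runLeader's deferred cleanup) before it can return to serving applyCh.
	go func() {
		notifyCh <- false // blocks until the leader loop reads it
		for req := range applyCh {
			fmt.Println("served:", req)
		}
	}()

	// "Leader loop": still reacting to the previous transition, it makes a
	// blocking "Raft" call before it gets around to reading notifyCh again.
	applyCh <- "get autopilot config" // blocks forever; "Raft" never reaches its serve loop

	<-notifyCh // never reached; the runtime reports: all goroutines are asleep - deadlock!
}
```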

Depending on how many times leadership flapped and where exactly we get stuck, I
suspect it's possible to end up in the following state:

* Agent metrics/stats HTTP and RPC calls hang as they check the Raft configuration
* raft.State remains Leader, so the server keeps handling RPC calls
  (e.g. node/alloc updates), and these hang as well

Since we create a goroutine per RPC call, the number of goroutines grows over time
and may trigger out-of-memory errors, in addition to the missed updates.

[1] https://github.com/hashicorp/raft/blob/d90d6d6bdacf1b35d66940b07be515b074d89e88/config.go#L190-L193
[2] https://github.com/hashicorp/raft/blob/d90d6d6bdacf1b35d66940b07be515b074d89e88/raft.go#L425-L436
[3] https://github.com/hashicorp/nomad/blob/2a89e477465adbe6a88987f0dcb9fe80145d7b2f/nomad/leader.go#L198-L202
[4] https://github.com/hashicorp/nomad/blob/2a89e477465adbe6a88987f0dcb9fe80145d7b2f/nomad/leader.go#L877
Mahmood Ali committed Jan 22, 2020
1 parent ccd9c14 commit 2810bf3
Showing 4 changed files with 159 additions and 3 deletions.
21 changes: 20 additions & 1 deletion nomad/leader.go
```diff
@@ -90,10 +90,29 @@ func (s *Server) monitorLeadership() {
 		}
 	}

+	wasLeader := false
 	for {
 		select {
 		case isLeader := <-s.leaderCh:
-			leaderStep(isLeader)
+			if wasLeader != isLeader {
+				wasLeader = isLeader
+				// normal case where we went through a transition
+				leaderStep(isLeader)
+			} else if wasLeader && isLeader {
+				// Server lost but then gained leadership immediately.
+				// During this time, this server may have received
+				// Raft transitions that haven't been applied to the FSM
+				// yet.
+				// Ensure that the FSM is caught up and eval queues are refreshed.
+				s.logger.Error("cluster leadership flapped, lost and gained leadership immediately. Leadership flaps indicate cluster-wide problems (e.g. networking).")
+
+				leaderStep(false)
+				leaderStep(true)
+			} else {
+				// Server gained but lost leadership immediately
+				// before it reacted; nothing to do, move on
+				s.logger.Error("cluster leadership flapped, gained and lost leadership immediately. Leadership flaps indicate cluster-wide problems (e.g. networking).")
+			}
 		case <-s.shutdownCh:
 			return
 		}
```
4 changes: 2 additions & 2 deletions nomad/server.go
```diff
@@ -1234,10 +1234,10 @@ func (s *Server) setupRaft() error {
 		}
 	}

-	// Setup the leader channel
+	// Setup the leader channel; only the latest leadership state is kept
 	leaderCh := make(chan bool, 1)
 	s.config.RaftConfig.NotifyCh = leaderCh
-	s.leaderCh = leaderCh
+	s.leaderCh = dropButLastChannel(leaderCh, s.shutdownCh)

 	// Setup the Raft store
 	s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)
```
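
A rough usage sketch of what this wiring buys (this test is not part of the commit; it assumes `dropButLastChannel` from nomad/util.go below is in scope, e.g. alongside the tests in nomad/util_test.go): Raft's writes into the small buffered leaderCh are drained aggressively, and the consumer of the wrapped channel only ever observes the most recent leadership state, so a fast flap can no longer wedge Raft.

```go
// TestLeadershipFlapCoalescing is a hypothetical illustration, not part of this commit.
func TestLeadershipFlapCoalescing(t *testing.T) {
	leaderCh := make(chan bool, 1)
	shutdownCh := make(chan struct{})
	defer close(shutdownCh)

	coalesced := dropButLastChannel(leaderCh, shutdownCh)

	// Simulate a fast flap: leadership lost and regained before anyone reacts.
	leaderCh <- false
	leaderCh <- true

	// A monitorLeadership-style consumer may observe a stale `false` first,
	// but it never blocks the producer, and the final `true` always arrives.
	v := <-coalesced
	if !v {
		v = <-coalesced
	}
	if !v {
		t.Fatalf("expected final leadership state to be true")
	}
}
```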
65 changes: 65 additions & 0 deletions nomad/util.go
```diff
@@ -301,3 +301,68 @@ func getAlloc(state AllocGetter, allocID string) (*structs.Allocation, error) {

 	return alloc, nil
 }
+
+// dropButLastChannel returns a channel that drops all but the last value from sourceCh.
+//
+// Useful for aggressively consuming sourceCh when intermediate values aren't relevant.
+//
+// This function propagates values to the result quickly and drops intermediate messages
+// on a best-effort basis. The Go scheduler may delay delivery or result in extra
+// deliveries.
+//
+// Consider this example:
+//
+// ```
+// src := make(chan bool)
+// dst := dropButLastChannel(src, nil)
+//
+// go func() {
+//     src <- true
+//     src <- false
+// }()
+//
+// // v can be `true` here but is very unlikely
+// v := <-dst
+// ```
+//
+func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan bool {
+	// dst is unbuffered; the goroutine below holds the most recent value in lv
+	dst := make(chan bool)
+
+	go func() {
+		lv := false
+
+	DEQUE_SOURCE:
+		// wait for the first message
+		select {
+		case lv = <-sourceCh:
+			goto ENQUEUE_DST
+		case <-shutdownCh:
+			return
+		}
+
+	ENQUEUE_DST:
+		// prioritize draining the source first: dequeue without blocking
+		for {
+			select {
+			case lv = <-sourceCh:
+			default:
+				break ENQUEUE_DST
+			}
+		}
+
+		// attempt to enqueue but keep monitoring the source channel
+		select {
+		case lv = <-sourceCh:
+			goto ENQUEUE_DST
+		case dst <- lv:
+			// enqueued value; go back to dequeuing from source
+			goto DEQUE_SOURCE
+		case <-shutdownCh:
+			return
+		}
+	}()
+
+	return dst
+}
```
72 changes: 72 additions & 0 deletions nomad/util_test.go
```diff
@@ -4,6 +4,7 @@ import (
 	"net"
 	"reflect"
 	"testing"
+	"time"

 	version "github.com/hashicorp/go-version"
 	"github.com/hashicorp/nomad/helper/uuid"
```
```diff
@@ -258,3 +259,74 @@ func TestMaxUint64(t *testing.T) {
 		t.Fatalf("bad")
 	}
 }
+
+func TestDropButLastChannelDropsValues(t *testing.T) {
+	sourceCh := make(chan bool)
+	shutdownCh := make(chan struct{})
+
+	dstCh := dropButLastChannel(sourceCh, shutdownCh)
+
+	// timeout duration for any channel propagation delay
+	timeoutDuration := 5 * time.Millisecond
+
+	// test that dstCh doesn't emit anything initially
+	select {
+	case <-dstCh:
+		require.Fail(t, "received a message unexpectedly")
+	case <-time.After(timeoutDuration):
+		// good: no message.  This could have been a default: case, but we
+		// wait to account for the helper goroutine's scheduling.
+	}
+
+	sourceCh <- false
+	select {
+	case v := <-dstCh:
+		require.False(t, v, "unexpected value from dstCh")
+	case <-time.After(timeoutDuration):
+		require.Fail(t, "timed out waiting for source->dstCh propagation")
+	}
+
+	// channel is drained now
+	select {
+	case v := <-dstCh:
+		require.Failf(t, "received a message unexpectedly", "value: %v", v)
+	case <-time.After(timeoutDuration):
+		// good: no message
+	}
+
+	// now enqueue many messages and ensure only the last one is received
+	// enqueueing should be fast!
+	sourceCh <- false
+	sourceCh <- false
+	sourceCh <- false
+	sourceCh <- false
+	sourceCh <- true
+
+	// I suspect that dstCh may contain a stale (i.e. `false`) value if Go executes
+	// this select before the implementation goroutine dequeues the last value.
+	//
+	// However, this never failed in testing - so leaving it for now to see if it ever fails;
+	// if/when the test fails, we can learn how much of an issue it is and adjust.
+	select {
+	case v := <-dstCh:
+		require.True(t, v, "unexpected value from dstCh")
+	case <-time.After(timeoutDuration):
+		require.Fail(t, "timed out waiting for source->dstCh propagation")
+	}
+
+	sourceCh <- true
+	sourceCh <- true
+	sourceCh <- true
+	sourceCh <- true
+	sourceCh <- true
+	sourceCh <- false
+	select {
+	case v := <-dstCh:
+		require.False(t, v, "unexpected value from dstCh")
+	case <-time.After(timeoutDuration):
+		require.Fail(t, "timed out waiting for source->dstCh propagation")
+	}
+
+	close(shutdownCh)
+}
```
