Skip to content

Commit

Permalink
Merge pull request hashicorp#81 from hashicorp/f-stepdown
Browse files Browse the repository at this point in the history
Step down if we are processing a RemovePeer of the active leader
  • Loading branch information
armon committed Jan 19, 2016
2 parents c5adfbb + 6c6653e commit b95f335
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 7 deletions.
48 changes: 42 additions & 6 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,14 @@ func (r *Raft) startReplication(peer string) {
// leaderLoop is the hot loop for a leader. It is invoked
// after all the various leader setup is done.
func (r *Raft) leaderLoop() {
// stepDown is used to track if there is an inflight log that
// would cause us to lose leadership (specifically a RemovePeer of
// ourselves). If this is the case, we must not allow any logs to
// be processed in parallel, otherwise we are basing commit on
// only a single peer (ourself) and replicating to an undefined set
// of peers.
stepDown := false

lease := time.After(r.conf.LeaderLeaseTimeout)
for r.getState() == Leader {
select {
Expand Down Expand Up @@ -908,22 +916,38 @@ func (r *Raft) leaderLoop() {
// Handle any peer set changes
n := len(ready)
for i := 0; i < n; i++ {
// Fail all future transactions once stepDown is on
if stepDown {
ready[i].respond(ErrNotLeader)
ready[i], ready[n-1] = ready[n-1], nil
n--
i--
continue
}

// Special case AddPeer and RemovePeer
log := ready[i]
if log.log.Type != LogAddPeer && log.log.Type != LogRemovePeer {
continue
}

// Check if this log should be ignored
// Check if this log should be ignored. The logs can be
// reordered here since we have not yet assigned an index
// and are not violating any promises.
if !r.preparePeerChange(log) {
ready[i], ready[n-1] = ready[n-1], nil
n--
i--
continue
}

// Apply peer set changes early
r.processLog(&log.log, nil, true)
// Apply peer set changes early and check if we will step
// down after the commit of this log. If so, we must not
// allow any future entries to make progress to avoid undefined
// behavior.
if ok := r.processLog(&log.log, nil, true); ok {
stepDown = true
}
}

// Nothing to do if all logs are invalid
Expand Down Expand Up @@ -1129,7 +1153,8 @@ func (r *Raft) processLogs(index uint64, future *logFuture) {
}

// processLog is invoked to process the application of a single committed log.
func (r *Raft) processLog(l *Log, future *logFuture, precommit bool) {
// Returns if this log entry would cause us to stepDown after it commits.
func (r *Raft) processLog(l *Log, future *logFuture, precommit bool) (stepDown bool) {
switch l.Type {
case LogBarrier:
// Barrier is handled by the FSM
Expand Down Expand Up @@ -1158,8 +1183,18 @@ func (r *Raft) processLog(l *Log, future *logFuture, precommit bool) {
// If the peer set does not include us, remove all other peers
removeSelf := !PeerContained(peers, r.localAddr) && l.Type == LogRemovePeer
if removeSelf {
r.peers = nil
r.peerStore.SetPeers([]string{r.localAddr})
// Mark that this operation will cause us to step down as
// leader. This prevents the future logs from being Applied
// from this leader.
stepDown = true

// We only modify the peers after the commit, otherwise we
// would be using a quorum size of 1 for the RemovePeer operation.
// This is used with the stepDown guard to prevent any other logs.
if !precommit {
r.peers = nil
r.peerStore.SetPeers([]string{r.localAddr})
}
} else {
r.peers = ExcludePeer(peers, r.localAddr)
r.peerStore.SetPeers(peers)
Expand Down Expand Up @@ -1214,6 +1249,7 @@ func (r *Raft) processLog(l *Log, future *logFuture, precommit bool) {
if future != nil && !precommit {
future.respond(nil)
}
return
}

// processRPC is called to handle an incoming RPC request.
Expand Down
7 changes: 6 additions & 1 deletion raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,10 +907,15 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) {
// Remove the leader
var removeFuture Future
for i := byte(0); i < 100; i++ {
leader.Apply([]byte{i}, 0)
future := leader.Apply([]byte{i}, 0)
if i == 80 {
removeFuture = leader.RemovePeer(leader.localAddr)
}
if i > 80 {
if err := future.Error(); err == nil || err != ErrNotLeader {
t.Fatalf("err: %v, future entries should fail", err)
}
}
}

if err := removeFuture.Error(); err != nil {
Expand Down

0 comments on commit b95f335

Please sign in to comment.