Skip to content

Commit

Permalink
use separate channel for peer changes
Browse files Browse the repository at this point in the history
  • Loading branch information
superfell committed Apr 7, 2016
1 parent bf026d7 commit 2c24fd1
Showing 1 changed file with 36 additions and 50 deletions.
86 changes: 36 additions & 50 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ type Raft struct {
logs LogStore

// Track our known peers
peerCh chan *peerFuture
peers []string
peerStore PeerStore
peerChangeCh chan *logFuture
peerCh chan *peerFuture
peers []string
peerStore PeerStore

// RPC chan comes from the transport layer
rpcCh <-chan RPC
Expand Down Expand Up @@ -218,6 +219,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
localAddr: localAddr,
logger: logger,
logs: logs,
peerChangeCh: make(chan *logFuture),
peerCh: make(chan *peerFuture),
peers: peers,
peerStore: peerStore,
Expand Down Expand Up @@ -370,7 +372,7 @@ func (r *Raft) AddPeer(peer string) Future {
}
logFuture.init()
select {
case r.applyCh <- logFuture:
case r.peerChangeCh <- logFuture:
return logFuture
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
Expand All @@ -389,7 +391,7 @@ func (r *Raft) RemovePeer(peer string) Future {
}
logFuture.init()
select {
case r.applyCh <- logFuture:
case r.peerChangeCh <- logFuture:
return logFuture
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
Expand Down Expand Up @@ -630,6 +632,10 @@ func (r *Raft) runFollower() {
case rpc := <-r.rpcCh:
r.processRPC(rpc)

case c := <-r.peerChangeCh:
// Reject any operations since we are not the leader
c.respond(ErrNotLeader)

case a := <-r.applyCh:
// Reject any operations since we are not the leader
a.respond(ErrNotLeader)
Expand Down Expand Up @@ -716,6 +722,10 @@ func (r *Raft) runCandidate() {
return
}

case c := <-r.peerChangeCh:
// Reject any operations since we are not the leader
c.respond(ErrNotLeader)

case a := <-r.applyCh:
// Reject any operations since we are not the leader
a.respond(ErrNotLeader)
Expand Down Expand Up @@ -939,46 +949,8 @@ func (r *Raft) leaderLoop() {
case p := <-r.peerCh:
p.respond(ErrLeader)

case newLog := <-r.applyCh:
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
for i := 0; i < r.conf.MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
default:
break
}
}

// Handle any peer set changes
n := len(ready)
for i := 0; i < n; i++ {
// Fail all future transactions once stepDown is on
if stepDown {
ready[i].respond(ErrNotLeader)
ready[i], ready[n-1] = ready[n-1], nil
n--
i--
continue
}

// Special case AddPeer and RemovePeer
log := ready[i]
if log.log.Type != LogAddPeer && log.log.Type != LogRemovePeer {
continue
}

// Check if this log should be ignored. The logs can be
// reordered here since we have not yet assigned an index
// and are not violating any promises.
if !r.preparePeerChange(log) {
ready[i], ready[n-1] = ready[n-1], nil
n--
i--
continue
}

case log := <-r.peerChangeCh:
if r.preparePeerChange(log) {
// Apply peer set changes early and check if we will step
// down after the commit of this log. If so, we must not
// allow any future entries to make progress to avoid undefined
Expand All @@ -989,16 +961,30 @@ func (r *Raft) leaderLoop() {
} else {
r.leaderState.commitment.setVoters(append([]string{r.localAddr}, r.peers...))
}
r.dispatchLogs([]*logFuture{log})
}

// Nothing to do if all logs are invalid
if n == 0 {
continue
case newLog := <-r.applyCh:
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
for i := 0; i < r.conf.MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
default:
break
}
}

// Dispatch the logs
ready = ready[:n]
r.dispatchLogs(ready)
if stepDown {
// we're in the process of stepping down as leader, don't process anything new
for i := range ready {
ready[i].respond(ErrNotLeader)
}
} else {
r.dispatchLogs(ready)
}

case <-lease:
// Check if we've exceeded the lease, potentially stepping down
Expand Down

0 comments on commit 2c24fd1

Please sign in to comment.