From eba83432bf24fbc71177da3718314fe166633ad0 Mon Sep 17 00:00:00 2001 From: Hans Hasselberg Date: Fri, 17 May 2019 18:01:46 +0200 Subject: [PATCH] Leadership transfer (#306) This PR is implementing the leadership transfer extension described in the thesis chap 3.10. Background: Consul is performing some setup after acquiring leadership. It is possible that the setup fails, but there is no good way to step down as a leader. It is possible to use DemoteVoter as show in hashicorp/consul#5247, but this is suboptimal because it relies on Consul's autopilot to promote the old leader to a voter again. Since there is a perfectly fine way described in the thesis: leadership transfer extension, we decided to implement that instead. Doing it this way also helps other teams, since it is more generic. The necessary steps to perform are: 1. Leader picks target to transition to 2. Leader stops accepting client requests 3. Leader makes sure to replicate logs to the target 4. Leader sends TimeoutNow RPC request 5. Target receives TimeoutNow request, which triggers an election 6a. If the election is successful, a message with the new term will make the old leader step down 6b. if after electiontimeout the leadership transfer did not complete, the old leader resumes operation Resources: https://github.com/etcd-io/etcd/tree/master/raft --- api.go | 72 +++++-- commands.go | 26 +++ future.go | 15 ++ fuzzy/cluster.go | 54 ++++-- fuzzy/leadershiptransfer_test.go | 73 +++++++ fuzzy/transport.go | 11 ++ inmem_transport.go | 13 ++ net_transport.go | 13 ++ raft.go | 319 +++++++++++++++++++++++++++---- raft_test.go | 276 +++++++++++++++++++++++++- replication.go | 39 +++- transport.go | 3 + 12 files changed, 835 insertions(+), 79 deletions(-) create mode 100644 fuzzy/leadershiptransfer_test.go diff --git a/api.go b/api.go index c6f947f2416..7dbb3393aee 100644 --- a/api.go +++ b/api.go @@ -49,6 +49,10 @@ var ( // ErrCantBootstrap is returned when attempt is made to bootstrap a // cluster that already has state present. ErrCantBootstrap = errors.New("bootstrap only works on new clusters") + + // ErrLeadershipTransferInProgress is returned when the leader is rejecting + // client requests because it is attempting to transfer leadership. + ErrLeadershipTransferInProgress = errors.New("leadership transfer in progress") ) // Raft implements a Raft node. @@ -97,6 +101,12 @@ type Raft struct { // leaderState used only while state is leader leaderState leaderState + // candidateFromLeadershipTransfer is used to indicate that this server became + // candidate because the leader tries to transfer leadership. This flag is + // used in RequestVoteRequest to express that a leadership transfer is going + // on. + candidateFromLeadershipTransfer bool + // Stores our local server ID, used to avoid sending RPCs to ourself localID ServerID @@ -157,6 +167,10 @@ type Raft struct { // is indexed by an artificial ID which is used for deregistration. observersLock sync.RWMutex observers map[uint64]*Observer + + // leadershipTransferCh is used to start a leadership transfer from outside of + // the main thread. + leadershipTransferCh chan *leadershipTransferFuture } // BootstrapCluster initializes a server's storage with the given cluster @@ -443,17 +457,17 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna // Create Raft struct. r := &Raft{ - protocolVersion: protocolVersion, - applyCh: make(chan *logFuture), - conf: *conf, - fsm: fsm, - fsmMutateCh: make(chan interface{}, 128), - fsmSnapshotCh: make(chan *reqSnapshotFuture), - leaderCh: make(chan bool), - localID: localID, - localAddr: localAddr, - logger: logger, - logs: logs, + protocolVersion: protocolVersion, + applyCh: make(chan *logFuture), + conf: *conf, + fsm: fsm, + fsmMutateCh: make(chan interface{}, 128), + fsmSnapshotCh: make(chan *reqSnapshotFuture), + leaderCh: make(chan bool), + localID: localID, + localAddr: localAddr, + logger: logger, + logs: logs, configurationChangeCh: make(chan *configurationChangeFuture), configurations: configurations{}, rpcCh: trans.Consumer(), @@ -467,6 +481,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna configurationsCh: make(chan *configurationsFuture, 8), bootstrapCh: make(chan *bootstrapFuture), observers: make(map[uint64]*Observer), + leadershipTransferCh: make(chan *leadershipTransferFuture, 1), } // Initialize as a follower. @@ -1011,3 +1026,38 @@ func (r *Raft) LastIndex() uint64 { func (r *Raft) AppliedIndex() uint64 { return r.getLastApplied() } + +// LeadershipTransfer will transfer leadership to a server in the cluster. +// This can only be called from the leader, or it will fail. The leader will +// stop accepting client requests, make sure the target server is up to date +// and starts the transfer with a TimeoutNow message. This message has the same +// effect as if the election timeout on the on the target server fires. Since +// it is unlikely that another server is starting an election, it is very +// likely that the target server is able to win the election. Note that raft +// protocol version 3 is not sufficient to use LeadershipTransfer. A recent +// version of that library has to be used that includes this feature. Using +// transfer leadership is safe however in a cluster where not every node has +// the latest version. If a follower cannot be promoted, it will fail +// gracefully. +func (r *Raft) LeadershipTransfer() Future { + if r.protocolVersion < 3 { + return errorFuture{ErrUnsupportedProtocol} + } + + return r.initiateLeadershipTransfer(nil, nil) +} + +// LeadershipTransferToServer does the same as LeadershipTransfer but takes a +// server in the arguments in case a leadership should be transitioned to a +// specific server in the cluster. Note that raft protocol version 3 is not +// sufficient to use LeadershipTransfer. A recent version of that library has +// to be used that includes this feature. Using transfer leadership is safe +// however in a cluster where not every node has the latest version. If a +// follower cannot be promoted, it will fail gracefully. +func (r *Raft) LeadershipTransferToServer(id ServerID, address ServerAddress) Future { + if r.protocolVersion < 3 { + return errorFuture{ErrUnsupportedProtocol} + } + + return r.initiateLeadershipTransfer(&id, &address) +} diff --git a/commands.go b/commands.go index 5d89e7bcdb1..17416311d8a 100644 --- a/commands.go +++ b/commands.go @@ -76,6 +76,11 @@ type RequestVoteRequest struct { // Used to ensure safety LastLogIndex uint64 LastLogTerm uint64 + + // Used to indicate to peers if this vote was triggered by a leadership + // transfer. It is required for leadership transfer to work, because servers + // wouldn't vote otherwise if they are aware of an existing leader. + LeadershipTransfer bool } // See WithRPCHeader. @@ -149,3 +154,24 @@ type InstallSnapshotResponse struct { func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader { return r.RPCHeader } + +// TimeoutNowRequest is the command used by a leader to signal another server to +// start an election. +type TimeoutNowRequest struct { + RPCHeader +} + +// See WithRPCHeader. +func (r *TimeoutNowRequest) GetRPCHeader() RPCHeader { + return r.RPCHeader +} + +// TimeoutNowResponse is the response to TimeoutNowRequest. +type TimeoutNowResponse struct { + RPCHeader +} + +// See WithRPCHeader. +func (r *TimeoutNowResponse) GetRPCHeader() RPCHeader { + return r.RPCHeader +} diff --git a/future.go b/future.go index fac59a5cc47..cc1e905ef01 100644 --- a/future.go +++ b/future.go @@ -58,6 +58,12 @@ type SnapshotFuture interface { Open() (*SnapshotMeta, io.ReadCloser, error) } +// LeadershipTransferFuture is used for waiting on a user-triggered leadership +// transfer to complete. +type LeadershipTransferFuture interface { + Future +} + // errorFuture is used to return a static error. type errorFuture struct { err error @@ -227,6 +233,15 @@ type verifyFuture struct { voteLock sync.Mutex } +// leadershipTransferFuture is used to track the progress of a leadership +// transfer internally. +type leadershipTransferFuture struct { + deferError + + ID *ServerID + Address *ServerAddress +} + // configurationsFuture is used to retrieve the current configurations. This is // used to allow safe access to this information outside of the main thread. type configurationsFuture struct { diff --git a/fuzzy/cluster.go b/fuzzy/cluster.go index 75cc9beaa41..39f7c8be16d 100644 --- a/fuzzy/cluster.go +++ b/fuzzy/cluster.go @@ -205,34 +205,54 @@ func (c *cluster) appliedIndexes() map[string]uint64 { return r } -func (c *cluster) ApplyN(t *testing.T, leaderTimeout time.Duration, s *applySource, n uint) uint64 { - f := make([]raft.ApplyFuture, n) +func (c *cluster) generateNApplies(s *applySource, n uint) [][]byte { data := make([][]byte, n) - startTime := time.Now() - endTime := startTime.Add(leaderTimeout) for i := uint(0); i < n; i++ { - ldr := c.Leader(endTime.Sub(time.Now())) - if ldr != nil { - data[i] = s.nextEntry() - f[i] = ldr.raft.Apply(data[i], time.Second) - } + data[i] = s.nextEntry() + } + return data +} + +func (c *cluster) leadershipTransfer(leaderTimeout time.Duration) raft.Future { + ldr := c.Leader(leaderTimeout) + return ldr.raft.LeadershipTransfer() +} + +type applyFutureWithData struct { + future raft.ApplyFuture + data []byte +} + +func (c *cluster) sendNApplies(leaderTimeout time.Duration, data [][]byte) []applyFutureWithData { + f := []applyFutureWithData{} + + ldr := c.Leader(leaderTimeout) + for _, d := range data { + f = append(f, applyFutureWithData{future: ldr.raft.Apply(d, time.Second), data: d}) } + return f +} + +func (c *cluster) checkApplyFutures(futures []applyFutureWithData) uint64 { success := uint64(0) - for i := uint(0); i < n; i++ { - if f[i] == nil { - continue - } - if err := f[i].Error(); err == nil { + for _, a := range futures { + if err := a.future.Error(); err == nil { success++ - c.lastApplySuccess = f[i] - c.applied = append(c.applied, appliedItem{f[i].Index(), data[i]}) + c.lastApplySuccess = a.future + c.applied = append(c.applied, appliedItem{a.future.Index(), a.data}) } else { - c.lastApplyFailure = f[i] + c.lastApplyFailure = a.future } } return success } +func (c *cluster) ApplyN(t *testing.T, leaderTimeout time.Duration, s *applySource, n uint) uint64 { + data := c.generateNApplies(s, n) + futures := c.sendNApplies(leaderTimeout, data) + return c.checkApplyFutures(futures) +} + func (c *cluster) VerifyFSM(t *testing.T) { exp := c.nodes[0].fsm expName := c.nodes[0].name diff --git a/fuzzy/leadershiptransfer_test.go b/fuzzy/leadershiptransfer_test.go new file mode 100644 index 00000000000..8d254a2e75e --- /dev/null +++ b/fuzzy/leadershiptransfer_test.go @@ -0,0 +1,73 @@ +package fuzzy + +import ( + "math/rand" + "testing" + "time" + + "github.com/hashicorp/raft" +) + +// 5 node cluster +func TestRaft_FuzzyLeadershipTransfer(t *testing.T) { + cluster := newRaftCluster(t, testLogWriter, "lt", 5, nil) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + s := newApplySource("LeadershipTransfer") + data := cluster.generateNApplies(s, uint(r.Intn(10000))) + futures := cluster.sendNApplies(time.Minute, data) + cluster.leadershipTransfer(time.Minute) + + data = cluster.generateNApplies(s, uint(r.Intn(10000))) + futures = append(futures, cluster.sendNApplies(time.Minute, data)...) + cluster.leadershipTransfer(time.Minute) + + data = cluster.generateNApplies(s, uint(r.Intn(10000))) + futures = append(futures, cluster.sendNApplies(time.Minute, data)...) + cluster.leadershipTransfer(time.Minute) + + data = cluster.generateNApplies(s, uint(r.Intn(10000))) + futures = append(futures, cluster.sendNApplies(time.Minute, data)...) + + ac := cluster.checkApplyFutures(futures) + + cluster.Stop(t, time.Minute) + cluster.VerifyLog(t, ac) + cluster.VerifyFSM(t) +} + +type LeadershipTransferMode int + +type LeadershipTransfer struct { + verifier appendEntriesVerifier + slowNodes map[string]bool + delayMin time.Duration + delayMax time.Duration + mode LeadershipTransferMode +} + +func (lt *LeadershipTransfer) Report(t *testing.T) { + lt.verifier.Report(t) +} + +func (lt *LeadershipTransfer) PreRPC(s, t string, r *raft.RPC) error { + return nil +} + +func (lt *LeadershipTransfer) nap() { + d := lt.delayMin + time.Duration(rand.Int63n((lt.delayMax - lt.delayMin).Nanoseconds())) + time.Sleep(d) +} + +func (lt *LeadershipTransfer) PostRPC(src, target string, r *raft.RPC, res *raft.RPCResponse) error { + return nil +} + +func (lt *LeadershipTransfer) PreRequestVote(src, target string, v *raft.RequestVoteRequest) (*raft.RequestVoteResponse, error) { + return nil, nil +} + +func (lt *LeadershipTransfer) PreAppendEntries(src, target string, v *raft.AppendEntriesRequest) (*raft.AppendEntriesResponse, error) { + lt.verifier.PreAppendEntries(src, target, v) + return nil, nil +} diff --git a/fuzzy/transport.go b/fuzzy/transport.go index 6d08de649e6..c45f3576823 100644 --- a/fuzzy/transport.go +++ b/fuzzy/transport.go @@ -112,9 +112,15 @@ func (t *transport) sendRPC(target string, req interface{}, resp interface{}) er } rpc := raft.RPC{RespChan: rc} var reqVote raft.RequestVoteRequest + var timeoutNow raft.TimeoutNowRequest var appEnt raft.AppendEntriesRequest dec := codec.NewDecoderBytes(buff.Bytes(), &codecHandle) switch req.(type) { + case *raft.TimeoutNowRequest: + if err := dec.Decode(&timeoutNow); err != nil { + return err + } + rpc.Command = &timeoutNow case *raft.RequestVoteRequest: if err := dec.Decode(&reqVote); err != nil { return err @@ -166,6 +172,11 @@ func (t *transport) sendRPC(target string, req interface{}, resp interface{}) er return result.Error } +// TimeoutNow implements the Transport interface. +func (t *transport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, resp *raft.TimeoutNowResponse) error { + return t.sendRPC(string(target), args, resp) +} + // AppendEntries sends the appropriate RPC to the target node. func (t *transport) AppendEntries(id raft.ServerID, target raft.ServerAddress, args *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error { ae := appendEntries{ diff --git a/inmem_transport.go b/inmem_transport.go index bb42eeb68b9..7f493f4871c 100644 --- a/inmem_transport.go +++ b/inmem_transport.go @@ -135,6 +135,19 @@ func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args return nil } +// TimeoutNow implements the Transport interface. +func (i *InmemTransport) TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error { + rpcResp, err := i.makeRPC(target, args, nil, 10*i.timeout) + if err != nil { + return err + } + + // Copy the result back + out := rpcResp.Response.(*TimeoutNowResponse) + *resp = *out + return nil +} + func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) { i.RLock() peer, ok := i.peers[target] diff --git a/net_transport.go b/net_transport.go index 4f1f101e004..523fa698e55 100644 --- a/net_transport.go +++ b/net_transport.go @@ -19,6 +19,7 @@ const ( rpcAppendEntries uint8 = iota rpcRequestVote rpcInstallSnapshot + rpcTimeoutNow // DefaultTimeoutScale is the default TimeoutScale in a NetworkTransport. DefaultTimeoutScale = 256 * 1024 // 256KB @@ -459,6 +460,11 @@ func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress { return ServerAddress(buf) } +// TimeoutNow implements the Transport interface. +func (n *NetworkTransport) TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error { + return n.genericRPC(id, target, rpcTimeoutNow, args, resp) +} + // listen is used to handling incoming connections. func (n *NetworkTransport) listen() { const baseDelay = 5 * time.Millisecond @@ -577,6 +583,13 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en rpc.Command = &req rpc.Reader = io.LimitReader(r, req.Size) + case rpcTimeoutNow: + var req TimeoutNowRequest + if err := dec.Decode(&req); err != nil { + return err + } + rpc.Command = &req + default: return fmt.Errorf("unknown rpc type %d", rpcType) } diff --git a/raft.go b/raft.go index a759230bc98..af05d641f96 100644 --- a/raft.go +++ b/raft.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/ioutil" + "sync/atomic" "time" "github.com/armon/go-metrics" @@ -77,12 +78,13 @@ type commitTuple struct { // leaderState is state that is used while we are a leader. type leaderState struct { - commitCh chan struct{} - commitment *commitment - inflight *list.List // list of logFuture in log index order - replState map[ServerID]*followerReplication - notify map[*verifyFuture]struct{} - stepDown chan struct{} + leadershipTransferInProgress int32 // indicates that a leadership transfer is in progress. + commitCh chan struct{} + commitment *commitment + inflight *list.List // list of logFuture in log index order + replState map[ServerID]*followerReplication + notify map[*verifyFuture]struct{} + stepDown chan struct{} } // setLeader is used to modify the current leader of the cluster @@ -148,7 +150,8 @@ func (r *Raft) runFollower() { r.logger.Info(fmt.Sprintf("%v entering Follower state (Leader: %q)", r, r.Leader())) metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout) - for { + + for r.getState() == Follower { select { case rpc := <-r.rpcCh: r.processRPC(rpc) @@ -169,6 +172,10 @@ func (r *Raft) runFollower() { // Reject any restores since we are not the leader r.respond(ErrNotLeader) + case r := <-r.leadershipTransferCh: + // Reject any operations since we are not the leader + r.respond(ErrNotLeader) + case c := <-r.configurationsCh: c.configurations = r.configurations.Clone() c.respond(nil) @@ -243,6 +250,14 @@ func (r *Raft) runCandidate() { // Start vote for us, and set a timeout voteCh := r.electSelf() + + // Make sure the leadership transfer flag is reset after each run. Having this + // flag will set the field LeadershipTransfer in a RequestVoteRequst to true, + // which will make other servers vote even though they have a leader already. + // It is important to reset that flag, because this priviledge could be abused + // otherwise. + defer func() { r.candidateFromLeadershipTransfer = false }() + electionTimer := randomTimeout(r.conf.ElectionTimeout) // Tally the votes, need a simple majority @@ -314,6 +329,33 @@ func (r *Raft) runCandidate() { } } +func (r *Raft) setLeadershipTransferInProgress(v bool) { + if v { + atomic.StoreInt32(&r.leaderState.leadershipTransferInProgress, 1) + } else { + atomic.StoreInt32(&r.leaderState.leadershipTransferInProgress, 0) + } +} + +func (r *Raft) getLeadershipTransferInProgress() bool { + v := atomic.LoadInt32(&r.leaderState.leadershipTransferInProgress) + if v == 1 { + return true + } + return false +} + +func (r *Raft) setupLeaderState() { + r.leaderState.commitCh = make(chan struct{}, 1) + r.leaderState.commitment = newCommitment(r.leaderState.commitCh, + r.configurations.latest, + r.getLastIndex()+1 /* first index that may be committed in this term */) + r.leaderState.inflight = list.New() + r.leaderState.replState = make(map[ServerID]*followerReplication) + r.leaderState.notify = make(map[*verifyFuture]struct{}) + r.leaderState.stepDown = make(chan struct{}, 1) +} + // runLeader runs the FSM for a leader. Do the setup here and drop into // the leaderLoop for the hot loop. func (r *Raft) runLeader() { @@ -331,15 +373,9 @@ func (r *Raft) runLeader() { } } - // Setup leader state - r.leaderState.commitCh = make(chan struct{}, 1) - r.leaderState.commitment = newCommitment(r.leaderState.commitCh, - r.configurations.latest, - r.getLastIndex()+1 /* first index that may be committed in this term */) - r.leaderState.inflight = list.New() - r.leaderState.replState = make(map[ServerID]*followerReplication) - r.leaderState.notify = make(map[*verifyFuture]struct{}) - r.leaderState.stepDown = make(chan struct{}, 1) + // setup leader state. This is only supposed to be accessed within the + // leaderloop. + r.setupLeaderState() // Cleanup state on step down defer func() { @@ -436,16 +472,17 @@ func (r *Raft) startStopReplication() { if _, ok := r.leaderState.replState[server.ID]; !ok { r.logger.Info(fmt.Sprintf("Added peer %v, starting replication", server.ID)) s := &followerReplication{ - peer: server, - commitment: r.leaderState.commitment, - stopCh: make(chan uint64, 1), - triggerCh: make(chan struct{}, 1), - currentTerm: r.getCurrentTerm(), - nextIndex: lastIdx + 1, - lastContact: time.Now(), - notify: make(map[*verifyFuture]struct{}), - notifyCh: make(chan struct{}, 1), - stepDown: r.leaderState.stepDown, + peer: server, + commitment: r.leaderState.commitment, + stopCh: make(chan uint64, 1), + triggerCh: make(chan struct{}, 1), + triggerDeferErrorCh: make(chan *deferError, 1), + currentTerm: r.getCurrentTerm(), + nextIndex: lastIdx + 1, + lastContact: time.Now(), + notify: make(map[*verifyFuture]struct{}), + notifyCh: make(chan struct{}, 1), + stepDown: r.leaderState.stepDown, } r.leaderState.replState[server.ID] = s r.goFunc(func() { r.replicate(s) }) @@ -496,8 +533,8 @@ func (r *Raft) leaderLoop() { // only a single peer (ourself) and replicating to an undefined set // of peers. stepDown := false - lease := time.After(r.conf.LeaderLeaseTimeout) + for r.getState() == Leader { select { case rpc := <-r.rpcCh: @@ -506,6 +543,74 @@ func (r *Raft) leaderLoop() { case <-r.leaderState.stepDown: r.setState(Follower) + case future := <-r.leadershipTransferCh: + if r.getLeadershipTransferInProgress() { + r.logger.Debug(ErrLeadershipTransferInProgress.Error()) + future.respond(ErrLeadershipTransferInProgress) + continue + } + r.logger.Debug("starting leadership transfer to %v: %v", future.ID, future.Address) + + // When we are leaving leaderLoop, we are no longer + // leader, so we should stop transferring. + leftLeaderLoop := make(chan struct{}) + defer func() { close(leftLeaderLoop) }() + + stopCh := make(chan struct{}) + doneCh := make(chan error, 1) + + // This is intentionally being setup outside of the + // leadershipTransfer function. Because the TimeoutNow + // call is blocking and there is no way to abort that + // in case eg the timer expires. + // The leadershipTransfer function is controlled with + // the stopCh and doneCh. + go func() { + select { + case <-time.After(r.conf.ElectionTimeout): + close(stopCh) + err := fmt.Errorf("leadership transfer timeout") + r.logger.Debug(err.Error()) + future.respond(err) + <-doneCh + case <-leftLeaderLoop: + close(stopCh) + err := fmt.Errorf("lost leadership during transfer (expected)") + r.logger.Debug(err.Error()) + future.respond(nil) + <-doneCh + case err := <-doneCh: + if err != nil { + r.logger.Debug(err.Error()) + } + future.respond(err) + } + }() + + // leaderState.replState is accessed here before + // starting leadership transfer asynchronously because + // leaderState is only supposed to be accessed in the + // leaderloop. + id := future.ID + address := future.Address + if id == nil { + s := r.pickServer() + if s != nil { + id = &s.ID + address = &s.Address + } else { + doneCh <- fmt.Errorf("cannot find peer") + continue + } + } + state, ok := r.leaderState.replState[*id] + if !ok { + doneCh <- fmt.Errorf("cannot find replication state for %v", id) + continue + } + + go r.leadershipTransfer(*id, *address, state, stopCh, doneCh) + case <-r.leaderState.commitCh: // Process the newly committed entries oldCommitIndex := r.getCommitIndex() @@ -584,20 +689,40 @@ func (r *Raft) leaderLoop() { } case future := <-r.userRestoreCh: + if r.getLeadershipTransferInProgress() { + r.logger.Debug(ErrLeadershipTransferInProgress.Error()) + future.respond(ErrLeadershipTransferInProgress) + continue + } err := r.restoreUserSnapshot(future.meta, future.reader) future.respond(err) - case c := <-r.configurationsCh: - c.configurations = r.configurations.Clone() - c.respond(nil) + case future := <-r.configurationsCh: + if r.getLeadershipTransferInProgress() { + r.logger.Debug(ErrLeadershipTransferInProgress.Error()) + future.respond(ErrLeadershipTransferInProgress) + continue + } + future.configurations = r.configurations.Clone() + future.respond(nil) case future := <-r.configurationChangeChIfStable(): + if r.getLeadershipTransferInProgress() { + r.logger.Debug(ErrLeadershipTransferInProgress.Error()) + future.respond(ErrLeadershipTransferInProgress) + continue + } r.appendConfigurationEntry(future) case b := <-r.bootstrapCh: b.respond(ErrCantBootstrap) case newLog := <-r.applyCh: + if r.getLeadershipTransferInProgress() { + r.logger.Debug(ErrLeadershipTransferInProgress.Error()) + newLog.respond(ErrLeadershipTransferInProgress) + continue + } // Group commit, gather all the ready commits ready := []*logFuture{newLog} for i := 0; i < r.conf.MaxAppendEntries; i++ { @@ -665,6 +790,54 @@ func (r *Raft) verifyLeader(v *verifyFuture) { } } +// leadershipTransfer is doing the heavy lifting for the leadership transfer. +func (r *Raft) leadershipTransfer(id ServerID, address ServerAddress, repl *followerReplication, stopCh chan struct{}, doneCh chan error) { + + // make sure we are not already stopped + select { + case <-stopCh: + doneCh <- nil + return + default: + } + + // Step 1: set this field which stops this leader from responding to any client requests. + r.setLeadershipTransferInProgress(true) + defer func() { r.setLeadershipTransferInProgress(false) }() + + for atomic.LoadUint64(&repl.nextIndex) <= r.getLastIndex() { + err := &deferError{} + err.init() + repl.triggerDeferErrorCh <- err + select { + case err := <-err.errCh: + if err != nil { + doneCh <- err + return + } + case <-stopCh: + doneCh <- nil + return + } + } + + // Step ?: the thesis describes in chap 6.4.1: Using clocks to reduce + // messaging for read-only queries. If this is implemented, the lease + // has to be reset as well, in case leadership is transferred. This + // implementation also has a lease, but it serves another purpose and + // doesn't need to be reset. The lease mechanism in our raft lib, is + // setup in a similar way to the one in the thesis, but in practice + // it's a timer that just tells the leader how often to check + // heartbeats are still coming in. + + // Step 3: send TimeoutNow message to target server. + err := r.trans.TimeoutNow(id, address, &TimeoutNowRequest{RPCHeader: r.getRPCHeader()}, &TimeoutNowResponse{}) + if err != nil { + err = fmt.Errorf("failed to make TimeoutNow RPC to %v: %v", id, err) + } + doneCh <- err +} + // checkLeaderLease is used to check if we can contact a quorum of nodes // within the last leader lease interval. If not, we need to step down, // as we may have lost connectivity. Returns the maximum duration without @@ -981,6 +1154,8 @@ func (r *Raft) processRPC(rpc RPC) { r.requestVote(rpc, cmd) case *InstallSnapshotRequest: r.installSnapshot(rpc, cmd) + case *TimeoutNowRequest: + r.timeoutNow(rpc, cmd) default: r.logger.Error(fmt.Sprintf("Got unexpected command: %#v", rpc.Command)) rpc.Respond(nil, fmt.Errorf("unexpected command")) @@ -1184,9 +1359,12 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { resp.Peers = encodePeers(r.configurations.latest, r.trans) } - // Check if we have an existing leader [who's not the candidate] + // Check if we have an existing leader [who's not the candidate] and also + // check the LeadershipTransfer flag is set. Usually votes are rejected if + // there is a known leader. But if the leader initiated a leadership transfer, + // vote! candidate := r.trans.DecodePeer(req.Candidate) - if leader := r.Leader(); leader != "" && leader != candidate { + if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer { r.logger.Warn(fmt.Sprintf("Rejecting vote request from %v since we have a leader: %v", candidate, leader)) return @@ -1200,6 +1378,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { // Increase the term if we see a newer one if req.Term > r.getCurrentTerm() { // Ensure transition to follower + r.logger.Debug("lost leadership because received a requestvote with newer term") r.setState(Follower) r.setCurrentTerm(req.Term) resp.Term = req.Term @@ -1404,11 +1583,12 @@ func (r *Raft) electSelf() <-chan *voteResult { // Construct the request lastIdx, lastTerm := r.getLastEntry() req := &RequestVoteRequest{ - RPCHeader: r.getRPCHeader(), - Term: r.getCurrentTerm(), - Candidate: r.trans.EncodePeer(r.localID, r.localAddr), - LastLogIndex: lastIdx, - LastLogTerm: lastTerm, + RPCHeader: r.getRPCHeader(), + Term: r.getCurrentTerm(), + Candidate: r.trans.EncodePeer(r.localID, r.localAddr), + LastLogIndex: lastIdx, + LastLogTerm: lastTerm, + LeadershipTransfer: r.candidateFromLeadershipTransfer, } // Construct a function to ask for a vote @@ -1484,3 +1664,68 @@ func (r *Raft) setState(state RaftState) { r.observe(state) } } + +// LookupServer looks up a server by ServerID. +func (r *Raft) lookupServer(id ServerID) *Server { + for _, server := range r.configurations.latest.Servers { + if server.ID != r.localID { + return &server + } + } + return nil +} + +// pickServer returns the follower that is most up to date. Because it accesses +// leaderstate, it should only be called from the leaderloop. +func (r *Raft) pickServer() *Server { + var pick *Server + var current uint64 + for _, server := range r.configurations.latest.Servers { + if server.ID == r.localID { + continue + } + state, ok := r.leaderState.replState[server.ID] + if !ok { + continue + } + nextIdx := atomic.LoadUint64(&state.nextIndex) + if nextIdx > current { + current = nextIdx + tmp := server + pick = &tmp + } + } + return pick +} + +// initiateLeadershipTransfer starts the leadership on the leader side, by +// sending a message to the leadershipTransferCh, to make sure it runs in the +// mainloop. +func (r *Raft) initiateLeadershipTransfer(id *ServerID, address *ServerAddress) LeadershipTransferFuture { + future := &leadershipTransferFuture{ID: id, Address: address} + future.init() + + if id != nil && *id == r.localID { + err := fmt.Errorf("cannot transfer leadership to itself") + r.logger.Info(err.Error()) + future.respond(err) + return future + } + + select { + case r.leadershipTransferCh <- future: + return future + case <-r.shutdownCh: + return errorFuture{ErrRaftShutdown} + default: + return errorFuture{ErrEnqueueTimeout} + } +} + +// timeoutNow is what happens when a server receives a TimeoutNowRequest. +func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) { + r.setLeader("") + r.setState(Candidate) + r.candidateFromLeadershipTransfer = true + rpc.Respond(&TimeoutNowResponse{}, nil) +} diff --git a/raft_test.go b/raft_test.go index f2c73e3ce11..b2e6ee89581 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2277,11 +2277,12 @@ func TestRaft_Voting(t *testing.T) { ldrT := c.trans[c.IndexOf(ldr)] reqVote := RequestVoteRequest{ - RPCHeader: ldr.getRPCHeader(), - Term: ldr.getCurrentTerm() + 10, - Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), - LastLogIndex: ldr.LastIndex(), - LastLogTerm: ldr.getCurrentTerm(), + RPCHeader: ldr.getRPCHeader(), + Term: ldr.getCurrentTerm() + 10, + Candidate: ldrT.EncodePeer(ldr.localID, ldr.localAddr), + LastLogIndex: ldr.LastIndex(), + LastLogTerm: ldr.getCurrentTerm(), + LeadershipTransfer: false, } // a follower that thinks there's a leader should vote for that leader. var resp RequestVoteResponse @@ -2291,7 +2292,7 @@ func TestRaft_Voting(t *testing.T) { if !resp.Granted { c.FailNowf("[ERR] expected vote to be granted, but wasn't %+v", resp) } - // a follow that thinks there's a leader shouldn't vote for a different candidate + // a follower that thinks there's a leader shouldn't vote for a different candidate reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { c.FailNowf("[ERR] RequestVote RPC failed %v", err) @@ -2299,6 +2300,16 @@ func TestRaft_Voting(t *testing.T) { if resp.Granted { c.FailNowf("[ERR] expected vote not to be granted, but was %+v", resp) } + // a follower that thinks there's a leader, but the request has the leadership transfer flag, should + // vote for a different candidate + reqVote.LeadershipTransfer = true + reqVote.Candidate = ldrT.EncodePeer(followers[0].localID, followers[0].localAddr) + if err := ldrT.RequestVote(followers[1].localID, followers[1].localAddr, &reqVote, &resp); err != nil { + c.FailNowf("[ERR] RequestVote RPC failed %v", err) + } + if !resp.Granted { + c.FailNowf("[ERR] expected vote to be granted, but wasn't %+v", resp) + } } func TestRaft_ProtocolVersion_RejectRPC(t *testing.T) { @@ -2410,6 +2421,259 @@ func TestRaft_ProtocolVersion_Upgrade_2_3(t *testing.T) { } } +func TestRaft_LeadershipTransferInProgress(t *testing.T) { + r := &Raft{leaderState: leaderState{}} + r.setupLeaderState() + + if r.getLeadershipTransferInProgress() != false { + t.Errorf("should be true after setup") + } + + r.setLeadershipTransferInProgress(true) + if r.getLeadershipTransferInProgress() != true { + t.Errorf("should be true because we set it before") + } + r.setLeadershipTransferInProgress(false) + if r.getLeadershipTransferInProgress() != false { + t.Errorf("should be false because we set it before") + } +} + +func pointerToString(s string) *string { + return &s +} + +func TestRaft_LeadershipTransferPickServer(t *testing.T) { + type variant struct { + lastLogIndex int + servers map[string]uint64 + expected *string + } + leaderID := "z" + variants := []variant{ + {lastLogIndex: 10, servers: map[string]uint64{}, expected: nil}, + {lastLogIndex: 10, servers: map[string]uint64{leaderID: 11, "a": 9}, expected: pointerToString("a")}, + {lastLogIndex: 10, servers: map[string]uint64{leaderID: 11, "a": 9, "b": 8}, expected: pointerToString("a")}, + {lastLogIndex: 10, servers: map[string]uint64{leaderID: 11, "c": 9, "b": 8, "a": 8}, expected: pointerToString("c")}, + {lastLogIndex: 10, servers: map[string]uint64{leaderID: 11, "a": 7, "b": 11, "c": 8}, expected: pointerToString("b")}, + } + for i, v := range variants { + servers := []Server{} + replState := map[ServerID]*followerReplication{} + for id, idx := range v.servers { + servers = append(servers, Server{ID: ServerID(id)}) + replState[ServerID(id)] = &followerReplication{nextIndex: idx} + } + r := Raft{leaderState: leaderState{}, localID: ServerID(leaderID), configurations: configurations{latest: Configuration{Servers: servers}}} + r.lastLogIndex = uint64(v.lastLogIndex) + r.leaderState.replState = replState + + actual := r.pickServer() + if v.expected == nil && actual == nil { + continue + } else if v.expected == nil && actual != nil { + t.Errorf("case %d: actual: %v doesn't match expected: %v", i, actual, v.expected) + } else if actual == nil && v.expected != nil { + t.Errorf("case %d: actual: %v doesn't match expected: %v", i, actual, v.expected) + } else if string(actual.ID) != *v.expected { + t.Errorf("case %d: actual: %v doesn't match expected: %v", i, actual.ID, *v.expected) + } + } +} + +func TestRaft_LeadershipTransfer(t *testing.T) { + c := MakeCluster(3, t, nil) + defer c.Close() + + oldLeader := string(c.Leader().localID) + err := c.Leader().LeadershipTransfer() + if err.Error() != nil { + t.Fatalf("Didn't expect error: %v", err.Error()) + } + newLeader := string(c.Leader().localID) + if oldLeader == newLeader { + t.Error("Leadership should have been transitioned to another peer.") + } +} + +func TestRaft_LeadershipTransferWithOneNode(t *testing.T) { + c := MakeCluster(1, t, nil) + defer c.Close() + + future := c.Leader().LeadershipTransfer() + if future.Error() == nil { + t.Fatal("leadership transfer should err") + } + + expected := "cannot find peer" + actual := future.Error().Error() + if !strings.Contains(actual, expected) { + t.Errorf("leadership transfer should err with: %s", expected) + } +} + +func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { + c := MakeCluster(7, t, nil) + defer c.Close() + + oldLeader := c.Leader().localID + follower := c.GetInState(Follower)[0] + future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr) + if future.Error() != nil { + t.Fatalf("Didn't expect error: %v", future.Error()) + } + if oldLeader == c.Leader().localID { + t.Error("Leadership should have been transitioned to specified server.") + } +} + +func TestRaft_LeadershipTransferToInvalidID(t *testing.T) { + c := MakeCluster(3, t, nil) + defer c.Close() + + future := c.Leader().LeadershipTransferToServer(ServerID("abc"), ServerAddress("127.0.0.1")) + if future.Error() == nil { + t.Fatal("leadership transfer should err") + } + + expected := "cannot find replication state" + actual := future.Error().Error() + if !strings.Contains(actual, expected) { + t.Errorf("leadership transfer should err with: %s", expected) + } +} + +func TestRaft_LeadershipTransferToInvalidAddress(t *testing.T) { + c := MakeCluster(3, t, nil) + defer c.Close() + + follower := c.GetInState(Follower)[0] + future := c.Leader().LeadershipTransferToServer(follower.localID, ServerAddress("127.0.0.1")) + if future.Error() == nil { + t.Fatal("leadership transfer should err") + } + expected := "failed to make TimeoutNow RPC" + actual := future.Error().Error() + if !strings.Contains(actual, expected) { + t.Errorf("leadership transfer should err with: %s", expected) + } +} + +func TestRaft_LeadershipTransferToBehindServer(t *testing.T) { + c := MakeCluster(3, t, nil) + defer c.Close() + + l := c.Leader() + behind := c.GetInState(Follower)[0] + + // Commit a lot of things + for i := 0; i < 1000; i++ { + l.Apply([]byte(fmt.Sprintf("test%d", i)), 0) + } + + future := l.LeadershipTransferToServer(behind.localID, behind.localAddr) + if future.Error() != nil { + t.Fatalf("This is not supposed to error: %v", future.Error()) + } + if c.Leader().localID != behind.localID { + t.Fatal("Behind server did not get leadership") + } +} + +func TestRaft_LeadershipTransferToItself(t *testing.T) { + c := MakeCluster(3, t, nil) + defer c.Close() + + l := c.Leader() + + future := l.LeadershipTransferToServer(l.localID, l.localAddr) + if future.Error() == nil { + t.Fatal("leadership transfer should err") + } + expected := "cannot transfer leadership to itself" + actual := future.Error().Error() + if !strings.Contains(actual, expected) { + t.Errorf("leadership transfer should err with: %s", expected) + } +} + +func TestRaft_LeadershipTransferLeaderRejectsClientRequests(t *testing.T) { + c := MakeCluster(3, t, nil) + defer c.Close() + l := c.Leader() + l.setLeadershipTransferInProgress(true) + + // tests for API > protocol version 3 is missing here because leadership transfer + // is only available for protocol version >= 3 + // TODO: is something missing here? + futures := []Future{ + l.AddNonvoter(ServerID(""), ServerAddress(""), 0, 0), + l.AddVoter(ServerID(""), ServerAddress(""), 0, 0), + l.Apply([]byte("test"), 0), + l.Barrier(0), + l.DemoteVoter(ServerID(""), 0, 0), + l.GetConfiguration(), + + // the API is tested, but here we are making sure we reject any config change. + l.requestConfigChange(configurationChangeRequest{}, 100*time.Millisecond), + } + futures = append(futures, l.LeadershipTransfer()) + select { + case <-l.leadershipTransferCh: + default: + } + + futures = append(futures, l.LeadershipTransferToServer(ServerID(""), ServerAddress(""))) + + for i, f := range futures { + if f.Error() != ErrLeadershipTransferInProgress { + t.Errorf("case %d: should have errored with: %s, instead of %s", i, ErrLeadershipTransferInProgress, f.Error()) + } + } +} + +func TestRaft_LeadershipTransferLeaderReplicationTimeout(t *testing.T) { + c := MakeCluster(3, t, nil) + defer c.Close() + + l := c.Leader() + behind := c.GetInState(Follower)[0] + + // Commit a lot of things, so that the timeout can kick in + for i := 0; i < 10000; i++ { + l.Apply([]byte(fmt.Sprintf("test%d", i)), 0) + } + + // set ElectionTimeout really short because this is used to determine + // how long a transfer is allowed to take. + l.conf.ElectionTimeout = 1 * time.Nanosecond + + future := l.LeadershipTransferToServer(behind.localID, behind.localAddr) + if future.Error() == nil { + t.Log("This test is fishing for a replication timeout, but this is not guaranteed to happen.") + } else { + expected := "leadership transfer timeout" + actual := future.Error().Error() + if !strings.Contains(actual, expected) { + t.Errorf("leadership transfer should err with: %s", expected) + } + } +} + +func TestRaft_LeadershipTransferStopRightAway(t *testing.T) { + r := Raft{leaderState: leaderState{}} + r.setupLeaderState() + + stopCh := make(chan struct{}) + doneCh := make(chan error, 1) + close(stopCh) + r.leadershipTransfer(ServerID("a"), ServerAddress(""), &followerReplication{}, stopCh, doneCh) + err := <-doneCh + if err != nil { + t.Errorf("leadership shouldn't have started, but instead it error with: %v", err) + } +} + // TODO: These are test cases we'd like to write for appendEntries(). // Unfortunately, it's difficult to do so with the current way this file is // tested. diff --git a/replication.go b/replication.go index 1f5f1007f5a..b62e0d0d287 100644 --- a/replication.go +++ b/replication.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/armon/go-metrics" @@ -40,12 +41,18 @@ type followerReplication struct { // index; replication should be attempted with a best effort up through that // index, before exiting. stopCh chan uint64 + // triggerCh is notified every time new entries are appended to the log. triggerCh chan struct{} + // triggerDeferErrorCh is used to provide a backchannel. By sending a + // deferErr, the sender can be notifed when the replication is done. + triggerDeferErrorCh chan *deferError + // currentTerm is the term of this leader, to be included in AppendEntries // requests. currentTerm uint64 + // nextIndex is the index of the next log entry to send to the follower, // which may fall past the end of the log. nextIndex uint64 @@ -134,6 +141,14 @@ RPC: r.replicateTo(s, maxIndex) } return + case deferErr := <-s.triggerDeferErrorCh: + lastLogIdx, _ := r.getLastLog() + shouldStop = r.replicateTo(s, lastLogIdx) + if !shouldStop { + deferErr.respond(nil) + } else { + deferErr.respond(fmt.Errorf("replication failed")) + } case <-s.triggerCh: lastLogIdx, _ := r.getLastLog() shouldStop = r.replicateTo(s, lastLogIdx) @@ -187,7 +202,7 @@ START: } // Setup the request - if err := r.setupAppendEntries(s, &req, s.nextIndex, lastIndex); err == ErrLogNotFound { + if err := r.setupAppendEntries(s, &req, atomic.LoadUint64(&s.nextIndex), lastIndex); err == ErrLogNotFound { goto SEND_SNAP } else if err != nil { return @@ -220,13 +235,13 @@ START: s.failures = 0 s.allowPipeline = true } else { - s.nextIndex = max(min(s.nextIndex-1, resp.LastLog+1), 1) + atomic.StoreUint64(&s.nextIndex, max(min(s.nextIndex-1, resp.LastLog+1), 1)) if resp.NoRetryBackoff { s.failures = 0 } else { s.failures++ } - r.logger.Warn(fmt.Sprintf("AppendEntries to %v rejected, sending older logs (next: %d)", s.peer, s.nextIndex)) + r.logger.Warn(fmt.Sprintf("AppendEntries to %v rejected, sending older logs (next: %d)", s.peer, atomic.LoadUint64(&s.nextIndex))) } CHECK_MORE: @@ -242,7 +257,7 @@ CHECK_MORE: } // Check if there are more logs to replicate - if s.nextIndex <= lastIndex { + if atomic.LoadUint64(&s.nextIndex) <= lastIndex { goto START } return @@ -321,7 +336,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) { // Check for success if resp.Success { // Update the indexes - s.nextIndex = meta.Index + 1 + atomic.StoreUint64(&s.nextIndex, meta.Index+1) s.commitment.match(s.peer.ID, meta.Index) // Clear any failures @@ -397,7 +412,7 @@ func (r *Raft) pipelineReplicate(s *followerReplication) error { r.goFunc(func() { r.pipelineDecode(s, pipeline, stopCh, finishCh) }) // Start pipeline sends at the last good nextIndex - nextIndex := s.nextIndex + nextIndex := atomic.LoadUint64(&s.nextIndex) shouldStop := false SEND: @@ -411,6 +426,14 @@ SEND: r.pipelineSend(s, pipeline, &nextIndex, maxIndex) } break SEND + case deferErr := <-s.triggerDeferErrorCh: + lastLogIdx, _ := r.getLastLog() + shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx) + if !shouldStop { + deferErr.respond(nil) + } else { + deferErr.respond(fmt.Errorf("replication failed")) + } case <-s.triggerCh: lastLogIdx, _ := r.getLastLog() shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx) @@ -447,7 +470,7 @@ func (r *Raft) pipelineSend(s *followerReplication, p AppendPipeline, nextIdx *u // Increase the next send log to avoid re-sending old logs if n := len(req.Entries); n > 0 { last := req.Entries[n-1] - *nextIdx = last.Index + 1 + atomic.StoreUint64(nextIdx, last.Index+1) } return false } @@ -563,7 +586,7 @@ func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) { // Mark any inflight logs as committed if logs := req.Entries; len(logs) > 0 { last := logs[len(logs)-1] - s.nextIndex = last.Index + 1 + atomic.StoreUint64(&s.nextIndex, last.Index+1) s.commitment.match(s.peer.ID, last.Index) } diff --git a/transport.go b/transport.go index 85459b221d1..b18d245938c 100644 --- a/transport.go +++ b/transport.go @@ -58,6 +58,9 @@ type Transport interface { // disk IO. If a Transport does not support this, it can simply // ignore the call, and push the heartbeat onto the Consumer channel. SetHeartbeatHandler(cb func(rpc RPC)) + + // TimeoutNow is used to start a leadership transfer to the target node. + TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error } // WithClose is an interface that a transport may provide which