Leadership transfer (hashicorp#306)
This PR implements the leadership transfer extension described in Chapter 3.10 of the Raft thesis.

Background:

Consul performs some setup after acquiring leadership. It is possible that this setup fails, but there is no good way to step down as leader in that case. It is possible to use DemoteVoter as shown in hashicorp/consul#5247, but this is suboptimal because it relies on Consul's autopilot to promote the old leader to a voter again.
Since the thesis already describes a perfectly good mechanism for this, the leadership transfer extension, we decided to implement that instead. Doing it this way also helps other teams, since it is more generic.

The necessary steps to perform are (a caller-side usage sketch follows the list):

1. Leader picks target to transition to
2. Leader stops accepting client requests
3. Leader makes sure to replicate logs to the target
4. Leader sends TimeoutNow RPC request
5. Target receives TimeoutNow request, which triggers an election
6a. If the election is successful, a message with the new term will make the old leader step down
6b. If the leadership transfer has not completed after ElectionTimeout, the old leader resumes operation
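
From the caller's perspective this whole sequence sits behind a single future returned by the new LeadershipTransfer API (see api.go below). A minimal caller-side sketch, assuming an already-running *raft.Raft named r; the stepDown helper and its logging are illustrative, not part of this PR:

package main

import (
	"log"

	"github.com/hashicorp/raft"
)

// stepDown asks the current leader to hand off leadership and blocks until
// the transfer completes (step 6a) or the old leader resumes operation
// (step 6b), in which case the future carries an error.
func stepDown(r *raft.Raft) error {
	future := r.LeadershipTransfer()
	if err := future.Error(); err != nil {
		log.Printf("leadership transfer failed: %v", err)
		return err
	}
	return nil
}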

Resources:

https://github.com/etcd-io/etcd/tree/master/raft
hanshasselberg authored May 17, 2019
1 parent 2c55169 commit eba8343
Showing 12 changed files with 835 additions and 79 deletions.
72 changes: 61 additions & 11 deletions api.go
@@ -49,6 +49,10 @@ var (
// ErrCantBootstrap is returned when attempt is made to bootstrap a
// cluster that already has state present.
ErrCantBootstrap = errors.New("bootstrap only works on new clusters")

// ErrLeadershipTransferInProgress is returned when the leader is rejecting
// client requests because it is attempting to transfer leadership.
ErrLeadershipTransferInProgress = errors.New("leadership transfer in progress")
)
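
While a transfer is in flight the leader answers client requests with this error, so callers get a concrete condition to react to. A sketch of one possible caller-side policy; the retry loop and the 100ms backoff are assumptions for illustration, not something this PR prescribes:

import (
	"time"

	"github.com/hashicorp/raft"
)

// applyWithRetry retries an Apply that was rejected because a leadership
// transfer was in progress. The fixed backoff is an assumed policy.
func applyWithRetry(r *raft.Raft, cmd []byte, timeout time.Duration) error {
	for {
		err := r.Apply(cmd, timeout).Error()
		if err != raft.ErrLeadershipTransferInProgress {
			return err // success, or an error retrying will not fix
		}
		time.Sleep(100 * time.Millisecond)
	}
}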

// Raft implements a Raft node.
@@ -97,6 +101,12 @@ type Raft struct {
// leaderState used only while state is leader
leaderState leaderState

// candidateFromLeadershipTransfer is used to indicate that this server became
// a candidate because the leader tried to transfer leadership. This flag is
// used in RequestVoteRequest to express that a leadership transfer is in
// progress.
candidateFromLeadershipTransfer bool

// Stores our local server ID, used to avoid sending RPCs to ourself
localID ServerID

@@ -157,6 +167,10 @@ type Raft struct {
// is indexed by an artificial ID which is used for deregistration.
observersLock sync.RWMutex
observers map[uint64]*Observer

// leadershipTransferCh is used to start a leadership transfer from outside of
// the main thread.
leadershipTransferCh chan *leadershipTransferFuture
}

// BootstrapCluster initializes a server's storage with the given cluster
@@ -443,17 +457,17 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
 
 	// Create Raft struct.
 	r := &Raft{
-		protocolVersion: protocolVersion,
-		applyCh:         make(chan *logFuture),
-		conf:            *conf,
-		fsm:             fsm,
-		fsmMutateCh:     make(chan interface{}, 128),
-		fsmSnapshotCh:   make(chan *reqSnapshotFuture),
-		leaderCh:        make(chan bool),
-		localID:         localID,
-		localAddr:       localAddr,
-		logger:          logger,
-		logs:            logs,
+		protocolVersion:       protocolVersion,
+		applyCh:               make(chan *logFuture),
+		conf:                  *conf,
+		fsm:                   fsm,
+		fsmMutateCh:           make(chan interface{}, 128),
+		fsmSnapshotCh:         make(chan *reqSnapshotFuture),
+		leaderCh:              make(chan bool),
+		localID:               localID,
+		localAddr:             localAddr,
+		logger:                logger,
+		logs:                  logs,
 		configurationChangeCh: make(chan *configurationChangeFuture),
 		configurations:        configurations{},
 		rpcCh:                 trans.Consumer(),
@@ -467,6 +481,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
 		configurationsCh:     make(chan *configurationsFuture, 8),
 		bootstrapCh:          make(chan *bootstrapFuture),
 		observers:            make(map[uint64]*Observer),
+		leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
 	}
 
 	// Initialize as a follower.
@@ -1011,3 +1026,38 @@ func (r *Raft) LastIndex() uint64 {
func (r *Raft) AppliedIndex() uint64 {
return r.getLastApplied()
}

// LeadershipTransfer will transfer leadership to a server in the cluster.
// This can only be called from the leader, or it will fail. The leader will
// stop accepting client requests, make sure the target server is up to date,
// and start the transfer with a TimeoutNow message. This message has the
// same effect as if the election timeout on the target server fired. Since
// it is unlikely that another server is starting an election at the same
// moment, it is very likely that the target server wins the election. Note
// that raft protocol version 3 is not sufficient to use LeadershipTransfer:
// a recent version of this library that includes the feature is required as
// well. Leadership transfer is nevertheless safe to use in a cluster where
// not every node has the latest version: if a follower cannot be promoted,
// the transfer fails gracefully.
func (r *Raft) LeadershipTransfer() Future {
if r.protocolVersion < 3 {
return errorFuture{ErrUnsupportedProtocol}
}

return r.initiateLeadershipTransfer(nil, nil)
}

// LeadershipTransferToServer does the same as LeadershipTransfer but takes
// a target server in its arguments, for when leadership should be
// transferred to a specific server in the cluster. Note that raft protocol
// version 3 is not sufficient to use LeadershipTransfer: a recent version
// of this library that includes the feature is required as well. Leadership
// transfer is nevertheless safe to use in a cluster where not every node
// has the latest version: if a follower cannot be promoted, the transfer
// fails gracefully.
func (r *Raft) LeadershipTransferToServer(id ServerID, address ServerAddress) Future {
if r.protocolVersion < 3 {
return errorFuture{ErrUnsupportedProtocol}
}

return r.initiateLeadershipTransfer(&id, &address)
}
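
An editorial usage sketch of the targeted variant; the server ID and address below are hypothetical placeholders, not values from this PR:

import "github.com/hashicorp/raft"

// transferTo hands leadership to one specific, known server.
func transferTo(r *raft.Raft) error {
	id := raft.ServerID("node-2")               // placeholder ID
	addr := raft.ServerAddress("10.0.0.2:8300") // placeholder address
	return r.LeadershipTransferToServer(id, addr).Error()
}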
26 changes: 26 additions & 0 deletions commands.go
@@ -76,6 +76,11 @@ type RequestVoteRequest struct {
// Used to ensure safety
LastLogIndex uint64
LastLogTerm uint64

// Used to indicate to peers whether this vote was triggered by a leadership
// transfer. It is required for leadership transfer to work, because servers
// that are aware of an existing leader would otherwise refuse to grant their
// vote.
LeadershipTransfer bool
}

// See WithRPCHeader.
@@ -149,3 +154,24 @@ type InstallSnapshotResponse struct {
func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

// TimeoutNowRequest is the command used by a leader to signal another server to
// start an election.
type TimeoutNowRequest struct {
RPCHeader
}

// See WithRPCHeader.
func (r *TimeoutNowRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

// TimeoutNowResponse is the response to TimeoutNowRequest.
type TimeoutNowResponse struct {
RPCHeader
}

// See WithRPCHeader.
func (r *TimeoutNowResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
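
To see why the flag is needed, consider the voter side: a server that knows of a live leader normally refuses to vote, which protects the cluster against disruptive candidates, and the flag overrides exactly that refusal. The handler change lives in raft.go and is not part of this excerpt; the helper below is only an illustrative sketch inside package raft, and shouldGrantVote is a hypothetical name:

// shouldGrantVote sketches the gate that LeadershipTransfer has to bypass.
// Term comparison and log up-to-dateness checks would still follow.
func shouldGrantVote(knownLeader ServerAddress, req *RequestVoteRequest) bool {
	if knownLeader != "" && !req.LeadershipTransfer {
		return false // ignore candidates while a leader is known
	}
	// ... term and log recency checks omitted ...
	return true
}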
15 changes: 15 additions & 0 deletions future.go
@@ -58,6 +58,12 @@ type SnapshotFuture interface {
Open() (*SnapshotMeta, io.ReadCloser, error)
}

// LeadershipTransferFuture is used for waiting on a user-triggered leadership
// transfer to complete.
type LeadershipTransferFuture interface {
Future
}

// errorFuture is used to return a static error.
type errorFuture struct {
err error
@@ -227,6 +233,15 @@ type verifyFuture struct {
voteLock sync.Mutex
}

// leadershipTransferFuture is used to track the progress of a leadership
// transfer internally.
type leadershipTransferFuture struct {
deferError

ID *ServerID
Address *ServerAddress
}

// configurationsFuture is used to retrieve the current configurations. This is
// used to allow safe access to this information outside of the main thread.
type configurationsFuture struct {
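
initiateLeadershipTransfer itself is not visible in this excerpt. A plausible shape for it, consistent with the buffered leadershipTransferCh and the ErrLeadershipTransferInProgress error declared in api.go, but an assumption rather than the actual implementation:

// Sketch only: hand the future to the main loop through the buffered
// channel; if a transfer is already queued, fail fast instead of blocking.
func (r *Raft) initiateLeadershipTransfer(id *ServerID, address *ServerAddress) LeadershipTransferFuture {
	future := &leadershipTransferFuture{ID: id, Address: address}
	future.init()
	select {
	case r.leadershipTransferCh <- future:
		return future
	default:
		return errorFuture{ErrLeadershipTransferInProgress}
	}
}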
54 changes: 37 additions & 17 deletions fuzzy/cluster.go
@@ -205,34 +205,54 @@ func (c *cluster) appliedIndexes() map[string]uint64 {
 	return r
 }
 
-func (c *cluster) ApplyN(t *testing.T, leaderTimeout time.Duration, s *applySource, n uint) uint64 {
-	f := make([]raft.ApplyFuture, n)
+func (c *cluster) generateNApplies(s *applySource, n uint) [][]byte {
 	data := make([][]byte, n)
-	startTime := time.Now()
-	endTime := startTime.Add(leaderTimeout)
 	for i := uint(0); i < n; i++ {
-		ldr := c.Leader(endTime.Sub(time.Now()))
-		if ldr != nil {
-			data[i] = s.nextEntry()
-			f[i] = ldr.raft.Apply(data[i], time.Second)
-		}
+		data[i] = s.nextEntry()
 	}
+	return data
+}
+
+func (c *cluster) leadershipTransfer(leaderTimeout time.Duration) raft.Future {
+	ldr := c.Leader(leaderTimeout)
+	return ldr.raft.LeadershipTransfer()
+}
+
+type applyFutureWithData struct {
+	future raft.ApplyFuture
+	data   []byte
+}
+
+func (c *cluster) sendNApplies(leaderTimeout time.Duration, data [][]byte) []applyFutureWithData {
+	f := []applyFutureWithData{}
+
+	ldr := c.Leader(leaderTimeout)
+	for _, d := range data {
+		f = append(f, applyFutureWithData{future: ldr.raft.Apply(d, time.Second), data: d})
+	}
+	return f
+}
+
+func (c *cluster) checkApplyFutures(futures []applyFutureWithData) uint64 {
 	success := uint64(0)
-	for i := uint(0); i < n; i++ {
-		if f[i] == nil {
-			continue
-		}
-		if err := f[i].Error(); err == nil {
+	for _, a := range futures {
+		if err := a.future.Error(); err == nil {
 			success++
-			c.lastApplySuccess = f[i]
-			c.applied = append(c.applied, appliedItem{f[i].Index(), data[i]})
+			c.lastApplySuccess = a.future
+			c.applied = append(c.applied, appliedItem{a.future.Index(), a.data})
 		} else {
-			c.lastApplyFailure = f[i]
+			c.lastApplyFailure = a.future
 		}
 	}
 	return success
 }
+
+func (c *cluster) ApplyN(t *testing.T, leaderTimeout time.Duration, s *applySource, n uint) uint64 {
+	data := c.generateNApplies(s, n)
+	futures := c.sendNApplies(leaderTimeout, data)
+	return c.checkApplyFutures(futures)
+}
 
 func (c *cluster) VerifyFSM(t *testing.T) {
 	exp := c.nodes[0].fsm
 	expName := c.nodes[0].name
73 changes: 73 additions & 0 deletions fuzzy/leadershiptransfer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package fuzzy

import (
"math/rand"
"testing"
"time"

"github.com/hashicorp/raft"
)

// 5 node cluster
func TestRaft_FuzzyLeadershipTransfer(t *testing.T) {
cluster := newRaftCluster(t, testLogWriter, "lt", 5, nil)
r := rand.New(rand.NewSource(time.Now().UnixNano()))

s := newApplySource("LeadershipTransfer")
data := cluster.generateNApplies(s, uint(r.Intn(10000)))
futures := cluster.sendNApplies(time.Minute, data)
cluster.leadershipTransfer(time.Minute)

data = cluster.generateNApplies(s, uint(r.Intn(10000)))
futures = append(futures, cluster.sendNApplies(time.Minute, data)...)
cluster.leadershipTransfer(time.Minute)

data = cluster.generateNApplies(s, uint(r.Intn(10000)))
futures = append(futures, cluster.sendNApplies(time.Minute, data)...)
cluster.leadershipTransfer(time.Minute)

data = cluster.generateNApplies(s, uint(r.Intn(10000)))
futures = append(futures, cluster.sendNApplies(time.Minute, data)...)

ac := cluster.checkApplyFutures(futures)

cluster.Stop(t, time.Minute)
cluster.VerifyLog(t, ac)
cluster.VerifyFSM(t)
}

type LeadershipTransferMode int

type LeadershipTransfer struct {
verifier appendEntriesVerifier
slowNodes map[string]bool
delayMin time.Duration
delayMax time.Duration
mode LeadershipTransferMode
}

func (lt *LeadershipTransfer) Report(t *testing.T) {
lt.verifier.Report(t)
}

func (lt *LeadershipTransfer) PreRPC(s, t string, r *raft.RPC) error {
return nil
}

func (lt *LeadershipTransfer) nap() {
d := lt.delayMin + time.Duration(rand.Int63n((lt.delayMax - lt.delayMin).Nanoseconds()))
time.Sleep(d)
}

func (lt *LeadershipTransfer) PostRPC(src, target string, r *raft.RPC, res *raft.RPCResponse) error {
return nil
}

func (lt *LeadershipTransfer) PreRequestVote(src, target string, v *raft.RequestVoteRequest) (*raft.RequestVoteResponse, error) {
return nil, nil
}

func (lt *LeadershipTransfer) PreAppendEntries(src, target string, v *raft.AppendEntriesRequest) (*raft.AppendEntriesResponse, error) {
lt.verifier.PreAppendEntries(src, target, v)
return nil, nil
}
11 changes: 11 additions & 0 deletions fuzzy/transport.go
@@ -112,9 +112,15 @@ func (t *transport) sendRPC(target string, req interface{}, resp interface{}) error {
}
rpc := raft.RPC{RespChan: rc}
var reqVote raft.RequestVoteRequest
var timeoutNow raft.TimeoutNowRequest
var appEnt raft.AppendEntriesRequest
dec := codec.NewDecoderBytes(buff.Bytes(), &codecHandle)
switch req.(type) {
case *raft.TimeoutNowRequest:
if err := dec.Decode(&timeoutNow); err != nil {
return err
}
rpc.Command = &timeoutNow
case *raft.RequestVoteRequest:
if err := dec.Decode(&reqVote); err != nil {
return err
@@ -166,6 +172,11 @@ func (t *transport) sendRPC(target string, req interface{}, resp interface{}) error {
return result.Error
}

// TimeoutNow implements the Transport interface.
func (t *transport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, resp *raft.TimeoutNowResponse) error {
return t.sendRPC(string(target), args, resp)
}

// AppendEntries sends the appropriate RPC to the target node.
func (t *transport) AppendEntries(id raft.ServerID, target raft.ServerAddress, args *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error {
ae := appendEntries{
13 changes: 13 additions & 0 deletions inmem_transport.go
@@ -135,6 +135,19 @@ func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args
return nil
}

// TimeoutNow implements the Transport interface.
func (i *InmemTransport) TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error {
rpcResp, err := i.makeRPC(target, args, nil, 10*i.timeout)
if err != nil {
return err
}

// Copy the result back
out := rpcResp.Response.(*TimeoutNowResponse)
*resp = *out
return nil
}

func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) {
i.RLock()
peer, ok := i.peers[target]
