Leadership transfer #306

Merged · 53 commits · May 17, 2019

Commits
3ec1046
first step
hanshasselberg Jan 25, 2019
2935d2f
more code
hanshasselberg Jan 28, 2019
c2ae146
return more errors to the original caller
hanshasselberg Jan 28, 2019
3c9bd0e
more tests
hanshasselberg Jan 28, 2019
48e5be6
tests for pickServer
hanshasselberg Jan 28, 2019
0ca3242
try to keep it together
hanshasselberg Jan 29, 2019
12e16d5
remove debug stuff.
hanshasselberg Jan 29, 2019
8ab3eb7
add deps to integ
hanshasselberg Jan 29, 2019
5298547
stuff
hanshasselberg Jan 29, 2019
f2d3516
tests
hanshasselberg Jan 30, 2019
8c56e5e
I want the tests to pass
hanshasselberg Jan 30, 2019
23d6323
add flag on requestvote
hanshasselberg Jan 30, 2019
ac55b0c
rename to leadership transfer according to thesis
hanshasselberg Jan 30, 2019
d964151
do not transfer to itself
hanshasselberg Jan 30, 2019
2df25cb
reset lease
hanshasselberg Jan 30, 2019
af4b7a8
block client requests
hanshasselberg Jan 31, 2019
b203fbf
fix comment
hanshasselberg Jan 31, 2019
9c737da
:shrug:
hanshasselberg Jan 31, 2019
a95d26b
debug
hanshasselberg Jan 31, 2019
921ea84
duh
hanshasselberg Jan 31, 2019
97c73df
some docs
hanshasselberg Jan 31, 2019
783eff6
all the docs
hanshasselberg Jan 31, 2019
ae2ade1
docs typo
hanshasselberg Feb 1, 2019
96d1d0e
set r.leaderState.leadershipTransferInProgress to false. plus set it …
hanshasselberg Feb 1, 2019
6fd97a2
change the capacity of the leadershipTransferCh to 0
hanshasselberg Feb 11, 2019
e8232ed
timeout replication as well
hanshasselberg Feb 12, 2019
b293e7c
move to atomic value for transferInProgress flag
hanshasselberg Feb 12, 2019
44c4524
remove debug out
hanshasselberg Feb 12, 2019
38cf2a3
return follower that is most up to date.
hanshasselberg Feb 12, 2019
e241011
more tests
hanshasselberg Feb 13, 2019
688dd5b
make leadershiptransfer async for real!
hanshasselberg Feb 13, 2019
5e054f3
do not reset lease.
hanshasselberg Feb 13, 2019
c8e1dea
avoid pointer and allow verifyLeader
hanshasselberg Feb 13, 2019
8a6f91b
mention the version thingy in the comments
hanshasselberg Feb 13, 2019
672aab0
mention graceful errors
hanshasselberg Feb 13, 2019
8701e75
fuzzy test leadership transfer
hanshasselberg Feb 14, 2019
d2f396a
more comments
hanshasselberg Mar 1, 2019
6d77567
more changes to make test success more likely
hanshasselberg Mar 1, 2019
e4fa2b8
more changes
hanshasselberg Mar 1, 2019
ca72a94
another way of knowing when no longer leader
hanshasselberg Mar 4, 2019
06a27b2
log leadership transfer
hanshasselberg Mar 4, 2019
7704957
make test failure optional.
hanshasselberg Mar 4, 2019
4d0e509
comment
hanshasselberg Mar 4, 2019
073891a
remove commented code.
hanshasselberg May 14, 2019
d1fdde7
revert lease changes and remove some unused code/comments
hanshasselberg May 14, 2019
30fe721
revert makefile changes
hanshasselberg May 15, 2019
982d4dc
access leaderstate only in leaderloop
hanshasselberg May 15, 2019
e60b1b0
do not return early to get the latest follower.
hanshasselberg May 15, 2019
50f6abc
move to new logging
hanshasselberg May 15, 2019
5b66163
pass *followerReplication
hanshasselberg May 16, 2019
5911330
Update raft.go
hanshasselberg May 16, 2019
a855d10
docs
hanshasselberg May 16, 2019
0f3edd7
protect nextIndex from races by using atomic.
hanshasselberg May 16, 2019
72 changes: 61 additions & 11 deletions api.go
@@ -49,6 +49,10 @@ var (
// ErrCantBootstrap is returned when an attempt is made to bootstrap a
// cluster that already has state present.
ErrCantBootstrap = errors.New("bootstrap only works on new clusters")

// ErrLeadershipTransferInProgress is returned when the leader is rejecting
// client requests because it is attempting to transfer leadership.
ErrLeadershipTransferInProgress = errors.New("leadership transfer in progress")
)

// Raft implements a Raft node.
@@ -97,6 +101,12 @@ type Raft struct {
// leaderState used only while state is leader
leaderState leaderState

// candidateFromLeadershipTransfer is used to indicate that this server became
// a candidate because the leader tried to transfer leadership to it. This
// flag is used in RequestVoteRequest to express that a leadership transfer is
// going on.
candidateFromLeadershipTransfer bool

// Stores our local server ID, used to avoid sending RPCs to ourself
localID ServerID

@@ -157,6 +167,10 @@ type Raft struct {
// is indexed by an artificial ID which is used for deregistration.
observersLock sync.RWMutex
observers map[uint64]*Observer

// leadershipTransferCh is used to start a leadership transfer from outside of
// the main thread.
leadershipTransferCh chan *leadershipTransferFuture
}

// BootstrapCluster initializes a server's storage with the given cluster
@@ -443,17 +457,17 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna

// Create Raft struct.
r := &Raft{
protocolVersion: protocolVersion,
applyCh: make(chan *logFuture),
conf: *conf,
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
localAddr: localAddr,
logger: logger,
logs: logs,
protocolVersion: protocolVersion,
applyCh: make(chan *logFuture),
conf: *conf,
fsm: fsm,
fsmMutateCh: make(chan interface{}, 128),
fsmSnapshotCh: make(chan *reqSnapshotFuture),
leaderCh: make(chan bool),
localID: localID,
localAddr: localAddr,
logger: logger,
logs: logs,
configurationChangeCh: make(chan *configurationChangeFuture),
configurations: configurations{},
rpcCh: trans.Consumer(),
@@ -467,6 +481,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
configurationsCh: make(chan *configurationsFuture, 8),
bootstrapCh: make(chan *bootstrapFuture),
observers: make(map[uint64]*Observer),
leadershipTransferCh: make(chan *leadershipTransferFuture, 1),
}

// Initialize as a follower.
@@ -1011,3 +1026,38 @@ func (r *Raft) LastIndex() uint64 {
func (r *Raft) AppliedIndex() uint64 {
return r.getLastApplied()
}

// LeadershipTransfer will transfer leadership to a server in the cluster.
// This can only be called from the leader, or it will fail. The leader will
// stop accepting client requests, make sure the target server is up to date,
// and start the transfer with a TimeoutNow message. This message has the same
// effect as if the election timeout on the target server fires. Since it is
// unlikely that another server is starting an election at the same time, it
// is very likely that the target server wins the election. Note that raft
// protocol version 3 alone is not sufficient to use LeadershipTransfer; a
// recent version of this library that includes the feature has to be used.
// Using leadership transfer is safe, however, in a cluster where not every
// node has the latest version. If a follower cannot be promoted, it will fail
// gracefully.
func (r *Raft) LeadershipTransfer() Future {
if r.protocolVersion < 3 {
return errorFuture{ErrUnsupportedProtocol}
}

return r.initiateLeadershipTransfer(nil, nil)
}

// LeadershipTransferToServer does the same as LeadershipTransfer but takes a
// target server as an argument, for the case where leadership should be
// transferred to a specific server in the cluster. Note that raft protocol
// version 3 alone is not sufficient to use LeadershipTransferToServer; a
// recent version of this library that includes the feature has to be used.
// Using leadership transfer is safe, however, in a cluster where not every
// node has the latest version. If a follower cannot be promoted, it will fail
// gracefully.
func (r *Raft) LeadershipTransferToServer(id ServerID, address ServerAddress) Future {
if r.protocolVersion < 3 {
return errorFuture{ErrUnsupportedProtocol}
}

return r.initiateLeadershipTransfer(&id, &address)
}
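
As a usage sketch of the new public API (not part of this diff — the helper names and the assumption that `r` is the current leader with ProtocolVersion >= 3 are illustrative):

```go
package example

import (
	"log"

	"github.com/hashicorp/raft"
)

// transferLeadership is a hypothetical helper showing how a caller might use
// the new API. It assumes r is the current leader and was configured with
// ProtocolVersion >= 3.
func transferLeadership(r *raft.Raft) error {
	// Let raft pick the most up-to-date follower as the target.
	if err := r.LeadershipTransfer().Error(); err != nil {
		// ErrUnsupportedProtocol means this node runs a protocol version < 3;
		// other errors mean the transfer did not complete.
		return err
	}
	log.Println("leadership transferred")
	return nil
}

// transferLeadershipTo targets a specific server; id and addr are assumed to
// match an entry in the cluster configuration.
func transferLeadershipTo(r *raft.Raft, id raft.ServerID, addr raft.ServerAddress) error {
	return r.LeadershipTransferToServer(id, addr).Error()
}
```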
26 changes: 26 additions & 0 deletions commands.go
@@ -76,6 +76,11 @@ type RequestVoteRequest struct {
// Used to ensure safety
LastLogIndex uint64
LastLogTerm uint64

// Used to indicate to peers that this vote was triggered by a leadership
// transfer. It is required for leadership transfer to work, because servers
// would otherwise refuse to vote when they are aware of an existing leader.
LeadershipTransfer bool
}

// See WithRPCHeader.
@@ -149,3 +154,24 @@ type InstallSnapshotResponse struct {
func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

// TimeoutNowRequest is the command used by a leader to signal another server to
// start an election.
type TimeoutNowRequest struct {
RPCHeader
}

// See WithRPCHeader.
func (r *TimeoutNowRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

// TimeoutNowResponse is the response to TimeoutNowRequest.
type TimeoutNowResponse struct {
RPCHeader
}

// See WithRPCHeader.
func (r *TimeoutNowResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
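
The vote-handling change in raft.go is not shown in this excerpt; the following is a simplified, hypothetical illustration of what the flag enables — a voter skipping its "existing leader" check when the request comes from a transfer-triggered candidate (the function and its parameters are assumptions, not the PR's actual code):

```go
package example

import "github.com/hashicorp/raft"

// shouldConsiderVote is a simplified, hypothetical illustration of why the
// LeadershipTransfer flag exists; it is not the vote-handling code from this
// PR. knownLeader and leaderLeaseActive stand in for the voter's local state.
func shouldConsiderVote(req *raft.RequestVoteRequest, knownLeader raft.ServerAddress, leaderLeaseActive bool) bool {
	// Normally a server refuses to vote while it still believes a leader
	// exists (it knows a leader address and its lease has not expired).
	if knownLeader != "" && leaderLeaseActive && !req.LeadershipTransfer {
		return false
	}
	// With LeadershipTransfer set, the candidate was told to start an election
	// via TimeoutNow, so the "existing leader" check is skipped and the vote
	// is then decided on term and log up-to-dateness as usual.
	return true
}
```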
15 changes: 15 additions & 0 deletions future.go
@@ -58,6 +58,12 @@ type SnapshotFuture interface {
Open() (*SnapshotMeta, io.ReadCloser, error)
}

// LeadershipTransferFuture is used for waiting on a user-triggered leadership
// transfer to complete.
type LeadershipTransferFuture interface {
Future
}

// errorFuture is used to return a static error.
type errorFuture struct {
err error
@@ -227,6 +233,15 @@ type verifyFuture struct {
voteLock sync.Mutex
}

// leadershipTransferFuture is used to track the progress of a leadership
// transfer internally.
type leadershipTransferFuture struct {
deferError

ID *ServerID
Address *ServerAddress
}

// configurationsFuture is used to retrieve the current configurations. This is
// used to allow safe access to this information outside of the main thread.
type configurationsFuture struct {
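
initiateLeadershipTransfer itself is not part of this hunk; below is a rough, hypothetical sketch (written as if inside the raft package, so it is not self-contained) of how the future might be handed from the caller's goroutine to the leader loop over leadershipTransferCh:

```go
// Hypothetical sketch only; the real initiateLeadershipTransfer in raft.go may
// differ. The caller's goroutine creates the future, and the leader loop later
// resolves it with future.respond(err).
func (r *Raft) initiateLeadershipTransferSketch(id *ServerID, address *ServerAddress) LeadershipTransferFuture {
	future := &leadershipTransferFuture{ID: id, Address: address}
	future.init()

	select {
	case r.leadershipTransferCh <- future:
		// The leader loop received the request and now owns the future.
		return future
	case <-r.shutdownCh:
		return errorFuture{ErrRaftShutdown}
	}
}
```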
54 changes: 37 additions & 17 deletions fuzzy/cluster.go
@@ -205,34 +205,54 @@ func (c *cluster) appliedIndexes() map[string]uint64 {
return r
}

func (c *cluster) ApplyN(t *testing.T, leaderTimeout time.Duration, s *applySource, n uint) uint64 {
f := make([]raft.ApplyFuture, n)
func (c *cluster) generateNApplies(s *applySource, n uint) [][]byte {
data := make([][]byte, n)
startTime := time.Now()
endTime := startTime.Add(leaderTimeout)
for i := uint(0); i < n; i++ {
ldr := c.Leader(endTime.Sub(time.Now()))
if ldr != nil {
data[i] = s.nextEntry()
f[i] = ldr.raft.Apply(data[i], time.Second)
}
data[i] = s.nextEntry()
}
return data
}

func (c *cluster) leadershipTransfer(leaderTimeout time.Duration) raft.Future {
ldr := c.Leader(leaderTimeout)
return ldr.raft.LeadershipTransfer()
}

type applyFutureWithData struct {
future raft.ApplyFuture
data []byte
}

func (c *cluster) sendNApplies(leaderTimeout time.Duration, data [][]byte) []applyFutureWithData {
f := []applyFutureWithData{}

ldr := c.Leader(leaderTimeout)
for _, d := range data {
f = append(f, applyFutureWithData{future: ldr.raft.Apply(d, time.Second), data: d})
}
return f
}

func (c *cluster) checkApplyFutures(futures []applyFutureWithData) uint64 {
success := uint64(0)
for i := uint(0); i < n; i++ {
if f[i] == nil {
continue
}
if err := f[i].Error(); err == nil {
for _, a := range futures {
if err := a.future.Error(); err == nil {
success++
c.lastApplySuccess = f[i]
c.applied = append(c.applied, appliedItem{f[i].Index(), data[i]})
c.lastApplySuccess = a.future
c.applied = append(c.applied, appliedItem{a.future.Index(), a.data})
} else {
c.lastApplyFailure = f[i]
c.lastApplyFailure = a.future
}
}
return success
}

func (c *cluster) ApplyN(t *testing.T, leaderTimeout time.Duration, s *applySource, n uint) uint64 {
data := c.generateNApplies(s, n)
futures := c.sendNApplies(leaderTimeout, data)
return c.checkApplyFutures(futures)
}

func (c *cluster) VerifyFSM(t *testing.T) {
exp := c.nodes[0].fsm
expName := c.nodes[0].name
73 changes: 73 additions & 0 deletions fuzzy/leadershiptransfer_test.go
@@ -0,0 +1,73 @@
package fuzzy

import (
"math/rand"
"testing"
"time"

"github.com/hashicorp/raft"
)

// 5 node cluster
func TestRaft_FuzzyLeadershipTransfer(t *testing.T) {
cluster := newRaftCluster(t, testLogWriter, "lt", 5, nil)
r := rand.New(rand.NewSource(time.Now().UnixNano()))

s := newApplySource("LeadershipTransfer")
data := cluster.generateNApplies(s, uint(r.Intn(10000)))
futures := cluster.sendNApplies(time.Minute, data)
cluster.leadershipTransfer(time.Minute)

data = cluster.generateNApplies(s, uint(r.Intn(10000)))
futures = append(futures, cluster.sendNApplies(time.Minute, data)...)
cluster.leadershipTransfer(time.Minute)

data = cluster.generateNApplies(s, uint(r.Intn(10000)))
futures = append(futures, cluster.sendNApplies(time.Minute, data)...)
cluster.leadershipTransfer(time.Minute)

data = cluster.generateNApplies(s, uint(r.Intn(10000)))
futures = append(futures, cluster.sendNApplies(time.Minute, data)...)

ac := cluster.checkApplyFutures(futures)

cluster.Stop(t, time.Minute)
cluster.VerifyLog(t, ac)
cluster.VerifyFSM(t)
}

type LeadershipTransferMode int

type LeadershipTransfer struct {
verifier appendEntriesVerifier
slowNodes map[string]bool
delayMin time.Duration
delayMax time.Duration
mode LeadershipTransferMode
}

func (lt *LeadershipTransfer) Report(t *testing.T) {
lt.verifier.Report(t)
}

func (lt *LeadershipTransfer) PreRPC(s, t string, r *raft.RPC) error {
return nil
}

func (lt *LeadershipTransfer) nap() {
d := lt.delayMin + time.Duration(rand.Int63n((lt.delayMax - lt.delayMin).Nanoseconds()))
time.Sleep(d)
}

func (lt *LeadershipTransfer) PostRPC(src, target string, r *raft.RPC, res *raft.RPCResponse) error {
return nil
}

func (lt *LeadershipTransfer) PreRequestVote(src, target string, v *raft.RequestVoteRequest) (*raft.RequestVoteResponse, error) {
return nil, nil
}

func (lt *LeadershipTransfer) PreAppendEntries(src, target string, v *raft.AppendEntriesRequest) (*raft.AppendEntriesResponse, error) {
lt.verifier.PreAppendEntries(src, target, v)
return nil, nil
}
11 changes: 11 additions & 0 deletions fuzzy/transport.go
@@ -112,9 +112,15 @@ func (t *transport) sendRPC(target string, req interface{}, resp interface{}) er
}
rpc := raft.RPC{RespChan: rc}
var reqVote raft.RequestVoteRequest
var timeoutNow raft.TimeoutNowRequest
var appEnt raft.AppendEntriesRequest
dec := codec.NewDecoderBytes(buff.Bytes(), &codecHandle)
switch req.(type) {
case *raft.TimeoutNowRequest:
if err := dec.Decode(&timeoutNow); err != nil {
return err
}
rpc.Command = &timeoutNow
case *raft.RequestVoteRequest:
if err := dec.Decode(&reqVote); err != nil {
return err
@@ -166,6 +172,11 @@ func (t *transport) sendRPC(target string, req interface{}, resp interface{}) er
return result.Error
}

// TimeoutNow implements the Transport interface.
func (t *transport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, resp *raft.TimeoutNowResponse) error {
return t.sendRPC(string(target), args, resp)
}

// AppendEntries sends the appropriate RPC to the target node.
func (t *transport) AppendEntries(id raft.ServerID, target raft.ServerAddress, args *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error {
ae := appendEntries{
13 changes: 13 additions & 0 deletions inmem_transport.go
@@ -135,6 +135,19 @@ func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args
return nil
}

// TimeoutNow implements the Transport interface.
func (i *InmemTransport) TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error {
rpcResp, err := i.makeRPC(target, args, nil, 10*i.timeout)
if err != nil {
return err
}

// Copy the result back
out := rpcResp.Response.(*TimeoutNowResponse)
*resp = *out
return nil
}

func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) {
i.RLock()
peer, ok := i.peers[target]
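
Both the fuzzy and in-memory transports gain a TimeoutNow method, which suggests every Transport implementation needs one to support leadership transfer (the interface change itself is not shown in this excerpt). A minimal, hypothetical sketch for a custom transport — loopbackTransport and its behavior are assumptions, not part of this PR:

```go
package example

import "github.com/hashicorp/raft"

// loopbackTransport is a hypothetical, minimal transport used only to show
// the shape of the new method; a real transport would serialize args and
// deliver them to the target, as fuzzy/transport.go and InmemTransport do.
type loopbackTransport struct{}

// TimeoutNow mirrors the method added to the in-memory and fuzzy transports.
func (t *loopbackTransport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, resp *raft.TimeoutNowResponse) error {
	// A real implementation would send args to target and decode the peer's
	// reply into resp; this sketch just acknowledges locally.
	*resp = raft.TimeoutNowResponse{RPCHeader: args.RPCHeader}
	return nil
}
```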