Skip to content

Commit

Permalink
Merge pull request #9345 from gyuho/raft-step-candidate
Browse files Browse the repository at this point in the history
raft: clarify candidate message handling, test candidate to follower transition with message from leader
  • Loading branch information
gyuho authored Feb 25, 2018
2 parents 4fd378e + 01db389 commit 2dd1a99
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 4 deletions.
8 changes: 5 additions & 3 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,13 +1128,13 @@ func stepCandidate(r *raft, m pb.Message) error {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(r.Term, m.From)
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(r.Term, m.From)
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From)
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
gr := r.poll(m.From, m.Type, !m.Reject)
Expand All @@ -1148,6 +1148,8 @@ func stepCandidate(r *raft, m pb.Message) error {
r.bcastAppend()
}
case len(r.votes) - gr:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
Expand Down
2 changes: 1 addition & 1 deletion raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestLeaderElectionInOneRoundRPC(t *testing.T) {

r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
for id, vote := range tt.votes {
r.Step(pb.Message{From: id, To: 1, Type: pb.MsgVoteResp, Reject: !vote})
r.Step(pb.Message{From: id, To: 1, Term: r.Term, Type: pb.MsgVoteResp, Reject: !vote})
}

if r.state != tt.state {
Expand Down
68 changes: 68 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1697,6 +1697,74 @@ func TestAllServerStepdown(t *testing.T) {
}
}

func TestCandidateResetTermMsgHeartbeat(t *testing.T) {
testCandidateResetTerm(t, pb.MsgHeartbeat)
}

func TestCandidateResetTermMsgApp(t *testing.T) {
testCandidateResetTerm(t, pb.MsgApp)
}

// testCandidateResetTerm tests when a candidate receives a
// MsgHeartbeat or MsgApp from leader, "Step" resets the term
// with leader's and reverts back to follower.
func testCandidateResetTerm(t *testing.T, mt pb.MessageType) {
a := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
b := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
c := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())

nt := newNetwork(a, b, c)

nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
if a.state != StateLeader {
t.Errorf("state = %s, want %s", a.state, StateLeader)
}
if b.state != StateFollower {
t.Errorf("state = %s, want %s", b.state, StateFollower)
}
if c.state != StateFollower {
t.Errorf("state = %s, want %s", c.state, StateFollower)
}

// isolate 3 and increase term in rest
nt.isolate(3)

nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})

if a.state != StateLeader {
t.Errorf("state = %s, want %s", a.state, StateLeader)
}
if b.state != StateFollower {
t.Errorf("state = %s, want %s", b.state, StateFollower)
}

// trigger campaign in isolated c
c.resetRandomizedElectionTimeout()
for i := 0; i < c.randomizedElectionTimeout; i++ {
c.tick()
}

if c.state != StateCandidate {
t.Errorf("state = %s, want %s", c.state, StateCandidate)
}

nt.recover()

// leader sends to isolated candidate
// and expects candidate to revert to follower
nt.send(pb.Message{From: 1, To: 3, Term: a.Term, Type: mt})

if c.state != StateFollower {
t.Errorf("state = %s, want %s", c.state, StateFollower)
}

// follower c term is reset with leader's
if a.Term != c.Term {
t.Errorf("follower term expected same term as leader's %d, got %d", a.Term, c.Term)
}
}

func TestLeaderStepdownWhenQuorumActive(t *testing.T) {
sm := newTestRaft(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())

Expand Down

0 comments on commit 2dd1a99

Please sign in to comment.