Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: let raft step return error when proposal is dropped to allow fail-fast #9067

Merged
merged 1 commit into from
Jan 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ func TestNodeStepUnblock(t *testing.T) {
// TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
func TestNodePropose(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}

n := newNode()
Expand Down Expand Up @@ -147,8 +148,9 @@ func TestNodePropose(t *testing.T) {
// It also ensures that ReadState can be read out through ready chan.
func TestNodeReadIndex(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}

Expand Down Expand Up @@ -284,8 +286,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) {
// to the underlying raft.
func TestNodeProposeConfig(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}

n := newNode()
Expand Down
60 changes: 35 additions & 25 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ const (
campaignTransfer CampaignType = "CampaignTransfer"
)

// ErrProposalDropped is returned when the proposal is ignored by some cases,
// so that the proposer can be notified and fail fast.
var ErrProposalDropped = errors.New("raft proposal dropped")

// lockedRand is a small wrapper around rand.Rand to provide
// synchronization. Only the methods needed by the code are exposed
// (e.g. Intn).
Expand Down Expand Up @@ -872,25 +876,28 @@ func (r *raft) Step(m pb.Message) error {
}

default:
r.step(r, m)
err := r.step(r, m)
if err != nil {
return err
}
}
return nil
}

type stepFunc func(r *raft, m pb.Message)
type stepFunc func(r *raft, m pb.Message) error

func stepLeader(r *raft, m pb.Message) {
func stepLeader(r *raft, m pb.Message) error {
// These message types do not require any progress for m.From.
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
return
return nil
case pb.MsgCheckQuorum:
if !r.checkQuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
return
return nil
case pb.MsgProp:
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
Expand All @@ -899,11 +906,11 @@ func stepLeader(r *raft, m pb.Message) {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return
return ErrProposalDropped
}
if r.leadTransferee != None {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return
return ErrProposalDropped
}

for i, e := range m.Entries {
Expand All @@ -919,12 +926,12 @@ func stepLeader(r *raft, m pb.Message) {
}
r.appendEntry(m.Entries...)
r.bcastAppend()
return
return nil
case pb.MsgReadIndex:
if r.quorum() > 1 {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
// Reject read only request when this leader has not committed any log entry at its term.
return
return nil
}

// thinking: use an interally defined context instead of the user given context.
Expand All @@ -946,14 +953,14 @@ func stepLeader(r *raft, m pb.Message) {
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
}

return
return nil
}

// All other message types require a progress for m.From (pr).
pr := r.getProgress(m.From)
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return
return nil
}
switch m.Type {
case pb.MsgAppResp:
Expand Down Expand Up @@ -1009,12 +1016,12 @@ func stepLeader(r *raft, m pb.Message) {
}

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return
return nil
}

ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
return
return nil
}

rss := r.readOnly.advance(m)
Expand All @@ -1028,7 +1035,7 @@ func stepLeader(r *raft, m pb.Message) {
}
case pb.MsgSnapStatus:
if pr.State != ProgressStateSnapshot {
return
return nil
}
if !m.Reject {
pr.becomeProbe()
Expand All @@ -1052,22 +1059,22 @@ func stepLeader(r *raft, m pb.Message) {
case pb.MsgTransferLeader:
if pr.IsLearner {
r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
return
return nil
}
leadTransferee := m.From
lastLeadTransferee := r.leadTransferee
if lastLeadTransferee != None {
if lastLeadTransferee == leadTransferee {
r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
r.id, r.Term, leadTransferee, leadTransferee)
return
return nil
}
r.abortLeaderTransfer()
r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
}
if leadTransferee == r.id {
r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
return
return nil
}
// Transfer leadership to third party.
r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
Expand All @@ -1081,11 +1088,12 @@ func stepLeader(r *raft, m pb.Message) {
r.sendAppend(leadTransferee)
}
}
return nil
}

// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) {
func stepCandidate(r *raft, m pb.Message) error {
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
Expand All @@ -1098,7 +1106,7 @@ func stepCandidate(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(r.Term, m.From)
r.handleAppendEntries(m)
Expand All @@ -1125,17 +1133,18 @@ func stepCandidate(r *raft, m pb.Message) {
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}

func stepFollower(r *raft, m pb.Message) {
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
return ErrProposalDropped
} else if r.disableProposalForwarding {
r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
return
return ErrProposalDropped
}
m.To = r.lead
r.send(m)
Expand All @@ -1154,7 +1163,7 @@ func stepFollower(r *raft, m pb.Message) {
case pb.MsgTransferLeader:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
return
return nil
}
m.To = r.lead
r.send(m)
Expand All @@ -1171,17 +1180,18 @@ func stepFollower(r *raft, m pb.Message) {
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return
return nil
}
m.To = r.lead
r.send(m)
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return
return nil
}
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}
return nil
}

func (r *raft) handleAppendEntries(m pb.Message) {
Expand Down
3 changes: 2 additions & 1 deletion raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ func testUpdateTermFromMessage(t *testing.T, state StateType) {
// Reference: section 5.1
func TestRejectStaleTermMessage(t *testing.T) {
called := false
fakeStep := func(r *raft, m pb.Message) {
fakeStep := func(r *raft, m pb.Message) error {
called = true
return nil
}
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
r.step = fakeStep
Expand Down
7 changes: 6 additions & 1 deletion raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,8 +1231,9 @@ func TestPastElectionTimeout(t *testing.T) {
// actual stepX function.
func TestStepIgnoreOldTermMsg(t *testing.T) {
called := false
fakeStep := func(r *raft, m pb.Message) {
fakeStep := func(r *raft, m pb.Message) error {
called = true
return nil
}
sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
sm.step = fakeStep
Expand Down Expand Up @@ -3219,6 +3220,10 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) {
}

nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
if err != ErrProposalDropped {
t.Fatalf("should return drop proposal error while transferring")
}

if lead.prs[1].Match != 1 {
t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1)
Expand Down
3 changes: 2 additions & 1 deletion raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
// to the underlying raft. It also ensures that ReadState can be read out.
func TestRawNodeReadIndex(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}

Expand Down