From 30ced5b2be588a90dc782f4ac7fb6c4aca0fd437 Mon Sep 17 00:00:00 2001 From: Vincent Lee Date: Thu, 11 Jan 2018 12:43:55 +0800 Subject: [PATCH] raft: let raft step return error when proposal is dropped to allow fail-fast. --- raft/node_test.go | 9 ++++--- raft/raft.go | 60 ++++++++++++++++++++++++----------------- raft/raft_paper_test.go | 3 ++- raft/raft_test.go | 7 ++++- raft/rawnode_test.go | 3 ++- 5 files changed, 51 insertions(+), 31 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index ef0c92ad145..82c3ade37a2 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -109,8 +109,9 @@ func TestNodeStepUnblock(t *testing.T) { // TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft. func TestNodePropose(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } n := newNode() @@ -147,8 +148,9 @@ func TestNodePropose(t *testing.T) { // It also ensures that ReadState can be read out through ready chan. func TestNodeReadIndex(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}} @@ -284,8 +286,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) { // to the underlying raft. 
func TestNodeProposeConfig(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } n := newNode() diff --git a/raft/raft.go b/raft/raft.go index b9939fae092..dc0f2f0ecf1 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -67,6 +67,10 @@ const ( campaignTransfer CampaignType = "CampaignTransfer" ) +// ErrProposalDropped is returned when the proposal is ignored in some cases, +// so that the proposer can be notified and fail fast. +var ErrProposalDropped = errors.New("raft proposal dropped") + // lockedRand is a small wrapper around rand.Rand to provide // synchronization. Only the methods needed by the code are exposed // (e.g. Intn). @@ -872,25 +876,28 @@ func (r *raft) Step(m pb.Message) error { } default: - r.step(r, m) + err := r.step(r, m) + if err != nil { + return err + } } return nil } -type stepFunc func(r *raft, m pb.Message) +type stepFunc func(r *raft, m pb.Message) error -func stepLeader(r *raft, m pb.Message) { +func stepLeader(r *raft, m pb.Message) error { // These message types do not require any progress for m.From. switch m.Type { case pb.MsgBeat: r.bcastHeartbeat() - return + return nil case pb.MsgCheckQuorum: if !r.checkQuorumActive() { r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) r.becomeFollower(r.Term, None) } - return + return nil case pb.MsgProp: if len(m.Entries) == 0 { r.logger.Panicf("%x stepped empty MsgProp", r.id) @@ -899,11 +906,11 @@ func stepLeader(r *raft, m pb.Message) { // If we are not currently a member of the range (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. 
- return + return ErrProposalDropped } if r.leadTransferee != None { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) - return + return ErrProposalDropped } for i, e := range m.Entries { @@ -919,12 +926,12 @@ func stepLeader(r *raft, m pb.Message) { } r.appendEntry(m.Entries...) r.bcastAppend() - return + return nil case pb.MsgReadIndex: if r.quorum() > 1 { if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. - return + return nil } // thinking: use an interally defined context instead of the user given context. @@ -946,14 +953,14 @@ func stepLeader(r *raft, m pb.Message) { r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) } - return + return nil } // All other message types require a progress for m.From (pr). pr := r.getProgress(m.From) if pr == nil { r.logger.Debugf("%x no progress available for %x", r.id, m.From) - return + return nil } switch m.Type { case pb.MsgAppResp: @@ -1009,12 +1016,12 @@ func stepLeader(r *raft, m pb.Message) { } if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { - return + return nil } ackCount := r.readOnly.recvAck(m) if ackCount < r.quorum() { - return + return nil } rss := r.readOnly.advance(m) @@ -1028,7 +1035,7 @@ func stepLeader(r *raft, m pb.Message) { } case pb.MsgSnapStatus: if pr.State != ProgressStateSnapshot { - return + return nil } if !m.Reject { pr.becomeProbe() @@ -1052,7 +1059,7 @@ func stepLeader(r *raft, m pb.Message) { case pb.MsgTransferLeader: if pr.IsLearner { r.logger.Debugf("%x is learner. 
Ignored transferring leadership", r.id) - return + return nil } leadTransferee := m.From lastLeadTransferee := r.leadTransferee @@ -1060,14 +1067,14 @@ func stepLeader(r *raft, m pb.Message) { if lastLeadTransferee == leadTransferee { r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x", r.id, r.Term, leadTransferee, leadTransferee) - return + return nil } r.abortLeaderTransfer() r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee) } if leadTransferee == r.id { r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id) - return + return nil } // Transfer leadership to third party. r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee) @@ -1081,11 +1088,12 @@ func stepLeader(r *raft, m pb.Message) { r.sendAppend(leadTransferee) } } + return nil } // stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is // whether they respond to MsgVoteResp or MsgPreVoteResp. -func stepCandidate(r *raft, m pb.Message) { +func stepCandidate(r *raft, m pb.Message) error { // Only handle vote responses corresponding to our candidacy (while in // StateCandidate, we may get stale MsgPreVoteResp messages in this term from // our pre-candidate state). 
@@ -1098,7 +1106,7 @@ func stepCandidate(r *raft, m pb.Message) { switch m.Type { case pb.MsgProp: r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) - return + return ErrProposalDropped case pb.MsgApp: r.becomeFollower(r.Term, m.From) r.handleAppendEntries(m) @@ -1125,17 +1133,18 @@ func stepCandidate(r *raft, m pb.Message) { case pb.MsgTimeoutNow: r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From) } + return nil } -func stepFollower(r *raft, m pb.Message) { +func stepFollower(r *raft, m pb.Message) error { switch m.Type { case pb.MsgProp: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) - return + return ErrProposalDropped } else if r.disableProposalForwarding { r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) - return + return ErrProposalDropped } m.To = r.lead r.send(m) @@ -1154,7 +1163,7 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgTransferLeader: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term) - return + return nil } m.To = r.lead r.send(m) @@ -1171,17 +1180,18 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgReadIndex: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) - return + return nil } m.To = r.lead r.send(m) case pb.MsgReadIndexResp: if len(m.Entries) != 1 { r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries)) - return + return nil } r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) } + return nil } func (r *raft) handleAppendEntries(m pb.Message) { diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 2911e8aa333..71a7d14ace1 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -79,8 +79,9 @@ func 
testUpdateTermFromMessage(t *testing.T, state StateType) { // Reference: section 5.1 func TestRejectStaleTermMessage(t *testing.T) { called := false - fakeStep := func(r *raft, m pb.Message) { + fakeStep := func(r *raft, m pb.Message) error { called = true + return nil } r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) r.step = fakeStep diff --git a/raft/raft_test.go b/raft/raft_test.go index b8ef596ea57..e8be5e645c4 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1231,8 +1231,9 @@ func TestPastElectionTimeout(t *testing.T) { // actual stepX function. func TestStepIgnoreOldTermMsg(t *testing.T) { called := false - fakeStep := func(r *raft, m pb.Message) { + fakeStep := func(r *raft, m pb.Message) error { called = true + return nil } sm := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) sm.step = fakeStep @@ -3219,6 +3220,10 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) { } nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}}) + if err != ErrProposalDropped { + t.Fatalf("should return drop proposal error while transferring") + } if lead.prs[1].Match != 1 { t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1) diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 4ccf72de45a..f034521be57 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -190,8 +190,9 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { // to the underlying raft. It also ensures that ReadState can be read out. func TestRawNodeReadIndex(t *testing.T) { msgs := []raftpb.Message{} - appendStep := func(r *raft, m raftpb.Message) { + appendStep := func(r *raft, m raftpb.Message) error { msgs = append(msgs, m) + return nil } wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}