diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 9c4407ca7137..ad7d95630cd7 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -1606,6 +1606,18 @@ cache will already have moved on to newer entries. Measurement: "Messages", Unit: metric.Unit_COUNT, } + metaRaftRcvdFortifyLeader = metric.Metadata{ + Name: "raft.rcvd.fortifyleader", + Help: "Number of MsgFortifyLeader messages received by this store", + Measurement: "Messages", + Unit: metric.Unit_COUNT, + } + metaRaftRcvdFortifyLeaderResp = metric.Metadata{ + Name: "raft.rcvd.fortifyleaderresp", + Help: "Number of MsgFortifyLeaderResp messages received by this store", + Measurement: "Messages", + Unit: metric.Unit_COUNT, + } metaRaftRcvdDropped = metric.Metadata{ Name: "raft.rcvd.dropped", Help: "Number of incoming Raft messages dropped (due to queue length or size)", @@ -3522,18 +3534,20 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { // Raft message metrics. RaftRcvdMessages: [maxRaftMsgType + 1]*metric.Counter{ - raftpb.MsgProp: metric.NewCounter(metaRaftRcvdProp), - raftpb.MsgApp: metric.NewCounter(metaRaftRcvdApp), - raftpb.MsgAppResp: metric.NewCounter(metaRaftRcvdAppResp), - raftpb.MsgVote: metric.NewCounter(metaRaftRcvdVote), - raftpb.MsgVoteResp: metric.NewCounter(metaRaftRcvdVoteResp), - raftpb.MsgPreVote: metric.NewCounter(metaRaftRcvdPreVote), - raftpb.MsgPreVoteResp: metric.NewCounter(metaRaftRcvdPreVoteResp), - raftpb.MsgSnap: metric.NewCounter(metaRaftRcvdSnap), - raftpb.MsgHeartbeat: metric.NewCounter(metaRaftRcvdHeartbeat), - raftpb.MsgHeartbeatResp: metric.NewCounter(metaRaftRcvdHeartbeatResp), - raftpb.MsgTransferLeader: metric.NewCounter(metaRaftRcvdTransferLeader), - raftpb.MsgTimeoutNow: metric.NewCounter(metaRaftRcvdTimeoutNow), + raftpb.MsgProp: metric.NewCounter(metaRaftRcvdProp), + raftpb.MsgApp: metric.NewCounter(metaRaftRcvdApp), + raftpb.MsgAppResp: metric.NewCounter(metaRaftRcvdAppResp), + raftpb.MsgVote: metric.NewCounter(metaRaftRcvdVote), + raftpb.MsgVoteResp: metric.NewCounter(metaRaftRcvdVoteResp), + raftpb.MsgPreVote: metric.NewCounter(metaRaftRcvdPreVote), + raftpb.MsgPreVoteResp: metric.NewCounter(metaRaftRcvdPreVoteResp), + raftpb.MsgSnap: metric.NewCounter(metaRaftRcvdSnap), + raftpb.MsgHeartbeat: metric.NewCounter(metaRaftRcvdHeartbeat), + raftpb.MsgHeartbeatResp: metric.NewCounter(metaRaftRcvdHeartbeatResp), + raftpb.MsgTransferLeader: metric.NewCounter(metaRaftRcvdTransferLeader), + raftpb.MsgTimeoutNow: metric.NewCounter(metaRaftRcvdTimeoutNow), + raftpb.MsgFortifyLeader: metric.NewCounter(metaRaftRcvdFortifyLeader), + raftpb.MsgFortifyLeaderResp: metric.NewCounter(metaRaftRcvdFortifyLeaderResp), }, RaftRcvdDropped: metric.NewCounter(metaRaftRcvdDropped), RaftRcvdDroppedBytes: metric.NewCounter(metaRaftRcvdDroppedBytes), diff --git a/pkg/kv/kvserver/raft.go b/pkg/kv/kvserver/raft.go index 7abf3f57baa9..f23e170a3ebd 100644 --- a/pkg/kv/kvserver/raft.go +++ b/pkg/kv/kvserver/raft.go @@ -26,7 +26,7 @@ import ( ) // maxRaftMsgType is the maximum value in the raft.MessageType enum. -const maxRaftMsgType = raftpb.MsgForgetLeader +const maxRaftMsgType = raftpb.MsgFortifyLeaderResp func init() { for v := range raftpb.MessageType_name { diff --git a/pkg/raft/raft.go b/pkg/raft/raft.go index bdadf549a22a..303ea3286d77 100644 --- a/pkg/raft/raft.go +++ b/pkg/raft/raft.go @@ -353,6 +353,10 @@ type raft struct { // the leader id lead pb.PeerID + // leadEpoch, if set, corresponds to the StoreLiveness epoch that this peer + // has supported the leader in. It's unset if the peer hasn't supported the + // current leader. + // // TODO(arul): This should be populated when responding to a MsgFortify. leadEpoch pb.Epoch // leadTransferee is id of the leader transfer target when its value is not zero. @@ -669,6 +673,17 @@ func (r *raft) sendHeartbeat(to pb.PeerID) { pr.SentCommit(commit) } +// sendFortify sends a fortification RPC to the supplied follower. +func (r *raft) sendFortify(to pb.PeerID) { + if to == r.id { + // We handle the case where the leader is trying to fortify itself specially. + // Doing so avoids a self-addressed message. + // TODO(arul): do this handling. + return + } + r.send(pb.Message{To: to, Type: pb.MsgFortifyLeader}) +} + // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.trk. func (r *raft) bcastAppend() { @@ -690,6 +705,15 @@ func (r *raft) bcastHeartbeat() { }) } +// bcastFortify sends an RPC to fortify the leader to all peers (including the leader itself). +func (r *raft) bcastFortify() { + Assert(r.state == StateLeader, "only leaders can fortify") + + r.trk.Visit(func(id pb.PeerID, _ *tracker.Progress) { + r.sendFortify(id) + }) +} + func (r *raft) appliedTo(index uint64, size entryEncodingSize) { oldApplied := r.raftLog.applied newApplied := max(index, oldApplied) @@ -1326,6 +1350,8 @@ func stepLeader(r *raft, m pb.Message) error { case pb.MsgForgetLeader: return nil // noop on leader + case pb.MsgFortifyLeaderResp: + r.handleFortifyResp(m) } // All other message types require a progress for m.From (pr). @@ -1610,6 +1636,9 @@ func stepCandidate(r *raft, m pb.Message) error { case pb.MsgSnap: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleSnapshot(m) + case pb.MsgFortifyLeader: + r.becomeFollower(m.Term, m.From) // always m.Term == r.Term + r.handleFortify(m) case myVoteRespType: gr, rj, res := r.poll(m.From, m.Type, !m.Reject) r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj) @@ -1619,6 +1648,7 @@ func stepCandidate(r *raft, m pb.Message) error { r.campaign(campaignElection) } else { r.becomeLeader() + r.bcastFortify() r.bcastAppend() } case quorum.VoteLost: @@ -1664,6 +1694,8 @@ func stepFollower(r *raft, m pb.Message) error { r.electionElapsed = 0 r.lead = m.From r.handleSnapshot(m) + case pb.MsgFortifyLeader: + r.handleFortify(m) case pb.MsgTransferLeader: if r.lead == None { r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term) @@ -1829,6 +1861,20 @@ func (r *raft) handleSnapshot(m pb.Message) { } } +func (r *raft) handleFortify(m pb.Message) { + // TODO(arul): currently a no-op; implement. + r.send(pb.Message{ + To: m.From, + Type: pb.MsgFortifyLeaderResp, + Reject: true, + }) +} + +func (r *raft) handleFortifyResp(m pb.Message) { + Assert(r.state == StateLeader, "only leaders should be handling fortification responses") + Assert(m.Reject, "TODO(arul): implement") +} + // restore recovers the state machine from a snapshot. It restores the log and the // configuration of state machine. If this method returns false, the snapshot was // ignored, either because it was obsolete or because of an error. diff --git a/pkg/raft/raftpb/raft.proto b/pkg/raft/raftpb/raft.proto index 0f0e00b2c438..649034f06155 100644 --- a/pkg/raft/raftpb/raft.proto +++ b/pkg/raft/raftpb/raft.proto @@ -62,6 +62,8 @@ enum MessageType { MsgStorageApply = 21; MsgStorageApplyResp = 22; MsgForgetLeader = 23; + MsgFortifyLeader = 24; + MsgFortifyLeaderResp = 25; // NOTE: when adding new message types, remember to update the isLocalMsg and // isResponseMsg arrays in raft/util.go and update the corresponding tests in // raft/util_test.go. diff --git a/pkg/raft/testdata/async_storage_writes.txt b/pkg/raft/testdata/async_storage_writes.txt index 0fb01d1f1cf1..fde888e0c7a6 100644 --- a/pkg/raft/testdata/async_storage_writes.txt +++ b/pkg/raft/testdata/async_storage_writes.txt @@ -90,14 +90,18 @@ stabilize Messages: 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] + 1->2 MsgFortifyLeader Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 1->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 Entries:[1/11 EntryNormal ""] Responses:[ 1->1 MsgAppResp Term:1 Log:0/11 AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11 ] > 2 receiving messages 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] + 1->2 MsgFortifyLeader Term:1 Log:0/0 > 3 receiving messages 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] + 1->3 MsgFortifyLeader Term:1 Log:0/0 > 1 processing append thread Processing: 1->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 Entries:[1/11 EntryNormal ""] @@ -110,6 +114,7 @@ stabilize Entries: 1/11 EntryNormal "" Messages: + 2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) 2->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 Entries:[1/11 EntryNormal ""] Responses:[ 2->1 MsgAppResp Term:1 Log:0/11 AppendThread->2 MsgStorageAppendResp Term:0 Log:1/11 @@ -120,6 +125,7 @@ stabilize Entries: 1/11 EntryNormal "" Messages: + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) 3->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 Entries:[1/11 EntryNormal ""] Responses:[ 3->1 MsgAppResp Term:1 Log:0/11 AppendThread->3 MsgStorageAppendResp Term:0 Log:1/11 @@ -127,6 +133,8 @@ stabilize > 1 receiving messages 1->1 MsgAppResp Term:1 Log:0/11 AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11 + 2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) > 2 processing append thread Processing: 2->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 Entries:[1/11 EntryNormal ""] diff --git a/pkg/raft/testdata/async_storage_writes_append_aba_race.txt b/pkg/raft/testdata/async_storage_writes_append_aba_race.txt index b19c94000ff3..07cb52d7c677 100644 --- a/pkg/raft/testdata/async_storage_writes_append_aba_race.txt +++ b/pkg/raft/testdata/async_storage_writes_append_aba_race.txt @@ -195,6 +195,12 @@ Messages: 3->5 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] 3->6 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] 3->7 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] +3->1 MsgFortifyLeader Term:2 Log:0/0 +3->2 MsgFortifyLeader Term:2 Log:0/0 +3->4 MsgFortifyLeader Term:2 Log:0/0 +3->5 MsgFortifyLeader Term:2 Log:0/0 +3->6 MsgFortifyLeader Term:2 Log:0/0 +3->7 MsgFortifyLeader Term:2 Log:0/0 3->AppendThread MsgStorageAppend Term:2 Log:2/12 Commit:11 Vote:3 Lead:3 Entries:[2/12 EntryNormal ""] Responses:[ 3->3 MsgAppResp Term:2 Log:0/12 AppendThread->3 MsgStorageAppendResp Term:0 Log:2/12 @@ -209,13 +215,19 @@ INFO 1 [logterm: 1, index: 12, vote: 0] rejected MsgVote from 3 [logterm: 1, ind 3->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] INFO found conflict at index 12 [existing term: 1, conflicting term: 2] INFO replace the unstable entries from index 12 +3->1 MsgFortifyLeader Term:2 Log:0/0 dropped: 3->2 MsgVote Term:2 Log:1/11 dropped: 3->2 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] +dropped: 3->2 MsgFortifyLeader Term:2 Log:0/0 dropped: 3->4 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] +dropped: 3->4 MsgFortifyLeader Term:2 Log:0/0 dropped: 3->5 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] +dropped: 3->5 MsgFortifyLeader Term:2 Log:0/0 dropped: 3->6 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] +dropped: 3->6 MsgFortifyLeader Term:2 Log:0/0 dropped: 3->7 MsgVote Term:2 Log:1/11 dropped: 3->7 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] +dropped: 3->7 MsgFortifyLeader Term:2 Log:0/0 process-ready 1 ---- @@ -224,6 +236,7 @@ HardState Term:2 Commit:11 Lead:3 Entries: 2/12 EntryNormal "" Messages: +1->3 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 1->AppendThread MsgStorageAppend Term:2 Log:2/12 Commit:11 Lead:3 Entries:[2/12 EntryNormal ""] Responses:[ 1->3 MsgVoteResp Term:2 Log:0/0 Rejected (Hint: 0) 1->3 MsgAppResp Term:2 Log:0/12 @@ -351,6 +364,12 @@ Messages: 4->5 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] 4->6 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] 4->7 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] +4->1 MsgFortifyLeader Term:3 Log:0/0 +4->2 MsgFortifyLeader Term:3 Log:0/0 +4->3 MsgFortifyLeader Term:3 Log:0/0 +4->5 MsgFortifyLeader Term:3 Log:0/0 +4->6 MsgFortifyLeader Term:3 Log:0/0 +4->7 MsgFortifyLeader Term:3 Log:0/0 4->AppendThread MsgStorageAppend Term:3 Log:3/12 Commit:11 Vote:4 Lead:4 Entries:[3/12 EntryNormal ""] Responses:[ 4->4 MsgAppResp Term:3 Log:0/12 AppendThread->4 MsgStorageAppendResp Term:0 Log:3/12 @@ -366,6 +385,7 @@ deliver-msgs drop=1 ---- dropped: 4->1 MsgVote Term:3 Log:1/11 dropped: 4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] +dropped: 4->1 MsgFortifyLeader Term:3 Log:0/0 tick-heartbeat 4 ---- diff --git a/pkg/raft/testdata/campaign.txt b/pkg/raft/testdata/campaign.txt index 1f4b10e65d68..8e49da9bb92d 100644 --- a/pkg/raft/testdata/campaign.txt +++ b/pkg/raft/testdata/campaign.txt @@ -67,16 +67,21 @@ stabilize Messages: 1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] 1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] + 1->2 MsgFortifyLeader Term:1 Log:0/0 + 1->3 MsgFortifyLeader Term:1 Log:0/0 > 2 receiving messages 1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] + 1->2 MsgFortifyLeader Term:1 Log:0/0 > 3 receiving messages 1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] + 1->3 MsgFortifyLeader Term:1 Log:0/0 > 2 handling Ready Ready MustSync=true: HardState Term:1 Vote:1 Commit:2 Lead:1 Entries: 1/3 EntryNormal "" Messages: + 2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) 2->1 MsgAppResp Term:1 Log:0/3 > 3 handling Ready Ready MustSync=true: @@ -84,9 +89,12 @@ stabilize Entries: 1/3 EntryNormal "" Messages: + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) 3->1 MsgAppResp Term:1 Log:0/3 > 1 receiving messages + 2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) 2->1 MsgAppResp Term:1 Log:0/3 + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) 3->1 MsgAppResp Term:1 Log:0/3 > 1 handling Ready Ready MustSync=false: diff --git a/pkg/raft/testdata/campaign_learner_must_vote.txt b/pkg/raft/testdata/campaign_learner_must_vote.txt index e0510edec63f..1e2056609de4 100644 --- a/pkg/raft/testdata/campaign_learner_must_vote.txt +++ b/pkg/raft/testdata/campaign_learner_must_vote.txt @@ -102,15 +102,20 @@ stabilize 2 3 Messages: 2->1 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""] 2->3 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""] + 2->1 MsgFortifyLeader Term:2 Log:0/0 + 2->3 MsgFortifyLeader Term:2 Log:0/0 > 3 receiving messages 2->3 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""] DEBUG 3 [logterm: 0, index: 4] rejected MsgApp [logterm: 1, index: 4] from 2 + 2->3 MsgFortifyLeader Term:2 Log:0/0 > 3 handling Ready Ready MustSync=true: HardState Term:2 Vote:2 Commit:3 Lead:2 Messages: + 3->2 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3) > 2 receiving messages + 3->2 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3) DEBUG 2 received MsgAppResp(rejected, hint: (index 3, term 1)) from 3 for index 4 DEBUG 2 decreased progress of 3 to [StateProbe match=0 next=4] diff --git a/pkg/raft/testdata/checkquorum.txt b/pkg/raft/testdata/checkquorum.txt index 015e05943678..6cfa5777b180 100644 --- a/pkg/raft/testdata/checkquorum.txt +++ b/pkg/raft/testdata/checkquorum.txt @@ -182,18 +182,23 @@ stabilize Messages: 2->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] 2->3 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] + 2->1 MsgFortifyLeader Term:3 Log:0/0 + 2->3 MsgFortifyLeader Term:3 Log:0/0 > 1 receiving messages 2->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] + 2->1 MsgFortifyLeader Term:3 Log:0/0 > 3 receiving messages 2->3 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] INFO 3 [term: 1] received a MsgApp message with higher term from 2 [term: 3] INFO 3 became follower at term 3 + 2->3 MsgFortifyLeader Term:3 Log:0/0 > 1 handling Ready Ready MustSync=true: HardState Term:3 Vote:2 Commit:11 Lead:2 Entries: 3/12 EntryNormal "" Messages: + 1->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0) 1->2 MsgAppResp Term:3 Log:0/12 > 3 handling Ready Ready MustSync=true: @@ -201,9 +206,12 @@ stabilize Entries: 3/12 EntryNormal "" Messages: + 3->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0) 3->2 MsgAppResp Term:3 Log:0/12 > 2 receiving messages + 1->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0) 1->2 MsgAppResp Term:3 Log:0/12 + 3->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0) 3->2 MsgAppResp Term:3 Log:0/12 > 2 handling Ready Ready MustSync=false: diff --git a/pkg/raft/testdata/confchange_v2_replace_leader.txt b/pkg/raft/testdata/confchange_v2_replace_leader.txt index 44c16fc83668..2232b568d5de 100644 --- a/pkg/raft/testdata/confchange_v2_replace_leader.txt +++ b/pkg/raft/testdata/confchange_v2_replace_leader.txt @@ -233,18 +233,25 @@ stabilize 4->1 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""] 4->2 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""] 4->3 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""] + 4->1 MsgFortifyLeader Term:2 Log:0/0 + 4->2 MsgFortifyLeader Term:2 Log:0/0 + 4->3 MsgFortifyLeader Term:2 Log:0/0 > 1 receiving messages 4->1 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""] + 4->1 MsgFortifyLeader Term:2 Log:0/0 > 2 receiving messages 4->2 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""] + 4->2 MsgFortifyLeader Term:2 Log:0/0 > 3 receiving messages 4->3 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""] + 4->3 MsgFortifyLeader Term:2 Log:0/0 > 1 handling Ready Ready MustSync=true: HardState Term:2 Vote:4 Commit:4 Lead:4 Entries: 2/5 EntryNormal "" Messages: + 1->4 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 1->4 MsgAppResp Term:2 Log:0/5 > 2 handling Ready Ready MustSync=true: @@ -252,6 +259,7 @@ stabilize Entries: 2/5 EntryNormal "" Messages: + 2->4 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 2->4 MsgAppResp Term:2 Log:0/5 > 3 handling Ready Ready MustSync=true: @@ -259,10 +267,14 @@ stabilize Entries: 2/5 EntryNormal "" Messages: + 3->4 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 3->4 MsgAppResp Term:2 Log:0/5 > 4 receiving messages + 1->4 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 1->4 MsgAppResp Term:2 Log:0/5 + 2->4 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 2->4 MsgAppResp Term:2 Log:0/5 + 3->4 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 3->4 MsgAppResp Term:2 Log:0/5 > 4 handling Ready Ready MustSync=false: diff --git a/pkg/raft/testdata/prevote.txt b/pkg/raft/testdata/prevote.txt index 0c6ff0e2a56e..70ab0dc371a5 100644 --- a/pkg/raft/testdata/prevote.txt +++ b/pkg/raft/testdata/prevote.txt @@ -215,16 +215,21 @@ stabilize Messages: 2->1 MsgApp Term:2 Log:1/12 Commit:12 Entries:[2/13 EntryNormal ""] 2->3 MsgApp Term:2 Log:1/12 Commit:12 Entries:[2/13 EntryNormal ""] + 2->1 MsgFortifyLeader Term:2 Log:0/0 + 2->3 MsgFortifyLeader Term:2 Log:0/0 > 1 receiving messages 2->1 MsgApp Term:2 Log:1/12 Commit:12 Entries:[2/13 EntryNormal ""] + 2->1 MsgFortifyLeader Term:2 Log:0/0 > 3 receiving messages 2->3 MsgApp Term:2 Log:1/12 Commit:12 Entries:[2/13 EntryNormal ""] + 2->3 MsgFortifyLeader Term:2 Log:0/0 > 1 handling Ready Ready MustSync=true: HardState Term:2 Vote:2 Commit:12 Lead:2 Entries: 2/13 EntryNormal "" Messages: + 1->2 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 1->2 MsgAppResp Term:2 Log:0/13 > 3 handling Ready Ready MustSync=true: @@ -232,9 +237,12 @@ stabilize Entries: 2/13 EntryNormal "" Messages: + 3->2 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 3->2 MsgAppResp Term:2 Log:0/13 > 2 receiving messages + 1->2 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 1->2 MsgAppResp Term:2 Log:0/13 + 3->2 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 3->2 MsgAppResp Term:2 Log:0/13 > 2 handling Ready Ready MustSync=false: diff --git a/pkg/raft/testdata/prevote_checkquorum.txt b/pkg/raft/testdata/prevote_checkquorum.txt index 7a5a7c86a339..94c2cebd7a57 100644 --- a/pkg/raft/testdata/prevote_checkquorum.txt +++ b/pkg/raft/testdata/prevote_checkquorum.txt @@ -137,12 +137,16 @@ stabilize Messages: 3->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] 3->2 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] + 3->1 MsgFortifyLeader Term:2 Log:0/0 + 3->2 MsgFortifyLeader Term:2 Log:0/0 > 1 receiving messages 3->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] INFO 1 [term: 1] received a MsgApp message with higher term from 3 [term: 2] INFO 1 became follower at term 2 + 3->1 MsgFortifyLeader Term:2 Log:0/0 > 2 receiving messages 3->2 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] + 3->2 MsgFortifyLeader Term:2 Log:0/0 > 1 handling Ready Ready MustSync=true: State:StateFollower @@ -150,6 +154,7 @@ stabilize Entries: 2/12 EntryNormal "" Messages: + 1->3 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 1->3 MsgAppResp Term:2 Log:0/12 > 2 handling Ready Ready MustSync=true: @@ -157,9 +162,12 @@ stabilize Entries: 2/12 EntryNormal "" Messages: + 2->3 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 2->3 MsgAppResp Term:2 Log:0/12 > 3 receiving messages + 1->3 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 1->3 MsgAppResp Term:2 Log:0/12 + 2->3 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0) 2->3 MsgAppResp Term:2 Log:0/12 > 3 handling Ready Ready MustSync=false: @@ -296,18 +304,23 @@ stabilize Messages: 2->1 MsgApp Term:3 Log:2/12 Commit:12 Entries:[3/13 EntryNormal ""] 2->3 MsgApp Term:3 Log:2/12 Commit:12 Entries:[3/13 EntryNormal ""] + 2->1 MsgFortifyLeader Term:3 Log:0/0 + 2->3 MsgFortifyLeader Term:3 Log:0/0 > 1 receiving messages 2->1 MsgApp Term:3 Log:2/12 Commit:12 Entries:[3/13 EntryNormal ""] + 2->1 MsgFortifyLeader Term:3 Log:0/0 > 3 receiving messages 2->3 MsgApp Term:3 Log:2/12 Commit:12 Entries:[3/13 EntryNormal ""] INFO 3 [term: 2] received a MsgApp message with higher term from 2 [term: 3] INFO 3 became follower at term 3 + 2->3 MsgFortifyLeader Term:3 Log:0/0 > 1 handling Ready Ready MustSync=true: HardState Term:3 Vote:2 Commit:12 Lead:2 Entries: 3/13 EntryNormal "" Messages: + 1->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0) 1->2 MsgAppResp Term:3 Log:0/13 > 3 handling Ready Ready MustSync=true: @@ -316,9 +329,12 @@ stabilize Entries: 3/13 EntryNormal "" Messages: + 3->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0) 3->2 MsgAppResp Term:3 Log:0/13 > 2 receiving messages + 1->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0) 1->2 MsgAppResp Term:3 Log:0/13 + 3->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0) 3->2 MsgAppResp Term:3 Log:0/13 > 2 handling Ready Ready MustSync=false: diff --git a/pkg/raft/testdata/probe_and_replicate.txt b/pkg/raft/testdata/probe_and_replicate.txt index a7c3b2a8265e..307f1ae7c39a 100644 --- a/pkg/raft/testdata/probe_and_replicate.txt +++ b/pkg/raft/testdata/probe_and_replicate.txt @@ -479,18 +479,27 @@ stabilize 1 1->5 MsgApp Term:8 Log:6/20 Commit:18 Entries:[8/21 EntryNormal ""] 1->6 MsgApp Term:8 Log:6/20 Commit:18 Entries:[8/21 EntryNormal ""] 1->7 MsgApp Term:8 Log:6/20 Commit:18 Entries:[8/21 EntryNormal ""] + 1->2 MsgFortifyLeader Term:8 Log:0/0 + 1->3 MsgFortifyLeader Term:8 Log:0/0 + 1->4 MsgFortifyLeader Term:8 Log:0/0 + 1->5 MsgFortifyLeader Term:8 Log:0/0 + 1->6 MsgFortifyLeader Term:8 Log:0/0 + 1->7 MsgFortifyLeader Term:8 Log:0/0 ## Recover each follower, one by one. stabilize 1 2 ---- > 2 receiving messages 1->2 MsgApp Term:8 Log:6/20 Commit:18 Entries:[8/21 EntryNormal ""] + 1->2 MsgFortifyLeader Term:8 Log:0/0 > 2 handling Ready Ready MustSync=true: HardState Term:8 Vote:1 Commit:18 Lead:1 Messages: + 2->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 2->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 19) > 1 receiving messages + 2->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 2->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 19) > 1 handling Ready Ready MustSync=false: @@ -518,12 +527,15 @@ stabilize 1 3 ---- > 3 receiving messages 1->3 MsgApp Term:8 Log:6/20 Commit:18 Entries:[8/21 EntryNormal ""] + 1->3 MsgFortifyLeader Term:8 Log:0/0 > 3 handling Ready Ready MustSync=true: HardState Term:8 Vote:1 Commit:14 Lead:1 Messages: + 3->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 3->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 14) > 1 receiving messages + 3->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 3->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 14) > 1 handling Ready Ready MustSync=false: @@ -574,14 +586,17 @@ stabilize 1 4 1->4 MsgApp Term:8 Log:6/20 Commit:18 Entries:[8/21 EntryNormal ""] INFO found conflict at index 21 [existing term: 6, conflicting term: 8] INFO replace the unstable entries from index 21 + 1->4 MsgFortifyLeader Term:8 Log:0/0 > 4 handling Ready Ready MustSync=true: HardState Term:8 Commit:18 Lead:1 Entries: 8/21 EntryNormal "" Messages: + 4->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 4->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages + 4->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 4->1 MsgAppResp Term:8 Log:0/21 > 1 handling Ready Ready MustSync=false: @@ -612,12 +627,15 @@ stabilize 1 5 ---- > 5 receiving messages 1->5 MsgApp Term:8 Log:6/20 Commit:18 Entries:[8/21 EntryNormal ""] + 1->5 MsgFortifyLeader Term:8 Log:0/0 > 5 handling Ready Ready MustSync=true: HardState Term:8 Commit:18 Lead:1 Messages: + 5->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 5->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 18) > 1 receiving messages + 5->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 5->1 MsgAppResp Term:8 Log:6/20 Rejected (Hint: 18) > 1 handling Ready Ready MustSync=false: @@ -655,12 +673,15 @@ stabilize 1 6 ---- > 6 receiving messages 1->6 MsgApp Term:8 Log:6/20 Commit:18 Entries:[8/21 EntryNormal ""] + 1->6 MsgFortifyLeader Term:8 Log:0/0 > 6 handling Ready Ready MustSync=true: HardState Term:8 Vote:1 Commit:15 Lead:1 Messages: + 6->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 6->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 17) > 1 receiving messages + 6->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 6->1 MsgAppResp Term:8 Log:4/20 Rejected (Hint: 17) > 1 handling Ready Ready MustSync=false: @@ -710,12 +731,15 @@ stabilize 1 7 ---- > 7 receiving messages 1->7 MsgApp Term:8 Log:6/20 Commit:18 Entries:[8/21 EntryNormal ""] + 1->7 MsgFortifyLeader Term:8 Log:0/0 > 7 handling Ready Ready MustSync=true: HardState Term:8 Vote:1 Commit:13 Lead:1 Messages: + 7->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 7->1 MsgAppResp Term:8 Log:3/20 Rejected (Hint: 20) > 1 receiving messages + 7->1 MsgFortifyLeaderResp Term:8 Log:0/0 Rejected (Hint: 0) 7->1 MsgAppResp Term:8 Log:3/20 Rejected (Hint: 20) > 1 handling Ready Ready MustSync=false: diff --git a/pkg/raft/testdata/snapshot_succeed_via_app_resp_behind.txt b/pkg/raft/testdata/snapshot_succeed_via_app_resp_behind.txt index 38ca5791fd68..825532d7e227 100644 --- a/pkg/raft/testdata/snapshot_succeed_via_app_resp_behind.txt +++ b/pkg/raft/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -107,6 +107,7 @@ Messages: # 3 receives and applies the snapshot, but doesn't respond with MsgAppResp yet. deliver-msgs 3 ---- +1->3 MsgFortifyLeader Term:1 Log:0/0 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false INFO log [committed=5, applied=5, applying=5, unstable.offset=6, unstable.offsetInProgress=6, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1] @@ -146,11 +147,13 @@ stabilize 3 HardState Term:1 Vote:1 Commit:11 Lead:1 Snapshot Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false Messages: + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) 3->1 MsgAppResp Term:1 Log:0/11 stabilize 1 ---- > 1 receiving messages + 3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0) 3->1 MsgAppResp Term:1 Log:0/11 DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=13 paused pendingSnap=12] > 1 handling Ready diff --git a/pkg/raft/util.go b/pkg/raft/util.go index 069d9f930e06..feaac15765f0 100644 --- a/pkg/raft/util.go +++ b/pkg/raft/util.go @@ -49,6 +49,7 @@ var isResponseMsg = [...]bool{ pb.MsgPreVoteResp: true, pb.MsgStorageAppendResp: true, pb.MsgStorageApplyResp: true, + pb.MsgFortifyLeaderResp: true, } func isMsgInArray(msgt pb.MessageType, arr []bool) bool { @@ -313,6 +314,14 @@ func assertConfStatesEquivalent(l Logger, cs1, cs2 pb.ConfState) { l.Panic(err) } +// Assert panics with the supplied message if the condition does not hold +// true. +func Assert(condition bool, msg string) { + if !condition { + panic(msg) + } +} + // extend appends vals to the given dst slice. It differs from the standard // slice append only in the way it allocates memory. If cap(dst) is not enough // for appending the values, precisely size len(dst)+len(vals) is allocated. diff --git a/pkg/raft/util_test.go b/pkg/raft/util_test.go index c66fcd260c44..bc5ad610b454 100644 --- a/pkg/raft/util_test.go +++ b/pkg/raft/util_test.go @@ -96,7 +96,8 @@ func TestIsLocalMsg(t *testing.T) { {pb.MsgStorageAppendResp, true}, {pb.MsgStorageApply, true}, {pb.MsgStorageApplyResp, true}, - {pb.MsgForgetLeader, false}, + {pb.MsgFortifyLeader, false}, + {pb.MsgFortifyLeaderResp, false}, } for _, tt := range tests { @@ -133,6 +134,8 @@ func TestIsResponseMsg(t *testing.T) { {pb.MsgStorageApply, false}, {pb.MsgStorageApplyResp, true}, {pb.MsgForgetLeader, false}, + {pb.MsgFortifyLeader, false}, + {pb.MsgFortifyLeaderResp, true}, } for i, tt := range tests {