Skip to content

Commit

Permalink
raft: add no-op MsgFortifyLeader and MsgFortifyLeaderResp
Browse files Browse the repository at this point in the history
This commit adds two new messages to raft -- MsgFortifyLeader and
MsgFortifyLeaderResp. A candidate attempts to fortify its leadership
term after winning an election. It does so by broadcasting a
MsgFortifyLeader to all followers.

Currently, the handling of MsgFortify is a no-op; requests are trivially
rejected. In subsequent patches, we'll hook into store liveness and
correctly respond.

While here, we also add metrics for MsgFortify and MsgFortifyResp.

Informs #125261
Closes #124498

Release note: None
  • Loading branch information
arulajmani committed Aug 7, 2024
1 parent 48a23e8 commit 429519c
Show file tree
Hide file tree
Showing 16 changed files with 206 additions and 19 deletions.
38 changes: 26 additions & 12 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,18 @@ cache will already have moved on to newer entries.
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaRaftRcvdFortifyLeader = metric.Metadata{
Name: "raft.rcvd.fortifyleader",
Help: "Number of MsgFortifyLeader messages received by this store",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaRaftRcvdFortifyLeaderResp = metric.Metadata{
Name: "raft.rcvd.fortifyleaderresp",
Help: "Number of MsgFortifyLeaderResp messages received by this store",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaRaftRcvdDropped = metric.Metadata{
Name: "raft.rcvd.dropped",
Help: "Number of incoming Raft messages dropped (due to queue length or size)",
Expand Down Expand Up @@ -3522,18 +3534,20 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {

// Raft message metrics.
RaftRcvdMessages: [maxRaftMsgType + 1]*metric.Counter{
raftpb.MsgProp: metric.NewCounter(metaRaftRcvdProp),
raftpb.MsgApp: metric.NewCounter(metaRaftRcvdApp),
raftpb.MsgAppResp: metric.NewCounter(metaRaftRcvdAppResp),
raftpb.MsgVote: metric.NewCounter(metaRaftRcvdVote),
raftpb.MsgVoteResp: metric.NewCounter(metaRaftRcvdVoteResp),
raftpb.MsgPreVote: metric.NewCounter(metaRaftRcvdPreVote),
raftpb.MsgPreVoteResp: metric.NewCounter(metaRaftRcvdPreVoteResp),
raftpb.MsgSnap: metric.NewCounter(metaRaftRcvdSnap),
raftpb.MsgHeartbeat: metric.NewCounter(metaRaftRcvdHeartbeat),
raftpb.MsgHeartbeatResp: metric.NewCounter(metaRaftRcvdHeartbeatResp),
raftpb.MsgTransferLeader: metric.NewCounter(metaRaftRcvdTransferLeader),
raftpb.MsgTimeoutNow: metric.NewCounter(metaRaftRcvdTimeoutNow),
raftpb.MsgProp: metric.NewCounter(metaRaftRcvdProp),
raftpb.MsgApp: metric.NewCounter(metaRaftRcvdApp),
raftpb.MsgAppResp: metric.NewCounter(metaRaftRcvdAppResp),
raftpb.MsgVote: metric.NewCounter(metaRaftRcvdVote),
raftpb.MsgVoteResp: metric.NewCounter(metaRaftRcvdVoteResp),
raftpb.MsgPreVote: metric.NewCounter(metaRaftRcvdPreVote),
raftpb.MsgPreVoteResp: metric.NewCounter(metaRaftRcvdPreVoteResp),
raftpb.MsgSnap: metric.NewCounter(metaRaftRcvdSnap),
raftpb.MsgHeartbeat: metric.NewCounter(metaRaftRcvdHeartbeat),
raftpb.MsgHeartbeatResp: metric.NewCounter(metaRaftRcvdHeartbeatResp),
raftpb.MsgTransferLeader: metric.NewCounter(metaRaftRcvdTransferLeader),
raftpb.MsgTimeoutNow: metric.NewCounter(metaRaftRcvdTimeoutNow),
raftpb.MsgFortifyLeader: metric.NewCounter(metaRaftRcvdFortifyLeader),
raftpb.MsgFortifyLeaderResp: metric.NewCounter(metaRaftRcvdFortifyLeaderResp),
},
RaftRcvdDropped: metric.NewCounter(metaRaftRcvdDropped),
RaftRcvdDroppedBytes: metric.NewCounter(metaRaftRcvdDroppedBytes),
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// maxRaftMsgType is the maximum value in the raft.MessageType enum.
const maxRaftMsgType = raftpb.MsgForgetLeader
const maxRaftMsgType = raftpb.MsgFortifyLeaderResp

func init() {
for v := range raftpb.MessageType_name {
Expand Down
46 changes: 46 additions & 0 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ type raft struct {

// the leader id
lead pb.PeerID
// leadEpoch, if set, corresponds to the StoreLiveness epoch that this peer
// has supported the leader in. It's unset if the peer hasn't supported the
// current leader.
//
// TODO(arul): This should be populated when responding to a MsgFortify.
leadEpoch pb.Epoch
// leadTransferee is id of the leader transfer target when its value is not zero.
Expand Down Expand Up @@ -669,6 +673,17 @@ func (r *raft) sendHeartbeat(to pb.PeerID) {
pr.SentCommit(commit)
}

// sendFortify sends a fortification RPC to the given peer.
func (r *raft) sendFortify(to pb.PeerID) {
if to == r.id {
// We handle the case where the leader is trying to fortify itself specially.
// Doing so avoids a self-addressed message.
// TODO(arul): do this handling.
return
}
r.send(pb.Message{To: to, Type: pb.MsgFortifyLeader})
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.trk.
func (r *raft) bcastAppend() {
Expand All @@ -690,6 +705,15 @@ func (r *raft) bcastHeartbeat() {
})
}

// bcastFortify sends an RPC to fortify the leader to all peers (including the leader itself).
func (r *raft) bcastFortify() {
assertTrue(r.state == StateLeader, "only leaders can fortify")

r.trk.Visit(func(id pb.PeerID, _ *tracker.Progress) {
r.sendFortify(id)
})
}

func (r *raft) appliedTo(index uint64, size entryEncodingSize) {
oldApplied := r.raftLog.applied
newApplied := max(index, oldApplied)
Expand Down Expand Up @@ -1326,6 +1350,8 @@ func stepLeader(r *raft, m pb.Message) error {

case pb.MsgForgetLeader:
return nil // noop on leader
case pb.MsgFortifyLeaderResp:
r.handleFortifyResp(m)
}

// All other message types require a progress for m.From (pr).
Expand Down Expand Up @@ -1610,6 +1636,9 @@ func stepCandidate(r *raft, m pb.Message) error {
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleFortify(m)
case myVoteRespType:
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
Expand All @@ -1619,6 +1648,7 @@ func stepCandidate(r *raft, m pb.Message) error {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastFortify()
r.bcastAppend()
}
case quorum.VoteLost:
Expand Down Expand Up @@ -1664,6 +1694,8 @@ func stepFollower(r *raft, m pb.Message) error {
r.electionElapsed = 0
r.lead = m.From
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.handleFortify(m)
case pb.MsgTransferLeader:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
Expand Down Expand Up @@ -1829,6 +1861,20 @@ func (r *raft) handleSnapshot(m pb.Message) {
}
}

func (r *raft) handleFortify(m pb.Message) {
// TODO(arul): currently a no-op; implement.
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Reject: true,
})
}

func (r *raft) handleFortifyResp(m pb.Message) {
assertTrue(r.state == StateLeader, "only leaders should be handling fortification responses")
assertTrue(m.Reject, "TODO(arul): implement")
}

// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine. If this method returns false, the snapshot was
// ignored, either because it was obsolete or because of an error.
Expand Down
2 changes: 2 additions & 0 deletions pkg/raft/raftpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ enum MessageType {
MsgStorageApply = 21;
MsgStorageApplyResp = 22;
MsgForgetLeader = 23;
MsgFortifyLeader = 24;
MsgFortifyLeaderResp = 25;
// NOTE: when adding new message types, remember to update the isLocalMsg and
// isResponseMsg arrays in raft/util.go and update the corresponding tests in
// raft/util_test.go.
Expand Down
8 changes: 8 additions & 0 deletions pkg/raft/testdata/async_storage_writes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,19 @@ stabilize
Entries:
1/11 EntryNormal ""
Messages:
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
1->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 Entries:[1/11 EntryNormal ""] Responses:[
1->1 MsgAppResp Term:1 Log:0/11
AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11
]
> 2 receiving messages
1->2 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
> 3 receiving messages
1->3 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
> 1 processing append thread
Processing:
Expand All @@ -110,6 +114,7 @@ stabilize
Entries:
1/11 EntryNormal ""
Messages:
2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0)
2->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 Entries:[1/11 EntryNormal ""] Responses:[
2->1 MsgAppResp Term:1 Log:0/11
AppendThread->2 MsgStorageAppendResp Term:0 Log:1/11
Expand All @@ -120,13 +125,16 @@ stabilize
Entries:
1/11 EntryNormal ""
Messages:
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0)
3->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 Entries:[1/11 EntryNormal ""] Responses:[
3->1 MsgAppResp Term:1 Log:0/11
AppendThread->3 MsgStorageAppendResp Term:0 Log:1/11
]
> 1 receiving messages
1->1 MsgAppResp Term:1 Log:0/11
AppendThread->1 MsgStorageAppendResp Term:0 Log:1/11
2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0)
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0)
> 2 processing append thread
Processing:
2->AppendThread MsgStorageAppend Term:1 Log:1/11 Commit:10 Vote:1 Lead:1 Entries:[1/11 EntryNormal ""]
Expand Down
20 changes: 20 additions & 0 deletions pkg/raft/testdata/async_storage_writes_append_aba_race.txt
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ HardState Term:2 Vote:3 Commit:11 Lead:3
Entries:
2/12 EntryNormal ""
Messages:
3->1 MsgFortifyLeader Term:2 Log:0/0
3->2 MsgFortifyLeader Term:2 Log:0/0
3->4 MsgFortifyLeader Term:2 Log:0/0
3->5 MsgFortifyLeader Term:2 Log:0/0
3->6 MsgFortifyLeader Term:2 Log:0/0
3->7 MsgFortifyLeader Term:2 Log:0/0
3->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
3->2 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
3->4 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
Expand All @@ -206,15 +212,21 @@ deliver-msgs 1 drop=(2,4,5,6,7)
INFO 1 [term: 1] received a MsgVote message with higher term from 3 [term: 2]
INFO 1 became follower at term 2
INFO 1 [logterm: 1, index: 12, vote: 0] rejected MsgVote from 3 [logterm: 1, index: 11] at term 2
3->1 MsgFortifyLeader Term:2 Log:0/0
3->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
INFO found conflict at index 12 [existing term: 1, conflicting term: 2]
INFO replace the unstable entries from index 12
dropped: 3->2 MsgVote Term:2 Log:1/11
dropped: 3->2 MsgFortifyLeader Term:2 Log:0/0
dropped: 3->2 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
dropped: 3->4 MsgFortifyLeader Term:2 Log:0/0
dropped: 3->4 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
dropped: 3->5 MsgFortifyLeader Term:2 Log:0/0
dropped: 3->5 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
dropped: 3->6 MsgFortifyLeader Term:2 Log:0/0
dropped: 3->6 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]
dropped: 3->7 MsgVote Term:2 Log:1/11
dropped: 3->7 MsgFortifyLeader Term:2 Log:0/0
dropped: 3->7 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""]

process-ready 1
Expand All @@ -224,6 +236,7 @@ HardState Term:2 Commit:11 Lead:3
Entries:
2/12 EntryNormal ""
Messages:
1->3 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0)
1->AppendThread MsgStorageAppend Term:2 Log:2/12 Commit:11 Lead:3 Entries:[2/12 EntryNormal ""] Responses:[
1->3 MsgVoteResp Term:2 Log:0/0 Rejected (Hint: 0)
1->3 MsgAppResp Term:2 Log:0/12
Expand Down Expand Up @@ -345,6 +358,12 @@ HardState Term:3 Vote:4 Commit:11 Lead:4
Entries:
3/12 EntryNormal ""
Messages:
4->1 MsgFortifyLeader Term:3 Log:0/0
4->2 MsgFortifyLeader Term:3 Log:0/0
4->3 MsgFortifyLeader Term:3 Log:0/0
4->5 MsgFortifyLeader Term:3 Log:0/0
4->6 MsgFortifyLeader Term:3 Log:0/0
4->7 MsgFortifyLeader Term:3 Log:0/0
4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
4->2 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
4->3 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
Expand All @@ -365,6 +384,7 @@ Messages:
deliver-msgs drop=1
----
dropped: 4->1 MsgVote Term:3 Log:1/11
dropped: 4->1 MsgFortifyLeader Term:3 Log:0/0
dropped: 4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]

tick-heartbeat 4
Expand Down
8 changes: 8 additions & 0 deletions pkg/raft/testdata/campaign.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,36 @@ stabilize
Entries:
1/3 EntryNormal ""
Messages:
1->2 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""]
1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""]
> 2 receiving messages
1->2 MsgFortifyLeader Term:1 Log:0/0
1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""]
> 3 receiving messages
1->3 MsgFortifyLeader Term:1 Log:0/0
1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""]
> 2 handling Ready
Ready MustSync=true:
HardState Term:1 Vote:1 Commit:2 Lead:1
Entries:
1/3 EntryNormal ""
Messages:
2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0)
2->1 MsgAppResp Term:1 Log:0/3
> 3 handling Ready
Ready MustSync=true:
HardState Term:1 Vote:1 Commit:2 Lead:1
Entries:
1/3 EntryNormal ""
Messages:
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0)
3->1 MsgAppResp Term:1 Log:0/3
> 1 receiving messages
2->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0)
2->1 MsgAppResp Term:1 Log:0/3
3->1 MsgFortifyLeaderResp Term:1 Log:0/0 Rejected (Hint: 0)
3->1 MsgAppResp Term:1 Log:0/3
> 1 handling Ready
Ready MustSync=false:
Expand Down
5 changes: 5 additions & 0 deletions pkg/raft/testdata/campaign_learner_must_vote.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,22 @@ stabilize 2 3
Entries:
2/5 EntryNormal ""
Messages:
2->1 MsgFortifyLeader Term:2 Log:0/0
2->3 MsgFortifyLeader Term:2 Log:0/0
2->1 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""]
2->3 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""]
> 3 receiving messages
2->3 MsgFortifyLeader Term:2 Log:0/0
2->3 MsgApp Term:2 Log:1/4 Commit:4 Entries:[2/5 EntryNormal ""]
DEBUG 3 [logterm: 0, index: 4] rejected MsgApp [logterm: 1, index: 4] from 2
> 3 handling Ready
Ready MustSync=true:
HardState Term:2 Vote:2 Commit:3 Lead:2
Messages:
3->2 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0)
3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3)
> 2 receiving messages
3->2 MsgFortifyLeaderResp Term:2 Log:0/0 Rejected (Hint: 0)
3->2 MsgAppResp Term:2 Log:1/4 Rejected (Hint: 3)
DEBUG 2 received MsgAppResp(rejected, hint: (index 3, term 1)) from 3 for index 4
DEBUG 2 decreased progress of 3 to [StateProbe match=0 next=4]
Expand Down
12 changes: 10 additions & 2 deletions pkg/raft/testdata/checkquorum.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,30 +180,38 @@ stabilize
Entries:
3/12 EntryNormal ""
Messages:
2->1 MsgFortifyLeader Term:3 Log:0/0
2->3 MsgFortifyLeader Term:3 Log:0/0
2->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
2->3 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
> 1 receiving messages
2->1 MsgFortifyLeader Term:3 Log:0/0
2->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
> 3 receiving messages
2->3 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
INFO 3 [term: 1] received a MsgApp message with higher term from 2 [term: 3]
2->3 MsgFortifyLeader Term:3 Log:0/0
INFO 3 [term: 1] received a MsgFortifyLeader message with higher term from 2 [term: 3]
INFO 3 became follower at term 3
2->3 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""]
> 1 handling Ready
Ready MustSync=true:
HardState Term:3 Vote:2 Commit:11 Lead:2
Entries:
3/12 EntryNormal ""
Messages:
1->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0)
1->2 MsgAppResp Term:3 Log:0/12
> 3 handling Ready
Ready MustSync=true:
HardState Term:3 Commit:11 Lead:2
Entries:
3/12 EntryNormal ""
Messages:
3->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0)
3->2 MsgAppResp Term:3 Log:0/12
> 2 receiving messages
1->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0)
1->2 MsgAppResp Term:3 Log:0/12
3->2 MsgFortifyLeaderResp Term:3 Log:0/0 Rejected (Hint: 0)
3->2 MsgAppResp Term:3 Log:0/12
> 2 handling Ready
Ready MustSync=false:
Expand Down
Loading

0 comments on commit 429519c

Please sign in to comment.