Skip to content

Commit

Permalink
raft: add no-op MsgFortifyLeader and MsgFortifyLeaderResp
Browse files Browse the repository at this point in the history
This commit adds two new messages to raft -- MsgFortifyLeader and
MsgFortifyLeaderResp. A candidate attempts to fortify its leadership
term after winning an election. It does so by broadcasting a
MsgFortifyLeader to all followers. Fow now, the broadcast is a no-op;
we'll meaningfully implement it in an upcoming patch.

Informs cockroachdb#125261

Release note: None
  • Loading branch information
arulajmani committed Aug 8, 2024
1 parent 48a23e8 commit f5b6338
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// maxRaftMsgType is the maximum value in the raft.MessageType enum.
const maxRaftMsgType = raftpb.MsgForgetLeader
const maxRaftMsgType = raftpb.MsgFortifyLeaderResp

func init() {
for v := range raftpb.MessageType_name {
Expand Down
34 changes: 34 additions & 0 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,10 @@ type raft struct {

// the leader id
lead pb.PeerID
// leadEpoch, if set, corresponds to the StoreLiveness epoch that this peer
// has supported the leader in. It's unset if the peer hasn't supported the
// current leader.
//
// TODO(arul): This should be populated when responding to a MsgFortify.
leadEpoch pb.Epoch
// leadTransferee is id of the leader transfer target when its value is not zero.
Expand Down Expand Up @@ -690,6 +694,14 @@ func (r *raft) bcastHeartbeat() {
})
}

// bcastFortify sends an RPC to fortify the leader to all peers (including the leader itself).
func (r *raft) bcastFortify() {
assertTrue(r.state == StateLeader, "only leaders can fortify")

// TODO(arul): this needs to be hooked up to a version check. For now, treat
// it as a no-op.
}

func (r *raft) appliedTo(index uint64, size entryEncodingSize) {
oldApplied := r.raftLog.applied
newApplied := max(index, oldApplied)
Expand Down Expand Up @@ -1326,6 +1338,8 @@ func stepLeader(r *raft, m pb.Message) error {

case pb.MsgForgetLeader:
return nil // noop on leader
case pb.MsgFortifyLeaderResp:
r.handleFortifyResp(m)
}

// All other message types require a progress for m.From (pr).
Expand Down Expand Up @@ -1610,6 +1624,9 @@ func stepCandidate(r *raft, m pb.Message) error {
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleFortify(m)
case myVoteRespType:
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
Expand All @@ -1619,6 +1636,7 @@ func stepCandidate(r *raft, m pb.Message) error {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastFortify()
r.bcastAppend()
}
case quorum.VoteLost:
Expand Down Expand Up @@ -1664,6 +1682,8 @@ func stepFollower(r *raft, m pb.Message) error {
r.electionElapsed = 0
r.lead = m.From
r.handleSnapshot(m)
case pb.MsgFortifyLeader:
r.handleFortify(m)
case pb.MsgTransferLeader:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
Expand Down Expand Up @@ -1829,6 +1849,20 @@ func (r *raft) handleSnapshot(m pb.Message) {
}
}

func (r *raft) handleFortify(m pb.Message) {
// TODO(arul): currently a no-op; implement.
r.send(pb.Message{
To: m.From,
Type: pb.MsgFortifyLeaderResp,
Reject: true,
})
}

func (r *raft) handleFortifyResp(m pb.Message) {
assertTrue(r.state == StateLeader, "only leaders should be handling fortification responses")
assertTrue(m.Reject, "TODO(arul): implement")
}

// restore recovers the state machine from a snapshot. It restores the log and the
// configuration of state machine. If this method returns false, the snapshot was
// ignored, either because it was obsolete or because of an error.
Expand Down
2 changes: 2 additions & 0 deletions pkg/raft/raftpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ enum MessageType {
MsgStorageApply = 21;
MsgStorageApplyResp = 22;
MsgForgetLeader = 23;
MsgFortifyLeader = 24;
MsgFortifyLeaderResp = 25;
// NOTE: when adding new message types, remember to update the isLocalMsg and
// isResponseMsg arrays in raft/util.go and update the corresponding tests in
// raft/util_test.go.
Expand Down
9 changes: 9 additions & 0 deletions pkg/raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var isResponseMsg = [...]bool{
pb.MsgPreVoteResp: true,
pb.MsgStorageAppendResp: true,
pb.MsgStorageApplyResp: true,
pb.MsgFortifyLeaderResp: true,
}

func isMsgInArray(msgt pb.MessageType, arr []bool) bool {
Expand Down Expand Up @@ -313,6 +314,14 @@ func assertConfStatesEquivalent(l Logger, cs1, cs2 pb.ConfState) {
l.Panic(err)
}

// assertTrue panics with the supplied message if the condition does not hold
// true.
func assertTrue(condition bool, msg string) {
if !condition {
panic(msg)
}
}

// extend appends vals to the given dst slice. It differs from the standard
// slice append only in the way it allocates memory. If cap(dst) is not enough
// for appending the values, precisely size len(dst)+len(vals) is allocated.
Expand Down
4 changes: 4 additions & 0 deletions pkg/raft/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func TestIsLocalMsg(t *testing.T) {
{pb.MsgStorageApply, true},
{pb.MsgStorageApplyResp, true},
{pb.MsgForgetLeader, false},
{pb.MsgFortifyLeader, false},
{pb.MsgFortifyLeaderResp, false},
}

for _, tt := range tests {
Expand Down Expand Up @@ -133,6 +135,8 @@ func TestIsResponseMsg(t *testing.T) {
{pb.MsgStorageApply, false},
{pb.MsgStorageApplyResp, true},
{pb.MsgForgetLeader, false},
{pb.MsgFortifyLeader, false},
{pb.MsgFortifyLeaderResp, true},
}

for i, tt := range tests {
Expand Down

0 comments on commit f5b6338

Please sign in to comment.