Skip to content

Commit

Permalink
kvserver: use withRaftGroup instead of withRaftGroupLocked
Browse files Browse the repository at this point in the history
Release note: None

Part of: cockroachdb#105366
  • Loading branch information
andrewbaptist committed May 14, 2024
1 parent 7c956de commit 5b69b25
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,15 +804,14 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
var softState *raft.SoftState
var outboundMsgs []raftpb.Message
var msgStorageAppend, msgStorageApply raftpb.Message
r.mu.Lock()
state := logstore.RaftState{ // used for append below
LastIndex: r.mu.lastIndexNotDurable,
LastTerm: r.mu.lastTermNotDurable,
ByteSize: r.mu.raftLogSize,
}
leaderID := r.mu.leaderID
lastLeaderID := leaderID
err := r.withRaftGroupLocked(func(raftGroup *raft.RawNode) (bool, error) {
var state logstore.RaftState
var leaderID, lastLeaderID roachpb.ReplicaID
var pausedFollowers map[roachpb.ReplicaID]struct{}
err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
state = r.createRaftStateLocked()
leaderID = r.mu.leaderID
lastLeaderID = leaderID
pausedFollowers = r.mu.pausedFollowers
r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup)

numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup)
Expand All @@ -838,6 +837,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
asyncRd := makeAsyncReady(syncRd)
softState = asyncRd.SoftState
outboundMsgs, msgStorageAppend, msgStorageApply = splitLocalStorageMsgs(asyncRd.Messages)
r.mu.applyingEntries = hasMsg(msgStorageApply)
}
// We unquiesce if we have a Ready (= there's work to do). We also have
// to unquiesce if we just flushed some proposals but there isn't a
Expand All @@ -854,9 +854,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0
return unquiesceAndWakeLeader, nil
})
r.mu.applyingEntries = hasMsg(msgStorageApply)
pausedFollowers := r.mu.pausedFollowers
r.mu.Unlock()
if errors.Is(err, errRemoved) {
// If we've been removed then just return.
return stats, nil
Expand Down Expand Up @@ -1176,6 +1173,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
func (r *Replica) createRaftState() logstore.RaftState {
r.mu.RLock()
defer r.mu.RUnlock()
return r.createRaftStateLocked()
}

func (r *Replica) createRaftStateLocked() logstore.RaftState {
return logstore.RaftState{
LastIndex: r.mu.lastIndexNotDurable,
LastTerm: r.mu.lastTermNotDurable,
Expand Down

0 comments on commit 5b69b25

Please sign in to comment.