Skip to content

Commit

Permalink
kvserver: use defer unlock in sendRaftMessage
Browse files Browse the repository at this point in the history
Extract a method to enable the use of deferred lock release.

Release note: None

Part of: cockroachdb#105366
  • Loading branch information
andrewbaptist committed May 14, 2024
1 parent 6e1b82d commit 2a6e47e
Showing 1 changed file with 43 additions and 33 deletions.
76 changes: 43 additions & 33 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1795,39 +1795,10 @@ func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(
// When calling this method, the raftMu may be held, but it does not need to be.
// The Replica mu must not be held.
func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
lastToReplica, lastFromReplica := r.getLastReplicaDescriptors()

r.mu.RLock()
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica)
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica)
var startKey roachpb.RKey
if msg.Type == raftpb.MsgApp {
// When the follower is potentially an uninitialized replica waiting for
// a split trigger, send the replica's StartKey along. See the method
// below for more context:
_ = maybeDropMsgApp
// NB: this code is allocation free.
r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, pr tracker.Progress) {
if id == msg.To && pr.State == tracker.StateProbe {
// It is moderately expensive to attach a full key to the message, but note that
// a probing follower will only be appended to once per heartbeat interval (i.e.
// on the order of seconds). See:
//
// https://github.com/etcd-io/etcd/blob/7f450bf6967638673dd88fd4e730b01d1303d5ff/raft/progress.go#L41
startKey = r.descRLocked().StartKey
}
})
}
r.mu.RUnlock()

if fromErr != nil {
log.Warningf(ctx, "failed to look up sender replica %d in r%d while sending %s: %s",
msg.From, r.RangeID, msg.Type, fromErr)
return
}
if toErr != nil {
log.Warningf(ctx, "failed to look up recipient replica %d in r%d while sending %s: %s",
msg.To, r.RangeID, msg.Type, toErr)
startKey, toReplica, fromReplica, err := r.lookupLatestDescriptors(msg.To, msg.From, msg.Type)
if err != nil {
log.Warningf(ctx, "failed to look up replica %d in r%d while sending %s: %s",
msg.From, r.RangeID, msg.Type, err)
return
}

Expand Down Expand Up @@ -1857,6 +1828,45 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
}
}

// lookupLatestDescriptors loads the latest descriptors for the given raft
// message. It can fall back to a prior descriptor as long as the replica id
// matches the one provided in the msg.
func (r *Replica) lookupLatestDescriptors(
To, From uint64, Type raftpb.MessageType,
) (roachpb.RKey, roachpb.ReplicaDescriptor, roachpb.ReplicaDescriptor, error) {
lastToReplica, lastFromReplica := r.getLastReplicaDescriptors()
r.mu.RLock()
defer r.mu.RUnlock()
toReplica, err := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(To), lastToReplica)
if err != nil {
return nil, roachpb.ReplicaDescriptor{}, roachpb.ReplicaDescriptor{}, err
}
fromReplica, err := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(From), lastFromReplica)
if err != nil {
return nil, roachpb.ReplicaDescriptor{}, roachpb.ReplicaDescriptor{}, err
}
var startKey roachpb.RKey
if Type == raftpb.MsgApp {
// When the follower is potentially an uninitialized replica waiting for
// a split trigger, send the replica's StartKey along. See the method
// below for more context:
_ = maybeDropMsgApp
// NB: this code is allocation free.
r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, pr tracker.Progress) {
if id == To && pr.State == tracker.StateProbe {
// It is moderately expensive to attach a full key to the message, but note that
// a probing follower will only be appended to once per heartbeat interval (i.e.
// on the order of seconds). See:
//
// https://github.com/etcd-io/etcd/blob/7f450bf6967638673dd88fd4e730b01d1303d5ff/raft/progress.go#L41
startKey = r.Desc().StartKey
}
})
}

return startKey, toReplica, fromReplica, nil
}

// addUnreachableRemoteReplica adds the given remote ReplicaID to be reported
// as unreachable on the next tick.
func (r *Replica) addUnreachableRemoteReplica(remoteReplica roachpb.ReplicaID) {
Expand Down

0 comments on commit 2a6e47e

Please sign in to comment.