diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index a956398e5546..b507b1141138 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -1795,39 +1795,10 @@ func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked( // When calling this method, the raftMu may be held, but it does not need to be. // The Replica mu must not be held. func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { - lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() - - r.mu.RLock() - fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.From), lastToReplica) - toReplica, toErr := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(msg.To), lastFromReplica) - var startKey roachpb.RKey - if msg.Type == raftpb.MsgApp { - // When the follower is potentially an uninitialized replica waiting for - // a split trigger, send the replica's StartKey along. See the method - // below for more context: - _ = maybeDropMsgApp - // NB: this code is allocation free. - r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, pr tracker.Progress) { - if id == msg.To && pr.State == tracker.StateProbe { - // It is moderately expensive to attach a full key to the message, but note that - // a probing follower will only be appended to once per heartbeat interval (i.e. - // on the order of seconds). See: - // - // https://github.com/etcd-io/etcd/blob/7f450bf6967638673dd88fd4e730b01d1303d5ff/raft/progress.go#L41 - startKey = r.descRLocked().StartKey - } - }) - } - r.mu.RUnlock() - - if fromErr != nil { - log.Warningf(ctx, "failed to look up sender replica %d in r%d while sending %s: %s", - msg.From, r.RangeID, msg.Type, fromErr) - return - } - if toErr != nil { - log.Warningf(ctx, "failed to look up recipient replica %d in r%d while sending %s: %s", - msg.To, r.RangeID, msg.Type, toErr) + startKey, toReplica, fromReplica, err := r.lookupLatestDescriptors(msg.To, msg.From, msg.Type) + if err != nil { + log.Warningf(ctx, "failed to look up replica %d in r%d while sending %s: %s", + msg.From, r.RangeID, msg.Type, err) return } @@ -1857,6 +1828,45 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { } } +// lookupLatestDescriptors loads the latest descriptors for the given raft +// message. It can fall back to a prior descriptor as long as the replica id +// matches the one provided in the msg. +func (r *Replica) lookupLatestDescriptors( + To, From uint64, Type raftpb.MessageType, +) (roachpb.RKey, roachpb.ReplicaDescriptor, roachpb.ReplicaDescriptor, error) { + lastToReplica, lastFromReplica := r.getLastReplicaDescriptors() + r.mu.RLock() + defer r.mu.RUnlock() + toReplica, err := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(To), lastToReplica) + if err != nil { + return nil, roachpb.ReplicaDescriptor{}, roachpb.ReplicaDescriptor{}, err + } + fromReplica, err := r.getReplicaDescriptorByIDRLocked(roachpb.ReplicaID(From), lastFromReplica) + if err != nil { + return nil, roachpb.ReplicaDescriptor{}, roachpb.ReplicaDescriptor{}, err + } + var startKey roachpb.RKey + if Type == raftpb.MsgApp { + // When the follower is potentially an uninitialized replica waiting for + // a split trigger, send the replica's StartKey along. See the method + // below for more context: + _ = maybeDropMsgApp + // NB: this code is allocation free. + r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, pr tracker.Progress) { + if id == To && pr.State == tracker.StateProbe { + // It is moderately expensive to attach a full key to the message, but note that + // a probing follower will only be appended to once per heartbeat interval (i.e. + // on the order of seconds). See: + // + // https://github.com/etcd-io/etcd/blob/7f450bf6967638673dd88fd4e730b01d1303d5ff/raft/progress.go#L41 + startKey = r.Desc().StartKey + } + }) + } + + return startKey, toReplica, fromReplica, nil +} + // addUnreachableRemoteReplica adds the given remote ReplicaID to be reported // as unreachable on the next tick. func (r *Replica) addUnreachableRemoteReplica(remoteReplica roachpb.ReplicaID) {