diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index 49f503548c1e..bd820ace51b4 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -769,7 +769,11 @@ func (a *Allocator) TransferLeaseTarget(
 	ctx context.Context,
 	zone *zonepb.ZoneConfig,
 	existing []roachpb.ReplicaDescriptor,
-	leaseStoreID roachpb.StoreID,
+	leaseRepl interface {
+		RaftStatus() *raft.Status
+		StoreID() roachpb.StoreID
+		GetRangeID() roachpb.RangeID
+	},
 	stats *replicaStats,
 	checkTransferLeaseSource bool,
 	checkCandidateFullness bool,
@@ -801,7 +805,7 @@ func (a *Allocator) TransferLeaseTarget(
 	}
 	sl = makeStoreList(filteredDescs)
 
-	source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
+	source, ok := a.storePool.getStoreDescriptor(leaseRepl.StoreID())
 	if !ok {
 		return roachpb.ReplicaDescriptor{}
 	}
@@ -819,14 +823,14 @@ func (a *Allocator) TransferLeaseTarget(
 		// it's too big a change to make right before a major release.
 		var candidates []roachpb.ReplicaDescriptor
 		for _, repl := range existing {
-			if repl.StoreID != leaseStoreID {
+			if repl.StoreID != leaseRepl.StoreID() {
 				candidates = append(candidates, repl)
 			}
 		}
 		preferred = a.preferredLeaseholders(zone, candidates)
 	}
 	if len(preferred) == 1 {
-		if preferred[0].StoreID == leaseStoreID {
+		if preferred[0].StoreID == leaseRepl.StoreID() {
 			return roachpb.ReplicaDescriptor{}
 		}
 		// Verify that the preferred replica is eligible to receive the lease.
@@ -839,7 +843,7 @@ func (a *Allocator) TransferLeaseTarget(
 		// If the current leaseholder is not preferred, set checkTransferLeaseSource
 		// to false to motivate the below logic to transfer the lease.
 		existing = preferred
-		if !storeHasReplica(leaseStoreID, preferred) {
+		if !storeHasReplica(leaseRepl.StoreID(), preferred) {
 			checkTransferLeaseSource = false
 		}
 	}
@@ -847,9 +851,28 @@ func (a *Allocator) TransferLeaseTarget(
 	// Only consider live, non-draining replicas.
 	existing, _ = a.storePool.liveAndDeadReplicas(existing)
 
+	// Only proceed with the lease transfer if we are also the raft leader (we
+	// already know we are the leaseholder at this point), and only consider
+	// replicas that are in `StateReplicate` as potential candidates.
+	//
+	// NB: The RaftStatus() only returns a non-empty and non-nil result on the
+	// Raft leader (since Raft followers do not track the progress of other
+	// replicas, only the leader does).
+	//
+	// NB: On every Raft tick, we try to ensure that leadership is collocated with
+	// leaseholdership (see
+	// Replica.maybeTransferRaftLeadershipToLeaseholderLocked()). This means that
+	// on a range that is not already borked (i.e. can accept writes), periods of
+	// leader/leaseholder misalignment should be ephemeral and rare. We choose to
+	// be pessimistic here and bail on the lease transfer, as opposed to
+	// potentially transferring the lease to a replica that may be waiting for a
+	// snapshot (which will wedge the range until the replica applies that
+	// snapshot).
+	existing = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), existing)
+
 	// Short-circuit if there are no valid targets out there.
-	if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseStoreID) {
-		log.VEventf(ctx, 2, "no lease transfer target found")
+	if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) {
+		log.VEventf(ctx, 2, "no lease transfer target found for r%d", leaseRepl.GetRangeID())
 		return roachpb.ReplicaDescriptor{}
 	}
 
@@ -886,7 +909,7 @@ func (a *Allocator) TransferLeaseTarget(
 	var bestOption roachpb.ReplicaDescriptor
 	bestOptionLeaseCount := int32(math.MaxInt32)
 	for _, repl := range existing {
-		if leaseStoreID == repl.StoreID {
+		if leaseRepl.StoreID() == repl.StoreID {
 			continue
 		}
 		storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
@@ -1294,6 +1317,58 @@ func replicaIsBehind(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool
 	return true
 }
 
+// replicaMayNeedSnapshot determines whether the replica referred to by
+// `replicaID` may be in need of a raft snapshot. If this function is called
+// with an empty or nil `raftStatus` (as will be the case when it's called by a
+// replica that is not the raft leader), we pessimistically assume that
+// `replicaID` may need a snapshot.
+func replicaMayNeedSnapshot(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool {
+	if raftStatus == nil || len(raftStatus.Progress) == 0 {
+		return true
+	}
+	if progress, ok := raftStatus.Progress[uint64(replicaID)]; ok {
+		// We can only reasonably assume that the follower replica is not in need of
+		// a snapshot iff it is in `StateReplicate`. However, even this is racy
+		// because we can still possibly have an ill-timed log truncation between
+		// when we make this determination and when we act on it.
+		return progress.State != tracker.StateReplicate
+	}
+	return true
+}
+
+// excludeReplicasInNeedOfSnapshots filters out the `replicas` that may be in
+// need of a raft snapshot. If this function is called with the `raftStatus` of
+// a non-raft leader replica, an empty slice is returned.
+func excludeReplicasInNeedOfSnapshots(
+	ctx context.Context, raftStatus *raft.Status, replicas []roachpb.ReplicaDescriptor,
+) []roachpb.ReplicaDescriptor {
+	if raftStatus == nil || len(raftStatus.Progress) == 0 {
+		log.VEventf(
+			ctx,
+			5,
+			"raft leader not collocated with the leaseholder; will not produce any lease transfer targets",
+		)
+		return []roachpb.ReplicaDescriptor{}
+	}
+
+	filled := 0
+	for _, repl := range replicas {
+		if replicaMayNeedSnapshot(raftStatus, repl.ReplicaID) {
+			log.VEventf(
+				ctx,
+				5,
+				"not considering [n%d, s%d] as a potential candidate for a lease transfer"+
+					" because the replica may be waiting for a snapshot",
+				repl.NodeID, repl.StoreID,
+			)
+			continue
+		}
+		replicas[filled] = repl
+		filled++
+	}
+	return replicas[:filled]
+}
+
 // simulateFilterUnremovableReplicas removes any unremovable replicas from the
 // supplied slice. Unlike filterUnremovableReplicas, brandNewReplicaID is
 // considered up-to-date (and thus can participate in quorum), but is not
diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go
index 9e01b488e96d..07d563e0a2dd 100644
--- a/pkg/kv/kvserver/allocator_test.go
+++ b/pkg/kv/kvserver/allocator_test.go
@@ -1281,6 +1281,43 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
 	}
 }
 
+// mockRepl satisfies the interface for the `leaseRepl` passed into
+// `Allocator.TransferLeaseTarget()` for these tests.
+type mockRepl struct {
+	replicationFactor     int32
+	storeID               roachpb.StoreID
+	replsInNeedOfSnapshot map[roachpb.ReplicaID]struct{}
+}
+
+func (r *mockRepl) RaftStatus() *raft.Status {
+	raftStatus := &raft.Status{
+		Progress: make(map[uint64]tracker.Progress),
+	}
+	for i := int32(1); i <= r.replicationFactor; i++ {
+		state := tracker.StateReplicate
+		if _, ok := r.replsInNeedOfSnapshot[roachpb.ReplicaID(i)]; ok {
+			state = tracker.StateSnapshot
+		}
+		raftStatus.Progress[uint64(i)] = tracker.Progress{State: state}
+	}
+	return raftStatus
+}
+
+func (r *mockRepl) StoreID() roachpb.StoreID {
+	return r.storeID
+}
+
+func (r *mockRepl) GetRangeID() roachpb.RangeID {
+	return roachpb.RangeID(0)
+}
+
+func (r *mockRepl) markReplAsNeedingSnapshot(id roachpb.ReplicaID) {
+	if r.replsInNeedOfSnapshot == nil {
+		r.replsInNeedOfSnapshot = make(map[roachpb.ReplicaID]struct{})
+	}
+	r.replsInNeedOfSnapshot[id] = struct{}{}
+}
+
 func TestAllocatorTransferLeaseTarget(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -1301,9 +1338,9 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) {
 	sg.GossipStores(stores, t)
 
 	existing := []roachpb.ReplicaDescriptor{
-		{StoreID: 1},
-		{StoreID: 2},
-		{StoreID: 3},
+		{StoreID: 1, ReplicaID: 1},
+		{StoreID: 2, ReplicaID: 2},
+		{StoreID: 3, ReplicaID: 3},
 	}
 
 	// TODO(peter): Add test cases for non-empty constraints.
@@ -1331,7 +1368,10 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) {
 				context.Background(),
 				zonepb.EmptyCompleteZoneConfig(),
 				c.existing,
-				c.leaseholder,
+				&mockRepl{
+					replicationFactor: 3,
+					storeID:           c.leaseholder,
+				},
 				nil, /* replicaStats */
 				c.check,
 				true, /* checkCandidateFullness */
@@ -1344,6 +1384,116 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) {
 	}
 }
 
+func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	existing := []roachpb.ReplicaDescriptor{
+		{StoreID: 1, NodeID: 1, ReplicaID: 1},
+		{StoreID: 2, NodeID: 2, ReplicaID: 2},
+		{StoreID: 3, NodeID: 3, ReplicaID: 3},
+		{StoreID: 4, NodeID: 4, ReplicaID: 4},
+	}
+	stopper, g, _, a, _ := createTestAllocator(10, true /* deterministic */)
+	defer stopper.Stop(context.Background())
+
+	// 4 stores where the lease count for each store is equal to 10x the store
+	// ID.
+	var stores []*roachpb.StoreDescriptor
+	for i := 1; i <= 4; i++ {
+		stores = append(stores, &roachpb.StoreDescriptor{
+			StoreID:  roachpb.StoreID(i),
+			Node:     roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
+			Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)},
+		})
+	}
+	sg := gossiputil.NewStoreGossiper(g)
+	sg.GossipStores(stores, t)
+
+	testCases := []struct {
+		existing          []roachpb.ReplicaDescriptor
+		replsNeedingSnaps []roachpb.ReplicaID
+		leaseholder       roachpb.StoreID
+		checkSource       bool
+		transferTarget    roachpb.StoreID
+	}{
+		{
+			existing:          existing,
+			replsNeedingSnaps: []roachpb.ReplicaID{1},
+			leaseholder:       3,
+			checkSource:       true,
+			transferTarget:    0,
+		},
+		{
+			existing:          existing,
+			replsNeedingSnaps: []roachpb.ReplicaID{1},
+			leaseholder:       3,
+			checkSource:       false,
+			transferTarget:    2,
+		},
+		{
+			existing:          existing,
+			replsNeedingSnaps: []roachpb.ReplicaID{1},
+			leaseholder:       4,
+			checkSource:       true,
+			transferTarget:    2,
+		},
+		{
+			existing:          existing,
+			replsNeedingSnaps: []roachpb.ReplicaID{1},
+			leaseholder:       4,
+			checkSource:       false,
+			transferTarget:    2,
+		},
+		{
+			existing:          existing,
+			replsNeedingSnaps: []roachpb.ReplicaID{1, 2},
+			leaseholder:       4,
+			checkSource:       false,
+			transferTarget:    3,
+		},
+		{
+			existing:          existing,
+			replsNeedingSnaps: []roachpb.ReplicaID{1, 2},
+			leaseholder:       4,
+			checkSource:       true,
+			transferTarget:    0,
+		},
+		{
+			existing:          existing,
+			replsNeedingSnaps: []roachpb.ReplicaID{1, 2, 3},
+			leaseholder:       4,
+			checkSource:       true,
+			transferTarget:    0,
+		},
+	}
+
+	for _, c := range testCases {
+		repl := &mockRepl{
+			replicationFactor: 4,
+			storeID:           c.leaseholder,
+		}
+		for _, r := range c.replsNeedingSnaps {
+			repl.markReplAsNeedingSnapshot(r)
+		}
+		t.Run("", func(t *testing.T) {
+			target := a.TransferLeaseTarget(
+				context.Background(),
+				zonepb.EmptyCompleteZoneConfig(),
+				c.existing,
+				repl,
+				nil, /* replicaStats */
+				c.checkSource,
+				true, /* checkCandidateFullness */
+				false, /* alwaysAllowDecisionWithoutStats */
+			)
+			if c.transferTarget != target.StoreID {
+				t.Fatalf("expected %d, but found %d", c.transferTarget, target.StoreID)
+			}
+		})
+	}
+}
+
 // TestAllocatorTransferLeaseTargetDraining verifies that the allocator will
 // not choose to transfer leases to a store that is draining.
 func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
@@ -1392,9 +1542,9 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
 	}
 
 	existing := []roachpb.ReplicaDescriptor{
-		{StoreID: 1},
-		{StoreID: 2},
-		{StoreID: 3},
+		{StoreID: 1, ReplicaID: 1},
+		{StoreID: 2, ReplicaID: 2},
+		{StoreID: 3, ReplicaID: 3},
 	}
 
 	testCases := []struct {
@@ -1430,7 +1580,10 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
 				context.Background(),
 				c.zone,
 				c.existing,
-				c.leaseholder,
+				&mockRepl{
+					replicationFactor: 3,
+					storeID:           c.leaseholder,
+				},
 				nil, /* replicaStats */
 				c.check,
 				true, /* checkCandidateFullness */
@@ -1678,9 +1831,9 @@ func TestAllocatorTransferLeaseTargetMultiStore(t *testing.T) {
 	sg.GossipStores(stores, t)
 
 	existing := []roachpb.ReplicaDescriptor{
-		{NodeID: 1, StoreID: 1},
-		{NodeID: 2, StoreID: 3},
-		{NodeID: 3, StoreID: 5},
+		{NodeID: 1, StoreID: 1, ReplicaID: 1},
+		{NodeID: 2, StoreID: 3, ReplicaID: 2},
+		{NodeID: 3, StoreID: 5, ReplicaID: 3},
 	}
 
 	testCases := []struct {
@@ -1701,7 +1854,10 @@ func TestAllocatorTransferLeaseTargetMultiStore(t *testing.T) {
 				context.Background(),
 				zonepb.EmptyCompleteZoneConfig(),
 				existing,
-				c.leaseholder,
+				&mockRepl{
+					replicationFactor: 6,
+					storeID:           c.leaseholder,
+				},
 				nil, /* replicaStats */
 				c.check,
 				true, /* checkCandidateFullness */
@@ -1961,7 +2117,10 @@ func TestAllocatorLeasePreferences(t *testing.T) {
 				context.Background(),
 				zone,
 				c.existing,
-				c.leaseholder,
+				&mockRepl{
+					replicationFactor: 5,
+					storeID:           c.leaseholder,
+				},
 				nil, /* replicaStats */
 				true, /* checkTransferLeaseSource */
 				true, /* checkCandidateFullness */
@@ -1974,7 +2133,10 @@ func TestAllocatorLeasePreferences(t *testing.T) {
 				context.Background(),
 				zone,
 				c.existing,
-				c.leaseholder,
+				&mockRepl{
+					replicationFactor: 5,
+					storeID:           c.leaseholder,
+				},
 				nil, /* replicaStats */
 				false, /* checkTransferLeaseSource */
 				true, /* checkCandidateFullness */
@@ -2057,7 +2219,10 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
 				context.Background(),
 				zone,
 				c.existing,
-				c.leaseholder,
+				&mockRepl{
+					replicationFactor: 6,
+					storeID:           c.leaseholder,
+				},
 				nil, /* replicaStats */
 				true, /* checkTransferLeaseSource */
 				true, /* checkCandidateFullness */
@@ -2070,7 +2235,10 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
 				context.Background(),
 				zone,
 				c.existing,
-				c.leaseholder,
+				&mockRepl{
+					replicationFactor: 6,
+					storeID:           c.leaseholder,
+				},
 				nil, /* replicaStats */
 				false, /* checkTransferLeaseSource */
 				true, /* checkCandidateFullness */
@@ -3834,9 +4002,9 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
 	}
 
 	existing := []roachpb.ReplicaDescriptor{
-		{NodeID: 1, StoreID: 1},
-		{NodeID: 2, StoreID: 2},
-		{NodeID: 3, StoreID: 3},
+		{NodeID: 1, StoreID: 1, ReplicaID: 1},
+		{NodeID: 2, StoreID: 2, ReplicaID: 2},
+		{NodeID: 3, StoreID: 3, ReplicaID: 3},
 	}
 
 	testCases := []struct {
@@ -3914,7 +4082,10 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
 				context.Background(),
 				zonepb.EmptyCompleteZoneConfig(),
 				existing,
-				c.leaseholder,
+				&mockRepl{
+					replicationFactor: 3,
+					storeID:           c.leaseholder,
+				},
 				c.stats,
 				c.check,
 				true, /* checkCandidateFullness */
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index f16a6e7bcf4d..5a91c515b957 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -961,7 +961,7 @@ func (rq *replicateQueue) findTargetAndTransferLease(
 		ctx,
 		zone,
 		desc.Replicas().Voters(),
-		repl.store.StoreID(),
+		repl,
 		repl.leaseholderStats,
 		opts.checkTransferLeaseSource,
 		opts.checkCandidateFullness,