kvserver: introduce Allocator.ValidLeaseTargets()
This commit is a minor refactor of the `Allocator.TransferLeaseTarget` logic
that makes it more readable and abstracts out a new exported `Allocator`
method called `ValidLeaseTargets()`.

The contract of `ValidLeaseTargets()` is as follows:
```
// ValidLeaseTargets returns a set of candidate stores that are suitable to
// receive a lease transfer for the given range.
//
// - It excludes stores that are dead, or marked draining or suspect.
// - If the range has lease_preferences, and there are any non-draining,
// non-suspect nodes that match those preferences, it excludes stores that don't
// match those preferences.
// - It excludes replicas that may need snapshots. If the replica calling this
// method is not the Raft leader (and therefore does not know whether follower
// replicas need a snapshot or not), it produces no results.

```
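
To illustrate the shape of that contract, here is a minimal, self-contained Go
sketch of the two filtering steps it describes. The types and names below are
toy stand-ins for illustration only (they are not CockroachDB's types), and the
snapshot-related exclusion is omitted:

```go
package main

import "fmt"

// store is a toy stand-in for a store/replica descriptor.
type store struct {
	id        int
	healthy   bool // live, not draining, not suspect
	preferred bool // matches the range's lease_preferences
}

// validLeaseTargets mirrors the two-step filter described above: first drop
// unhealthy stores, then, if any surviving candidate matches the lease
// preferences, narrow the result to just those.
func validLeaseTargets(existing []store) []store {
	candidates := make([]store, 0, len(existing))
	for _, s := range existing {
		if s.healthy {
			candidates = append(candidates, s)
		}
	}
	preferred := make([]store, 0, len(candidates))
	for _, s := range candidates {
		if s.preferred {
			preferred = append(preferred, s)
		}
	}
	if len(preferred) > 0 {
		return preferred
	}
	return candidates
}

func main() {
	existing := []store{
		{id: 1, healthy: true},                   // healthy, not preferred
		{id: 2, healthy: false, preferred: true}, // draining: excluded despite preference
		{id: 3, healthy: true, preferred: true},  // healthy and preferred
	}
	fmt.Println(validLeaseTargets(existing)) // [{3 true true}]
}
```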

Previously, the logic that is now encapsulated by `ValidLeaseTargets()` was
duplicated across multiple places, which was a potential source of bugs. This
commit unifies that logic in one place that is relatively well-tested. It is
intended as a pure refactor and does not attempt to change any behavior. As
such, no existing tests have been changed, with the exception of a subtest
inside `TestAllocatorTransferLeaseTargetDraining`; see the comment over that
subtest for why the small behavior change made by this patch is desirable.
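
Concretely, both `TransferLeaseTarget` and `ShouldTransferLease` now start from
the same preamble. The snippet below is condensed from the diff further down
and is meant only as orientation, not as a standalone compilable unit:

```go
// Handle the case where the current leaseholder violates lease preferences
// that some other existing replica can satisfy.
if a.leaseholderShouldMoveDueToPreferences(ctx, conf, leaseRepl, existing) {
	// ShouldTransferLease returns true here; TransferLeaseTarget instead
	// sets excludeLeaseRepl so the current leaseholder is filtered out.
}
// Filter down to live, non-draining, non-suspect, preference-respecting
// replicas that do not need a snapshot.
existing = a.ValidLeaseTargets(ctx, conf, existing, leaseRepl, excludeLeaseRepl)
// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) {
	// Nothing to transfer to.
}
```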

The next commit in this PR uses this method to fix (at least part of) cockroachdb#74691.

Release note: none
aayushshah15 committed Apr 6, 2022
1 parent 60b250f commit a6a8d5c
Showing 3 changed files with 185 additions and 110 deletions.
262 changes: 160 additions & 102 deletions pkg/kv/kvserver/allocator.go
@@ -1474,6 +1474,120 @@ func (a *Allocator) scorerOptionsForScatter() *scatterScorerOptions {
}
}

// ValidLeaseTargets returns a set of candidate stores that are suitable to
// receive a lease transfer for the given range.
//
// - It excludes stores that are dead, or marked draining or suspect.
// - If the range has lease_preferences, and there are any non-draining,
// non-suspect nodes that match those preferences, it excludes stores that don't
// match those preferences.
// - It excludes replicas that may need snapshots. If the replica calling this
// method is not the Raft leader (and therefore does not know whether follower
// replicas need a snapshot or not), it produces no results.
func (a *Allocator) ValidLeaseTargets(
ctx context.Context,
conf roachpb.SpanConfig,
existing []roachpb.ReplicaDescriptor,
leaseRepl interface {
RaftStatus() *raft.Status
StoreID() roachpb.StoreID
},
// excludeLeaseRepl dictates whether the result set can include the source
// replica.
excludeLeaseRepl bool,
) []roachpb.ReplicaDescriptor {
candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing))
for i := range existing {
if existing[i].GetType() != roachpb.VOTER_FULL {
continue
}
// If we're not allowed to include the current replica, remove it from
// consideration here.
if existing[i].StoreID == leaseRepl.StoreID() && excludeLeaseRepl {
continue
}
candidates = append(candidates, existing[i])
}
candidates, _ = a.storePool.liveAndDeadReplicas(
candidates, false, /* includeSuspectAndDrainingStores */
)

if a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots {
// Only proceed with the lease transfer if we are also the raft leader (we
// already know we are the leaseholder at this point), and only consider
// replicas that are in `StateReplicate` as potential candidates.
//
// NB: The RaftStatus() only returns a non-empty and non-nil result on the
// Raft leader (since Raft followers do not track the progress of other
// replicas, only the leader does).
//
// NB: On every Raft tick, we try to ensure that leadership is collocated with
// leaseholdership (see
// Replica.maybeTransferRaftLeadershipToLeaseholderLocked()). This means that
// on a range that is not already borked (i.e. can accept writes), periods of
// leader/leaseholder misalignment should be ephemeral and rare. We choose to
// be pessimistic here and choose to bail on the lease transfer, as opposed to
// potentially transferring the lease to a replica that may be waiting for a
// snapshot (which will wedge the range until the replica applies that
// snapshot).
candidates = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), candidates)
}

// Determine which store(s) is preferred based on user-specified preferences.
// If any stores match, only consider those stores as candidates.
preferred := a.preferredLeaseholders(conf, candidates)
if len(preferred) > 0 {
candidates = preferred
}
return candidates
}

// leaseholderShouldMoveDueToPreferences returns true if the current leaseholder
// is in violation of lease preferences _that can otherwise be satisfied_ by
// some existing replica.
//
// INVARIANT: This method should only be called with an `allExistingReplicas`
// slice that contains `leaseRepl`.
func (a *Allocator) leaseholderShouldMoveDueToPreferences(
ctx context.Context,
conf roachpb.SpanConfig,
leaseRepl interface {
RaftStatus() *raft.Status
StoreID() roachpb.StoreID
},
allExistingReplicas []roachpb.ReplicaDescriptor,
) bool {
// Defensive check to ensure that this is never called with a replica set that
// does not contain the leaseholder.
var leaseholderInExisting bool
for _, repl := range allExistingReplicas {
if repl.StoreID == leaseRepl.StoreID() {
leaseholderInExisting = true
break
}
}
if !leaseholderInExisting {
log.Errorf(ctx, "programming error: expected leaseholder store to be in the slice of existing replicas")
}

// Exclude suspect/draining/dead stores.
candidates, _ := a.storePool.liveAndDeadReplicas(
allExistingReplicas, false, /* includeSuspectAndDrainingStores */
)
// If there are any replicas that do match lease preferences, then we check if
// the existing leaseholder is one of them.
preferred := a.preferredLeaseholders(conf, candidates)
if len(preferred) == 0 {
return false
}
for _, repl := range preferred {
if repl.StoreID == leaseRepl.StoreID() {
return false
}
}
return true
}

// TransferLeaseTarget returns a suitable replica to transfer the range lease
// to from the provided list. It excludes the current lease holder replica
// unless asked to do otherwise by the checkTransferLeaseSource parameter.
@@ -1501,83 +1615,27 @@ func (a *Allocator) TransferLeaseTarget(
forceDecisionWithoutStats bool,
opts transferLeaseOptions,
) roachpb.ReplicaDescriptor {
excludeLeaseRepl := !opts.checkTransferLeaseSource
if a.leaseholderShouldMoveDueToPreferences(ctx, conf, leaseRepl, existing) {
// Explicitly exclude the current leaseholder from the result set if it is
// in violation of lease preferences that can be satisfied by some other
// replica.
excludeLeaseRepl = true
}

allStoresList, _, _ := a.storePool.getStoreList(storeFilterNone)
storeDescMap := storeListToMap(allStoresList)

sl, _, _ := a.storePool.getStoreList(storeFilterSuspect)
sl = sl.excludeInvalid(conf.Constraints)
sl = sl.excludeInvalid(conf.VoterConstraints)

candidateLeasesMean := sl.candidateLeases.mean

source, ok := a.storePool.getStoreDescriptor(leaseRepl.StoreID())
if !ok {
return roachpb.ReplicaDescriptor{}
}

// Determine which store(s) is preferred based on user-specified preferences.
// If any stores match, only consider those stores as candidates. If only one
// store matches, it's where the lease should be (unless the preferred store
// is the current one and checkTransferLeaseSource is false).
var preferred []roachpb.ReplicaDescriptor
checkTransferLeaseSource := opts.checkTransferLeaseSource
if checkTransferLeaseSource {
preferred = a.preferredLeaseholders(conf, existing)
} else {
// TODO(a-robinson): Should we just always remove the source store from
// existing when checkTransferLeaseSource is false? I'd do it now, but
// it's too big a change to make right before a major release.
var candidates []roachpb.ReplicaDescriptor
for _, repl := range existing {
if repl.StoreID != leaseRepl.StoreID() {
candidates = append(candidates, repl)
}
}
preferred = a.preferredLeaseholders(conf, candidates)
}
if len(preferred) == 1 {
if preferred[0].StoreID == leaseRepl.StoreID() {
return roachpb.ReplicaDescriptor{}
}
// Verify that the preferred replica is eligible to receive the lease.
preferred, _ = a.storePool.liveAndDeadReplicas(preferred, false /* includeSuspectAndDrainingStores */)
if len(preferred) == 1 {
return preferred[0]
}
return roachpb.ReplicaDescriptor{}
} else if len(preferred) > 1 {
// If the current leaseholder is not preferred, set checkTransferLeaseSource
// to false to motivate the below logic to transfer the lease.
existing = preferred
if !storeHasReplica(leaseRepl.StoreID(), roachpb.MakeReplicaSet(preferred).ReplicationTargets()) {
checkTransferLeaseSource = false
}
}

// Only consider live, non-draining, non-suspect replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectAndDrainingStores */)

if a.knobs == nil || !a.knobs.AllowLeaseTransfersToReplicasNeedingSnapshots {
// Only proceed with the lease transfer if we are also the raft leader (we
// already know we are the leaseholder at this point), and only consider
// replicas that are in `StateReplicate` as potential candidates.
//
// NB: The RaftStatus() only returns a non-empty and non-nil result on the
// Raft leader (since Raft followers do not track the progress of other
// replicas, only the leader does).
//
// NB: On every Raft tick, we try to ensure that leadership is collocated with
// leaseholdership (see
// Replica.maybeTransferRaftLeadershipToLeaseholderLocked()). This means that
// on a range that is not already borked (i.e. can accept writes), periods of
// leader/leaseholder misalignment should be ephemeral and rare. We choose to
// be pessimistic here and choose to bail on the lease transfer, as opposed to
// potentially transferring the lease to a replica that may be waiting for a
// snapshot (which will wedge the range until the replica applies that
// snapshot).
existing = excludeReplicasInNeedOfSnapshots(ctx, leaseRepl.RaftStatus(), existing)
}

existing = a.ValidLeaseTargets(ctx, conf, existing, leaseRepl, excludeLeaseRepl)
// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) {
log.VEventf(ctx, 2, "no lease transfer target found for r%d", leaseRepl.GetRangeID())
@@ -1592,7 +1650,7 @@ func (a *Allocator) TransferLeaseTarget(
transferDec, repl := a.shouldTransferLeaseForAccessLocality(
ctx, source, existing, stats, nil, candidateLeasesMean,
)
if checkTransferLeaseSource {
if !excludeLeaseRepl {
switch transferDec {
case shouldNotTransfer:
if !forceDecisionWithoutStats {
@@ -1611,13 +1669,21 @@ func (a *Allocator) TransferLeaseTarget(
if repl != (roachpb.ReplicaDescriptor{}) {
return repl
}
// Fall back to logic that doesn't take request counts and latency into
// account if the counts/latency-based logic couldn't pick a best replica.
fallthrough

case leaseCountConvergence:
// Fall back to logic that doesn't take request counts and latency into
// account if the counts/latency-based logic couldn't pick a best replica.
candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing))
// If we want to ignore the existing lease counts on replicas, just do a
// random transfer.
if !opts.checkCandidateFullness {
a.randGen.Lock()
defer a.randGen.Unlock()
return existing[a.randGen.Intn(len(existing))]
}

var bestOption roachpb.ReplicaDescriptor
candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing))
bestOptionLeaseCount := int32(math.MaxInt32)
for _, repl := range existing {
if leaseRepl.StoreID() == repl.StoreID {
@@ -1627,18 +1693,19 @@ func (a *Allocator) TransferLeaseTarget(
if !ok {
continue
}
if !opts.checkCandidateFullness || float64(storeDesc.Capacity.LeaseCount) < candidateLeasesMean-0.5 {
if float64(storeDesc.Capacity.LeaseCount) < candidateLeasesMean-0.5 {
candidates = append(candidates, repl)
} else if storeDesc.Capacity.LeaseCount < bestOptionLeaseCount {
}
if storeDesc.Capacity.LeaseCount < bestOptionLeaseCount {
bestOption = repl
bestOptionLeaseCount = storeDesc.Capacity.LeaseCount
}
}
if len(candidates) == 0 {
// If we aren't supposed to be considering the current leaseholder (e.g.
// because we need to remove this replica for some reason), return
// our best option if we otherwise wouldn't want to do anything.
if !checkTransferLeaseSource {
// If there were no existing replicas on stores with less-than-mean
// leases, and we _must_ move the lease away (indicated by
// `excludeLeaseRepl`), just return the best possible option.
if excludeLeaseRepl {
return bestOption
}
return roachpb.ReplicaDescriptor{}
@@ -1784,41 +1851,30 @@ func (a *Allocator) ShouldTransferLease(
ctx context.Context,
conf roachpb.SpanConfig,
existing []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
leaseRepl interface {
RaftStatus() *raft.Status
StoreID() roachpb.StoreID
},
stats *replicaStats,
) bool {
source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
if !ok {
return false
if a.leaseholderShouldMoveDueToPreferences(ctx, conf, leaseRepl, existing) {
return true
}
existing = a.ValidLeaseTargets(ctx, conf, existing, leaseRepl, false /* excludeLeaseRepl */)

// Determine which store(s) is preferred based on user-specified preferences.
// If any stores match, only consider those stores as options. If only one
// store matches, it's where the lease should be.
preferred := a.preferredLeaseholders(conf, existing)
if len(preferred) == 1 {
return preferred[0].StoreID != leaseStoreID
} else if len(preferred) > 1 {
existing = preferred
// If the current leaseholder isn't one of the preferred stores, then we
// should try to transfer the lease.
if !storeHasReplica(leaseStoreID, roachpb.MakeReplicaSet(existing).ReplicationTargets()) {
return true
}
// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == leaseRepl.StoreID()) {
return false
}
source, ok := a.storePool.getStoreDescriptor(leaseRepl.StoreID())
if !ok {
return false
}

sl, _, _ := a.storePool.getStoreList(storeFilterSuspect)
sl = sl.excludeInvalid(conf.Constraints)
sl = sl.excludeInvalid(conf.VoterConstraints)
log.VEventf(ctx, 3, "ShouldTransferLease (lease-holder=%d):\n%s", leaseStoreID, sl)

// Only consider live, non-draining, non-suspect replicas.
existing, _ = a.storePool.liveAndDeadReplicas(existing, false /* includeSuspectNodes */)

// Short-circuit if there are no valid targets out there.
if len(existing) == 0 || (len(existing) == 1 && existing[0].StoreID == source.StoreID) {
return false
}
log.VEventf(ctx, 3, "ShouldTransferLease (lease-holder=%d):\n%s", leaseRepl, sl)

transferDec, _ := a.shouldTransferLeaseForAccessLocality(
ctx,
@@ -1840,7 +1896,9 @@ func (a *Allocator) ShouldTransferLease(
log.Fatalf(ctx, "unexpected transfer decision %d", transferDec)
}

log.VEventf(ctx, 3, "ShouldTransferLease decision (lease-holder=%d): %t", leaseStoreID, result)
log.VEventf(
ctx, 3, "ShouldTransferLease decision (lease-holder=s%d): %t", leaseRepl.StoreID(), result,
)
return result
}
