Skip to content

Commit

Permalink
kvserver: pass Desc and SpanConf through allocator
Browse files Browse the repository at this point in the history
Previously the different layers of the allocator would load the Desc and
SpanConf as needed, this had a risk of them changing between various
loads and could cause strange and hard to track down races. Now they are
loaded once and passed through all the layers.

Epic: none

Release note: None
  • Loading branch information
andrewbaptist committed Aug 8, 2023
1 parent 1382b26 commit 8478b85
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 102 deletions.
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1961,13 +1961,13 @@ func (a *Allocator) ScorerOptionsForScatter(ctx context.Context) *ScatterScorerO
func (a *Allocator) ValidLeaseTargets(
ctx context.Context,
storePool storepool.AllocatorStorePool,
desc *roachpb.RangeDescriptor,
conf roachpb.SpanConfig,
existing []roachpb.ReplicaDescriptor,
leaseRepl interface {
StoreID() roachpb.StoreID
RaftStatus() *raft.Status
GetFirstIndex() kvpb.RaftIndex
Desc() *roachpb.RangeDescriptor
},
opts allocator.TransferLeaseOptions,
) []roachpb.ReplicaDescriptor {
Expand Down Expand Up @@ -2014,9 +2014,8 @@ func (a *Allocator) ValidLeaseTargets(
// replica set, however are in the candidate list. Uninitialized
// replicas will always need a snapshot.
existingCandidates := []roachpb.ReplicaDescriptor{}
rangeDesc := leaseRepl.Desc()
for _, candidate := range candidates {
if _, ok := rangeDesc.GetReplicaDescriptor(candidate.StoreID); ok {
if _, ok := desc.GetReplicaDescriptor(candidate.StoreID); ok {
existingCandidates = append(existingCandidates, candidate)
} else {
validSnapshotCandidates = append(validSnapshotCandidates, candidate)
Expand All @@ -2027,7 +2026,7 @@ func (a *Allocator) ValidLeaseTargets(

status := leaseRepl.RaftStatus()
if a.knobs != nil && a.knobs.RaftStatusFn != nil {
status = a.knobs.RaftStatusFn(leaseRepl)
status = a.knobs.RaftStatusFn(desc, leaseRepl.StoreID())
}

candidates = append(validSnapshotCandidates, excludeReplicasInNeedOfSnapshots(
Expand Down Expand Up @@ -2220,14 +2219,14 @@ func (a *Allocator) IOOverloadOptions() IOOverloadOptions {
func (a *Allocator) TransferLeaseTarget(
ctx context.Context,
storePool storepool.AllocatorStorePool,
desc *roachpb.RangeDescriptor,
conf roachpb.SpanConfig,
existing []roachpb.ReplicaDescriptor,
leaseRepl interface {
StoreID() roachpb.StoreID
GetRangeID() roachpb.RangeID
RaftStatus() *raft.Status
GetFirstIndex() kvpb.RaftIndex
Desc() *roachpb.RangeDescriptor
},
usageInfo allocator.RangeUsageInfo,
forceDecisionWithoutStats bool,
Expand Down Expand Up @@ -2259,7 +2258,7 @@ func (a *Allocator) TransferLeaseTarget(
return roachpb.ReplicaDescriptor{}
}

validTargets := a.ValidLeaseTargets(ctx, storePool, conf, existing, leaseRepl, opts)
validTargets := a.ValidLeaseTargets(ctx, storePool, desc, conf, existing, leaseRepl, opts)

// Short-circuit if there are no valid targets out there.
if len(validTargets) == 0 || (len(validTargets) == 1 && validTargets[0].StoreID == leaseRepl.StoreID()) {
Expand Down Expand Up @@ -2494,13 +2493,13 @@ func getLoadDelta(
func (a *Allocator) ShouldTransferLease(
ctx context.Context,
storePool storepool.AllocatorStorePool,
desc *roachpb.RangeDescriptor,
conf roachpb.SpanConfig,
existing []roachpb.ReplicaDescriptor,
leaseRepl interface {
StoreID() roachpb.StoreID
RaftStatus() *raft.Status
GetFirstIndex() kvpb.RaftIndex
Desc() *roachpb.RangeDescriptor
},
usageInfo allocator.RangeUsageInfo,
) bool {
Expand All @@ -2510,6 +2509,7 @@ func (a *Allocator) ShouldTransferLease(
existing = a.ValidLeaseTargets(
ctx,
storePool,
desc,
conf,
existing,
leaseRepl,
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,7 @@ func TestAllocatorTransferLeaseTarget(t *testing.T) {
target := a.TransferLeaseTarget(
ctx,
sp,
&roachpb.RangeDescriptor{},
emptySpanConfig(),
c.existing,
&mockRepl{
Expand Down Expand Up @@ -2096,6 +2097,7 @@ func TestAllocatorTransferLeaseTargetIOOverloadCheck(t *testing.T) {
target := a.TransferLeaseTarget(
ctx,
sp,
&roachpb.RangeDescriptor{},
emptySpanConfig(),
existing,
&mockRepl{
Expand Down Expand Up @@ -2211,6 +2213,7 @@ func TestAllocatorTransferLeaseToReplicasNeedingSnapshot(t *testing.T) {
target := a.TransferLeaseTarget(
ctx,
sp,
&roachpb.RangeDescriptor{},
emptySpanConfig(),
c.existing,
repl,
Expand Down Expand Up @@ -2303,6 +2306,7 @@ func TestAllocatorTransferLeaseTargetConstraints(t *testing.T) {
target := a.TransferLeaseTarget(
context.Background(),
sp,
&roachpb.RangeDescriptor{},
c.conf,
c.existing,
&mockRepl{
Expand Down Expand Up @@ -2415,6 +2419,7 @@ func TestAllocatorTransferLeaseTargetDraining(t *testing.T) {
target := a.TransferLeaseTarget(
ctx,
storePool,
&roachpb.RangeDescriptor{},
c.conf,
c.existing,
&mockRepl{
Expand Down Expand Up @@ -2699,6 +2704,7 @@ func TestAllocatorShouldTransferLease(t *testing.T) {
result := a.ShouldTransferLease(
ctx,
sp,
&roachpb.RangeDescriptor{},
emptySpanConfig(),
c.existing,
&mockRepl{
Expand Down Expand Up @@ -2767,6 +2773,7 @@ func TestAllocatorShouldTransferLeaseDraining(t *testing.T) {
result := a.ShouldTransferLease(
ctx,
storePool,
&roachpb.RangeDescriptor{},
emptySpanConfig(),
c.existing,
&mockRepl{
Expand Down Expand Up @@ -2814,6 +2821,7 @@ func TestAllocatorShouldTransferSuspected(t *testing.T) {
result := a.ShouldTransferLease(
ctx,
storePool,
&roachpb.RangeDescriptor{},
emptySpanConfig(),
replicas(1, 2, 3),
&mockRepl{storeID: 2, replicationFactor: 3},
Expand Down Expand Up @@ -2955,6 +2963,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
result := a.ShouldTransferLease(
ctx,
sp,
&roachpb.RangeDescriptor{},
conf,
c.existing,
&mockRepl{
Expand All @@ -2970,6 +2979,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
target := a.TransferLeaseTarget(
ctx,
sp,
&roachpb.RangeDescriptor{},
conf,
c.existing,
&mockRepl{
Expand All @@ -2989,6 +2999,7 @@ func TestAllocatorLeasePreferences(t *testing.T) {
target = a.TransferLeaseTarget(
ctx,
sp,
&roachpb.RangeDescriptor{},
conf,
c.existing,
&mockRepl{
Expand Down Expand Up @@ -3082,6 +3093,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
target := a.TransferLeaseTarget(
ctx,
sp,
&roachpb.RangeDescriptor{},
conf,
c.existing,
&mockRepl{
Expand All @@ -3102,6 +3114,7 @@ func TestAllocatorLeasePreferencesMultipleStoresPerLocality(t *testing.T) {
target = a.TransferLeaseTarget(
ctx,
sp,
&roachpb.RangeDescriptor{},
conf,
c.existing,
&mockRepl{
Expand Down Expand Up @@ -5732,6 +5745,7 @@ func TestAllocatorTransferLeaseTargetLoadBased(t *testing.T) {
target := a.TransferLeaseTarget(
ctx,
storePool,
&roachpb.RangeDescriptor{},
emptySpanConfig(),
existing,
&mockRepl{
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/allocator/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ type TestingKnobs struct {
// targets produced by the Allocator to include replicas that may be waiting
// for snapshots.
AllowLeaseTransfersToReplicasNeedingSnapshots bool
RaftStatusFn func(r interface {
Desc() *roachpb.RangeDescriptor
StoreID() roachpb.StoreID
}) *raft.Status
RaftStatusFn func(
desc *roachpb.RangeDescriptor,
storeID roachpb.StoreID,
) *raft.Status
// BlockTransferTarget can be used to block returning any transfer targets
// from TransferLeaseTarget.
BlockTransferTarget func() bool
Expand Down
Loading

0 comments on commit 8478b85

Please sign in to comment.