diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 8a3777749582..147e11958e70 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -823,7 +823,8 @@ func FilterReplicasForAction( storePool storepool.AllocatorStorePool, desc *roachpb.RangeDescriptor, action AllocatorAction, ) ( filteredVoters, filteredNonVoters []roachpb.ReplicaDescriptor, - isReplacement, nothingToDo bool, + replacing *roachpb.ReplicaDescriptor, + nothingToDo bool, err error, ) { voterReplicas, nonVoterReplicas, @@ -831,7 +832,8 @@ func FilterReplicasForAction( liveNonVoterReplicas, deadNonVoterReplicas := LiveAndDeadVoterAndNonVoterReplicas(storePool, desc) removeIdx := -1 - _, filteredVoters, filteredNonVoters, removeIdx, nothingToDo, err = DetermineReplicaToReplaceAndFilter( + var existing []roachpb.ReplicaDescriptor + existing, filteredVoters, filteredNonVoters, removeIdx, nothingToDo, err = DetermineReplicaToReplaceAndFilter( storePool, action, voterReplicas, nonVoterReplicas, @@ -839,7 +841,11 @@ func FilterReplicasForAction( liveNonVoterReplicas, deadNonVoterReplicas, ) - return filteredVoters, filteredNonVoters, removeIdx >= 0, nothingToDo, err + if removeIdx >= 0 { + replacing = &existing[removeIdx] + } + + return filteredVoters, filteredNonVoters, replacing, nothingToDo, err } // ComputeAction determines the exact operation needed to repair the @@ -1184,6 +1190,7 @@ func (a *Allocator) AllocateTarget( storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replacing *roachpb.ReplicaDescriptor, replicaStatus ReplicaStatus, targetType TargetReplicaType, ) (roachpb.ReplicationTarget, string, error) { @@ -1201,6 +1208,16 @@ func (a *Allocator) AllocateTarget( selector = a.NewGoodCandidateSelector() } + // Only consider the effects of replacing a replica on constraint conformance + // during decommission. In the case that replicas are being replaced due to + // a store being dead, but no remaining live stores meet all constraints, they + // should be considered of otherwise equal validity, with candidate ranking + // choosing the best of the available options.
+ var decommissioningReplica *roachpb.ReplicaDescriptor + if replicaStatus == Decommissioning { + decommissioningReplica = replacing + } + target, details := a.allocateTargetFromList( ctx, storePool, @@ -1208,6 +1225,7 @@ func (a *Allocator) AllocateTarget( conf, existingVoters, existingNonVoters, + decommissioningReplica, a.ScorerOptions(ctx), selector, // When allocating a *new* replica, we explicitly disregard nodes with any @@ -1277,7 +1295,7 @@ func (a *Allocator) CheckAvoidsFragileQuorum( roachpb.ReplicaDescriptor{NodeID: newTarget.NodeID, StoreID: newTarget.StoreID}, ) - _, _, err := a.AllocateVoter(ctx, storePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus) + _, _, err := a.AllocateVoter(ctx, storePool, conf, oldPlusNewReplicas, remainingLiveNonVoters, nil /* replacing */, replicaStatus) return err } @@ -1292,9 +1310,10 @@ func (a *Allocator) AllocateVoter( storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replacing *roachpb.ReplicaDescriptor, replicaStatus ReplicaStatus, ) (roachpb.ReplicationTarget, string, error) { - return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget) + return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replacing, replicaStatus, VoterTarget) } // AllocateNonVoter returns a suitable store for a new allocation of a @@ -1305,9 +1324,10 @@ func (a *Allocator) AllocateNonVoter( storePool storepool.AllocatorStorePool, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replacing *roachpb.ReplicaDescriptor, replicaStatus ReplicaStatus, ) (roachpb.ReplicationTarget, string, error) { - return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget) + return a.AllocateTarget(ctx, storePool, conf, existingVoters, existingNonVoters, replacing, replicaStatus, NonVoterTarget) } // AllocateTargetFromList returns a suitable store for a new allocation of a @@ -1325,7 +1345,7 @@ func (a *Allocator) AllocateTargetFromList( targetType TargetReplicaType, ) (roachpb.ReplicationTarget, string) { return a.allocateTargetFromList(ctx, storePool, candidateStores, conf, existingVoters, - existingNonVoters, options, selector, allowMultipleReplsPerNode, targetType) + existingNonVoters, nil /* replacing */, options, selector, allowMultipleReplsPerNode, targetType) } func (a *Allocator) allocateTargetFromList( @@ -1334,12 +1354,16 @@ func (a *Allocator) allocateTargetFromList( candidateStores storepool.StoreList, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replacing *roachpb.ReplicaDescriptor, options ScorerOptions, selector CandidateSelector, allowMultipleReplsPerNode bool, targetType TargetReplicaType, ) (roachpb.ReplicationTarget, string) { existingReplicas := append(existingVoters, existingNonVoters...) 
+ if replacing != nil { + existingReplicas = append(existingReplicas, *replacing) + } analyzedOverallConstraints := constraint.AnalyzeConstraints( storePool, existingReplicas, @@ -1353,15 +1377,40 @@ func (a *Allocator) allocateTargetFromList( conf.VoterConstraints, ) + var replacingStore roachpb.StoreDescriptor + var replacingStoreOK bool + if replacing != nil { + replacingStore, replacingStoreOK = storePool.GetStoreDescriptor(replacing.StoreID) + } + var constraintsChecker constraintsCheckFn switch t := targetType; t { case VoterTarget: - constraintsChecker = voterConstraintsCheckerForAllocation( - analyzedOverallConstraints, - analyzedVoterConstraints, - ) + // If we are replacing an existing replica, make sure we check the + // constraints to ensure we are not going from a state in which a + // constraint is satisfied to one in which we are not. In this case, we + // consider no candidates to be valid, as no sorting of replicas would lead + // to a satisfying candidate being selected. + if replacing != nil && replacingStoreOK { + constraintsChecker = voterConstraintsCheckerForReplace( + analyzedOverallConstraints, + analyzedVoterConstraints, + replacingStore, + ) + } else { + constraintsChecker = voterConstraintsCheckerForAllocation( + analyzedOverallConstraints, + analyzedVoterConstraints, + ) + } case NonVoterTarget: - constraintsChecker = nonVoterConstraintsCheckerForAllocation(analyzedOverallConstraints) + if replacing != nil && replacingStoreOK { + constraintsChecker = nonVoterConstraintsCheckerForReplace( + analyzedOverallConstraints, replacingStore, + ) + } else { + constraintsChecker = nonVoterConstraintsCheckerForAllocation(analyzedOverallConstraints) + } default: log.KvDistribution.Fatalf(ctx, "unsupported targetReplicaType: %v", t) } diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go index 1d5b07ef653a..3a5a56ef5381 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go @@ -975,6 +975,15 @@ func rankedCandidateListForAllocation( } constraintsOK, necessary := constraintsCheck(s) if !constraintsOK { + if necessary { + log.KvDistribution.VEventf( + ctx, + 3, + "cannot allocate necessary %s on s%d", + targetType, + s.StoreID, + ) + } continue } @@ -1789,15 +1798,42 @@ func nonVoterConstraintsCheckerForRebalance( } } +// voterConstraintsCheckerForReplace returns a constraintsCheckFn +// that determines whether a given store is a valid and/or necessary replacement +// candidate for the given store of an existing voting replica. +func voterConstraintsCheckerForReplace( + overallConstraints, voterConstraints constraint.AnalyzedConstraints, + existingStore roachpb.StoreDescriptor, +) constraintsCheckFn { + return func(s roachpb.StoreDescriptor) (valid, necessary bool) { + overallConstraintsOK, necessaryOverall := replaceConstraintsCheck(s, existingStore, overallConstraints) + voterConstraintsOK, necessaryForVoters := replaceConstraintsCheck(s, existingStore, voterConstraints) + + return overallConstraintsOK && voterConstraintsOK, necessaryOverall || necessaryForVoters + } +} + +// nonVoterConstraintsCheckerForReplace returns a constraintsCheckFn +// that determines whether a given store is a valid and/or necessary replacement +// candidate for the given store of an existing non-voting replica. 
+func nonVoterConstraintsCheckerForReplace( + overallConstraints constraint.AnalyzedConstraints, existingStore roachpb.StoreDescriptor, +) constraintsCheckFn { + return func(s roachpb.StoreDescriptor) (valid, necessary bool) { + return replaceConstraintsCheck(s, existingStore, overallConstraints) + } +} + // allocateConstraintsCheck checks the potential allocation target store // against all the constraints. If it matches a constraint at all, it's valid. // If it matches a constraint that is not already fully satisfied by existing // replicas, then it's necessary. // -// NB: This assumes that the sum of all constraints.NumReplicas is equal to -// configured number of replicas for the range, or that there's just one set of -// constraints with NumReplicas set to 0. This is meant to be enforced in the -// config package. +// NB: Formerly there was an assumption that the sum of all +// constraints.NumReplicas was equal to the configured number of replicas for +// the range, or that there was just one set of constraints with NumReplicas +// set to 0; however, this is not enforced by the config package and +// no longer holds, as we may have unconstrained replicas. func allocateConstraintsCheck( store roachpb.StoreDescriptor, analyzed constraint.AnalyzedConstraints, ) (valid bool, necessary bool) { @@ -1828,6 +1864,57 @@ return valid, false } +// replaceConstraintsCheck checks the potential allocation target store +// for a replacement operation against all the constraints, including checking +// that the candidate store matches a constraint satisfied by the existing +// store. If it matches a constraint, it's valid. If it matches a constraint +// that is not already overly satisfied by existing replicas (other than the +// replacement), then it's necessary. If there are any necessary constraints +// that are not satisfied by the candidate when the existing store did satisfy +// that constraint, then the candidate is considered invalid entirely. +func replaceConstraintsCheck( + store, existingStore roachpb.StoreDescriptor, analyzed constraint.AnalyzedConstraints, +) (valid bool, necessary bool) { + // All stores are valid when there are no constraints. + if len(analyzed.Constraints) == 0 { + return true, false + } + + for i, constraints := range analyzed.Constraints { + matchingStores := analyzed.SatisfiedBy[i] + satisfiedByExistingStore := containsStore(matchingStores, existingStore.StoreID) + satisfiedByCandidateStore := constraint.ConjunctionsCheck( + store, constraints.Constraints, + ) + if satisfiedByCandidateStore { + valid = true + } + + // If the constraint is not already satisfied, it's necessary. + // Additionally, if the constraint is only just satisfied by the existing + // store being replaced, since that store is going away, the constraint is + // also marked as necessary. + if len(matchingStores) < int(constraints.NumReplicas) || + (len(matchingStores) == int(constraints.NumReplicas) && + satisfiedByExistingStore) { + necessary = true + } + + // Check if existing store matches a constraint that isn't overly satisfied. + // If so, then only replacing it with a satisfying store is valid to ensure + // that the constraint stays fully satisfied.
+ if necessary && satisfiedByExistingStore && !satisfiedByCandidateStore { + return false, necessary + } + } + + if analyzed.UnconstrainedReplicas { + valid = true + } + + return valid, necessary +} + // removeConstraintsCheck checks the existing store against the analyzed // constraints, determining whether it's valid (matches some constraint) and // necessary (matches some constraint that no other existing replica matches). @@ -1867,6 +1954,19 @@ // against the analyzed constraints, determining whether it's valid whether it // will be necessary if fromStoreID (an existing replica) is removed from the // range. +// +// NB: Formerly there was an assumption that the sum of all +// constraints.NumReplicas was equal to the configured number of replicas for +// the range, or that there was just one set of constraints with NumReplicas +// set to 0; however, this is not enforced by the config package and +// no longer holds, as we may have unconstrained replicas. +// +// Note that rebalance, while seemingly similar to replacement, is distinct +// because leaving the replica on the existing store is a valid option. +// Hence, when leaving the existing store (and using it to satisfy a particular +// constraint) is not a possibility, such as in the case of a decommissioning or +// dead node, the specialized replacement check is required. +// See replaceConstraintsCheck(..). func rebalanceFromConstraintsCheck( store, fromStoreID roachpb.StoreDescriptor, analyzed constraint.AnalyzedConstraints, ) (valid bool, necessary bool) { @@ -1879,11 +1979,6 @@ // all, it's valid. If it matches a constraint that is not already fully // satisfied by existing replicas or that is only fully satisfied because of // fromStoreID, then it's necessary. - // - // NB: This assumes that the sum of all constraints.NumReplicas is equal to - // configured number of replicas for the range, or that there's just one set - // of constraints with NumReplicas set to 0. This is meant to be enforced in - // the config package.
for i, constraints := range analyzed.Constraints { if constraintsOK := constraint.ConjunctionsCheck( store, constraints.Constraints, diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 3977d8f2bb9a..9b6bb2f1c204 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -128,7 +128,7 @@ var sameDCStores = []*roachpb.StoreDescriptor{ Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, Node: roachpb.NodeDescriptor{ NodeID: 1, - Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + Attrs: roachpb.Attributes{Attrs: []string{"a", "n1"}}, }, Capacity: roachpb.StoreCapacity{ Capacity: 200, @@ -141,7 +141,7 @@ var sameDCStores = []*roachpb.StoreDescriptor{ Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, Node: roachpb.NodeDescriptor{ NodeID: 2, - Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + Attrs: roachpb.Attributes{Attrs: []string{"a", "n2"}}, }, Capacity: roachpb.StoreCapacity{ Capacity: 200, @@ -154,7 +154,7 @@ var sameDCStores = []*roachpb.StoreDescriptor{ Attrs: roachpb.Attributes{Attrs: []string{"hdd"}}, Node: roachpb.NodeDescriptor{ NodeID: 3, - Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + Attrs: roachpb.Attributes{Attrs: []string{"a", "n3"}}, }, Capacity: roachpb.StoreCapacity{ Capacity: 200, @@ -167,7 +167,7 @@ var sameDCStores = []*roachpb.StoreDescriptor{ Attrs: roachpb.Attributes{Attrs: []string{"hdd"}}, Node: roachpb.NodeDescriptor{ NodeID: 4, - Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + Attrs: roachpb.Attributes{Attrs: []string{"a", "n4"}}, }, Capacity: roachpb.StoreCapacity{ Capacity: 200, @@ -180,7 +180,7 @@ var sameDCStores = []*roachpb.StoreDescriptor{ Attrs: roachpb.Attributes{Attrs: []string{"mem"}}, Node: roachpb.NodeDescriptor{ NodeID: 5, - Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + Attrs: roachpb.Attributes{Attrs: []string{"a", "n5"}}, }, Capacity: roachpb.StoreCapacity{ Capacity: 200, @@ -562,7 +562,7 @@ func TestAllocatorSimpleRetrieval(t *testing.T) { ctx, sp, simpleSpanConfig, - nil /* existingVoters */, nil, /* existingNonVoters */ + nil /* existingVoters */, nil /* existingNonVoters */, nil, /* replacing */ Dead, ) if err != nil { @@ -584,7 +584,7 @@ func TestAllocatorNoAvailableDisks(t *testing.T) { ctx, sp, simpleSpanConfig, - nil /* existingVoters */, nil, /* existingNonVoters */ + nil /* existingVoters */, nil /* existingNonVoters */, nil, /* replacing */ Dead, ) if !roachpb.Empty(result) { @@ -703,6 +703,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) { test.conf, nil, nil, + nil, Alive, ) require.NoError(t, err) @@ -718,6 +719,7 @@ func TestAllocatorReadAmpCheck(t *testing.T) { test.conf, nil, nil, + nil, // Dead and Decommissioning should behave the same here, use either. 
func() ReplicaStatus { if i%2 == 0 { @@ -747,7 +749,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { ctx, sp, multiDCConfigSSD, - nil /* existingVoters */, nil, /* existingNonVoters */ + nil /* existingVoters */, nil /* existingNonVoters */, nil, /* replacing */ Dead, ) if err != nil { @@ -760,7 +762,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { []roachpb.ReplicaDescriptor{{ NodeID: result1.NodeID, StoreID: result1.StoreID, - }}, nil, /* existingNonVoters */ + }}, nil /* existingNonVoters */, nil, /* replacing */ Dead, ) if err != nil { @@ -785,7 +787,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { NodeID: result2.NodeID, StoreID: result2.StoreID, }, - }, nil, /* existingNonVoters */ + }, nil /* existingNonVoters */, nil, /* replacing */ Dead, ) if err == nil { @@ -821,6 +823,7 @@ func TestAllocatorExistingReplica(t *testing.T) { StoreID: 2, }, }, nil, /* existingNonVoters */ + nil, /* replacing */ Dead, ) if err != nil { @@ -886,6 +889,11 @@ func TestAllocatorReplaceDecommissioningReplica(t *testing.T) { ReplicaID: 2, }, }, nil, /* existingNonVoters */ + &roachpb.ReplicaDescriptor{ + NodeID: 3, + StoreID: 3, + ReplicaID: 3, + }, Decommissioning, ) if err != nil { @@ -896,6 +904,72 @@ func TestAllocatorReplaceDecommissioningReplica(t *testing.T) { } } +func TestAllocatorReplaceFailsOnConstrainedDecommissioningReplica(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper, g, sp, a, _ := CreateTestAllocator(ctx, 5, false /* deterministic */) + defer stopper.Stop(ctx) + gossiputil.NewStoreGossiper(g).GossipStores(sameDCStores, t) + + // Override liveness of n3 to decommissioning so the only available target is s4. + oSp := storepool.NewOverrideStorePool(sp, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if nid == roachpb.NodeID(3) { + return livenesspb.NodeLivenessStatus_DECOMMISSIONING + } + + return sp.NodeLivenessFn(nid, now, timeUntilStoreDead) + }, func() int { + return 4 + }) + + _, _, err := a.AllocateVoter( + ctx, + oSp, + roachpb.SpanConfig{ + NumReplicas: 3, + Constraints: []roachpb.ConstraintsConjunction{ + { + Constraints: []roachpb.Constraint{ + {Value: "mem", Type: roachpb.Constraint_PROHIBITED}, + }, + }, + { + NumReplicas: 1, + Constraints: []roachpb.Constraint{ + {Value: "n3", Type: roachpb.Constraint_REQUIRED}, + }, + }, + }, + }, + []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 1, + ReplicaID: 1, + }, + { + NodeID: 2, + StoreID: 2, + ReplicaID: 2, + }, + }, nil, /* existingNonVoters */ + &roachpb.ReplicaDescriptor{ + NodeID: 3, + StoreID: 3, + ReplicaID: 3, + }, + Decommissioning, + ) + require.Errorf(t, err, "Unable to perform allocation: "+ + "0 of 4 live stores are able to take a new replica for the range "+ + "(2 already have a voter, 0 already have a non-voter); "+ + "replicas must match constraints [{-mem} {+n3:1}]; "+ + "voting replicas must match voter_constraints []", + ) +} + func TestAllocatorMultipleStoresPerNode(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -989,7 +1063,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { for _, tc := range testCases { { result, _, err := a.AllocateVoter( - ctx, sp, emptySpanConfig(), tc.existing, nil, + ctx, sp, emptySpanConfig(), tc.existing, nil, nil, Dead, ) if e, a := tc.expectTargetAllocate, !roachpb.Empty(result); e != a { @@ -3075,7 +3149,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) { // Allocate the 
voting replica first, before the non-voter. This is the // order in which we'd expect the allocator to repair a given range. See // TestAllocatorComputeAction. - voterTarget, _, err := a.AllocateVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, Dead) + voterTarget, _, err := a.AllocateVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, nil, Dead) if test.shouldVoterAllocFail { require.Errorf(t, err, "expected voter allocation to fail; got %v as a valid target instead", voterTarget) } else { @@ -3084,7 +3158,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) { test.existingVoters = append(test.existingVoters, replicas(voterTarget.StoreID)...) } - nonVoterTarget, _, err := a.AllocateNonVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, Dead) + nonVoterTarget, _, err := a.AllocateNonVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, nil /* replacing */, Dead) if test.shouldNonVoterAllocFail { require.Errorf(t, err, "expected non-voter allocation to fail; got %v as a valid target instead", nonVoterTarget) } else { @@ -3158,7 +3232,7 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) { StoreID: storeID, } } - targetStore, details, err := a.AllocateVoter(ctx, sp, emptySpanConfig(), existingRepls, nil, Dead) + targetStore, details, err := a.AllocateVoter(ctx, sp, emptySpanConfig(), existingRepls, nil, nil, Dead) if err != nil { t.Fatal(err) } @@ -3625,7 +3699,7 @@ func TestAllocatorNonVoterAllocationExcludesVoterNodes(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(test.stores, t) - result, _, err := a.AllocateNonVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, Dead) + result, _, err := a.AllocateNonVoter(ctx, sp, test.conf, test.existingVoters, test.existingNonVoters, nil /* replacing */, Dead) if test.shouldFail { require.Error(t, err) require.Regexp(t, test.expError, err) @@ -8893,7 +8967,7 @@ func TestNonVoterPrioritizationInVoterAdditions(t *testing.T) { } for i, tc := range testCases { - result, _, _ := a.AllocateVoter(ctx, sp, tc.spanConfig, tc.existingVoters, tc.existingNonVoters, Alive) + result, _, _ := a.AllocateVoter(ctx, sp, tc.spanConfig, tc.existingVoters, tc.existingNonVoters, nil, Alive) assert.Equal(t, tc.expectedTargetAllocate, result, "Unexpected replication target returned by allocate voter in test %d", i) } } diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index db9ffb04c76d..d3b31dc73d2a 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -102,6 +102,105 @@ var threeStores = []*roachpb.StoreDescriptor{ }, } +var fourSingleStoreRacks = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Attrs: roachpb.Attributes{Attrs: []string{"red"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 1, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "region", + Value: "local", + }, + { + Key: "rack", + Value: "1", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 2, + Attrs: roachpb.Attributes{Attrs: []string{"red"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 2, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "region", + Value: "local", + }, + { + Key: "rack", + Value: "2", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 3, + Attrs: roachpb.Attributes{Attrs: 
[]string{"black"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 3, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "region", + Value: "local", + }, + { + Key: "rack", + Value: "3", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 4, + Attrs: roachpb.Attributes{Attrs: []string{"black"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 4, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "region", + Value: "local", + }, + { + Key: "rack", + Value: "4", + }, + }, + }, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, +} + // TestAllocatorRebalanceTarget could help us to verify whether we'll rebalance // to a target that we'll immediately remove. func TestAllocatorRebalanceTarget(t *testing.T) { @@ -298,14 +397,14 @@ func TestAllocatorThrottled(t *testing.T) { defer stopper.Stop(ctx) // First test to make sure we would send the replica to purgatory. - _, _, err := a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) + _, _, err := a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, nil, allocatorimpl.Dead) if _, ok := IsPurgatoryError(err); !ok { t.Fatalf("expected a purgatory error, got: %+v", err) } // Second, test the normal case in which we can allocate to the store. gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t) - result, _, err := a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) + result, _, err := a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, nil, allocatorimpl.Dead) if err != nil { t.Fatalf("unable to perform allocation: %+v", err) } @@ -322,7 +421,7 @@ func TestAllocatorThrottled(t *testing.T) { } storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour) sp.DetailsMu.Unlock() - _, _, err = a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) + _, _, err = a.AllocateVoter(ctx, sp, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, nil, allocatorimpl.Dead) if _, ok := IsPurgatoryError(err); ok { t.Fatalf("expected a non purgatory error, got: %+v", err) } diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index d8dd67ba5cd2..c1cba61fba4f 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -1154,16 +1154,21 @@ func (rq *replicateQueue) addOrReplaceVoters( ) (op AllocationOp, _ error) { effects := effectBuilder{} desc, conf := repl.DescAndSpanConfig() - isReplace := removeIdx >= 0 + var replacing *roachpb.ReplicaDescriptor + if removeIdx >= 0 { + replacing = &existingVoters[removeIdx] + } // The allocator should not try to re-add this replica since there is a reason // we're removing it (i.e. dead or decommissioning). If we left the replica in // the slice, the allocator would not be guaranteed to pick a replica that // fills the gap removeRepl leaves once it's gone. 
- newVoter, details, err := rq.allocator.AllocateVoter(ctx, rq.storePool, conf, remainingLiveVoters, remainingLiveNonVoters, replicaStatus) + newVoter, details, err := rq.allocator.AllocateVoter(ctx, rq.storePool, conf, remainingLiveVoters, remainingLiveNonVoters, replacing, replicaStatus) if err != nil { return nil, err } + + isReplace := removeIdx >= 0 if isReplace && newVoter.StoreID == existingVoters[removeIdx].StoreID { return nil, errors.AssertionFailedf("allocator suggested to replace replica on s%d with itself", newVoter.StoreID) } @@ -1254,8 +1259,12 @@ func (rq *replicateQueue) addOrReplaceNonVoters( ) (op AllocationOp, _ error) { effects := effectBuilder{} conf := repl.SpanConfig() + var replacing *roachpb.ReplicaDescriptor + if removeIdx >= 0 { + replacing = &existingNonVoters[removeIdx] + } - newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, rq.storePool, conf, liveVoterReplicas, liveNonVoterReplicas, replicaStatus) + newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, rq.storePool, conf, liveVoterReplicas, liveNonVoterReplicas, replacing, replicaStatus) if err != nil { return nil, err } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index bceb8ad0f107..3d0ff7ff74fd 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2069,6 +2069,9 @@ func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, erro if s.cfg.TestingKnobs.MakeSystemConfigSpanUnavailableToQueues { return nil, errSysCfgUnavailable } + if s.cfg.TestingKnobs.ConfReaderInterceptor != nil { + return s.cfg.TestingKnobs.ConfReaderInterceptor(), nil + } if s.cfg.SpanConfigsDisabled || !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) || @@ -3311,7 +3314,7 @@ func (s *Store) AllocatorCheckRange( return action, roachpb.ReplicationTarget{}, sp.FinishAndGetConfiguredRecording(), err } - liveVoters, liveNonVoters, isReplacement, nothingToDo, err := + filteredVoters, filteredNonVoters, replacing, nothingToDo, err := allocatorimpl.FilterReplicasForAction(storePool, desc, action) if nothingToDo || err != nil { @@ -3319,7 +3322,7 @@ func (s *Store) AllocatorCheckRange( } target, _, err := s.allocator.AllocateTarget(ctx, storePool, conf, - liveVoters, liveNonVoters, action.ReplicaStatus(), action.TargetReplicaType(), + filteredVoters, filteredNonVoters, replacing, action.ReplicaStatus(), action.TargetReplicaType(), ) if err == nil { log.Eventf(ctx, "found valid allocation of %s target %v", action.TargetReplicaType(), target) @@ -3331,11 +3334,11 @@ func (s *Store) AllocatorCheckRange( storePool, conf, desc.Replicas().VoterDescriptors(), - liveNonVoters, + filteredVoters, action.ReplicaStatus(), action.TargetReplicaType(), target, - isReplacement, + replacing != nil, ) if fragileQuorumErr != nil { diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index d292ec98cb06..bb246b1fc8dd 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -49,6 +49,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -3404,6 +3405,32 @@ func TestSnapshotRateLimit(t *testing.T) { } } +type mockSpanConfigReader struct { + real 
spanconfig.StoreReader + overrides map[string]roachpb.SpanConfig +} + +func (m *mockSpanConfigReader) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool { + panic("unimplemented") +} + +func (m *mockSpanConfigReader) ComputeSplitKey( + ctx context.Context, start, end roachpb.RKey, +) roachpb.RKey { + panic("unimplemented") +} + +func (m *mockSpanConfigReader) GetSpanConfigForKey( + ctx context.Context, key roachpb.RKey, +) (roachpb.SpanConfig, error) { + if conf, ok := m.overrides[string(key)]; ok { + return conf, nil + } + return m.real.GetSpanConfigForKey(ctx, key) +} + +var _ spanconfig.StoreReader = &mockSpanConfigReader{} + // TestAllocatorCheckRangeUnconfigured tests evaluating the allocation decisions // for a range with a single replica using the default system configuration and // no other available allocation targets. @@ -3456,15 +3483,17 @@ func TestAllocatorCheckRange(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) + defaultSystemSpanConfig := zonepb.DefaultSystemZoneConfigRef().AsSpanConfig() + for _, tc := range []struct { name string stores []*roachpb.StoreDescriptor existingReplicas []roachpb.ReplicaDescriptor - zoneConfig *zonepb.ZoneConfig + spanConfig *roachpb.SpanConfig livenessOverrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus - baselineExpNoop bool expectedAction allocatorimpl.AllocatorAction expectValidTarget bool + expectedTarget roachpb.ReplicationTarget expectedLogMessage string expectErr bool expectAllocatorErr bool @@ -3553,11 +3582,10 @@ {NodeID: 4, StoreID: 4, ReplicaID: 4}, {NodeID: 5, StoreID: 5, ReplicaID: 5}, }, - zoneConfig: zonepb.DefaultSystemZoneConfigRef(), + spanConfig: &defaultSystemSpanConfig, livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ 3: livenesspb.NodeLivenessStatus_DECOMMISSIONING, }, - baselineExpNoop: true, expectedAction: allocatorimpl.AllocatorRemoveDecommissioningVoter, expectErr: false, expectValidTarget: false, @@ -3572,13 +3600,12 @@ {NodeID: 4, StoreID: 4, ReplicaID: 4}, {NodeID: 5, StoreID: 5, ReplicaID: 5}, }, - zoneConfig: zonepb.DefaultSystemZoneConfigRef(), + spanConfig: &defaultSystemSpanConfig, livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ 3: livenesspb.NodeLivenessStatus_DECOMMISSIONING, 4: livenesspb.NodeLivenessStatus_DECOMMISSIONING, 5: livenesspb.NodeLivenessStatus_DECOMMISSIONING, }, - baselineExpNoop: true, expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter, expectValidTarget: false, expectAllocatorErr: true, @@ -3597,7 +3624,7 @@ // Region "c" {NodeID: 7, StoreID: 7, ReplicaID: 5}, }, - zoneConfig: zonepb.DefaultSystemZoneConfigRef(), + spanConfig: &defaultSystemSpanConfig, livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ // Downsize to one node per region: 3,6,9. 1: livenesspb.NodeLivenessStatus_DECOMMISSIONING, @@ -3624,7 +3651,7 @@ // Region "c" {NodeID: 7, StoreID: 7, ReplicaID: 5}, }, - zoneConfig: zonepb.DefaultSystemZoneConfigRef(), + spanConfig: &defaultSystemSpanConfig, livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ // Downsize to: 1,4,7,9 but 7 is dead. // Replica on n7 should be replaced first.
@@ -3639,6 +3666,196 @@ func TestAllocatorCheckRange(t *testing.T) { expectErr: false, expectValidTarget: true, }, + { + name: "decommissioning without satisfying partially constrained locality", + stores: fourSingleStoreRacks, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 4, StoreID: 4, ReplicaID: 3}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 4: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + }, + spanConfig: &roachpb.SpanConfig{ + NumReplicas: 3, + Constraints: []roachpb.ConstraintsConjunction{ + { + NumReplicas: 1, + Constraints: []roachpb.Constraint{ + { + Type: roachpb.Constraint_REQUIRED, + Key: "rack", + Value: "4", + }, + }, + }, + }, + }, + expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter, + expectAllocatorErr: true, + expectedErrStr: "replicas must match constraints", + expectedLogMessage: "cannot allocate necessary voter on s3", + }, + { + name: "decommissioning without satisfying multiple partial constraints", + stores: fourSingleStoreRacks, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 4, StoreID: 4, ReplicaID: 3}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 4: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + }, + spanConfig: &roachpb.SpanConfig{ + NumReplicas: 3, + Constraints: []roachpb.ConstraintsConjunction{ + { + NumReplicas: 1, + Constraints: []roachpb.Constraint{ + { + Type: roachpb.Constraint_REQUIRED, + Value: "black", + }, + }, + }, + { + NumReplicas: 1, + Constraints: []roachpb.Constraint{ + { + Type: roachpb.Constraint_REQUIRED, + Key: "rack", + Value: "4", + }, + }, + }, + }, + }, + expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter, + expectAllocatorErr: true, + expectedErrStr: "replicas must match constraints", + expectedLogMessage: "cannot allocate necessary voter on s3", + }, + { + name: "decommissioning during upreplication with partial constraints", + stores: fourSingleStoreRacks, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 4: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + }, + spanConfig: &roachpb.SpanConfig{ + NumReplicas: 3, + Constraints: []roachpb.ConstraintsConjunction{ + { + NumReplicas: 1, + Constraints: []roachpb.Constraint{ + { + Type: roachpb.Constraint_REQUIRED, + Key: "rack", + Value: "4", + }, + }, + }, + }, + }, + expectedAction: allocatorimpl.AllocatorAddVoter, + expectValidTarget: true, + }, + { + name: "decommissioning with replacement satisfying locality", + stores: multiRegionStores, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 4, StoreID: 4, ReplicaID: 2}, + {NodeID: 7, StoreID: 7, ReplicaID: 3}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 1: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + 3: livenesspb.NodeLivenessStatus_DEAD, + }, + spanConfig: &roachpb.SpanConfig{ + NumReplicas: 3, + Constraints: []roachpb.ConstraintsConjunction{ + { + NumReplicas: 1, + Constraints: []roachpb.Constraint{ + { + Type: roachpb.Constraint_REQUIRED, + Key: "region", + Value: "a", + }, + }, + }, + { + NumReplicas: 1, + Constraints: []roachpb.Constraint{ + { + Type: roachpb.Constraint_REQUIRED, + Key: "region", + Value: "b", + }, + }, + }, + }, + }, + 
expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter, + expectValidTarget: true, + expectedTarget: roachpb.ReplicationTarget{NodeID: 2, StoreID: 2}, + expectErr: false, + }, + { + name: "decommissioning without satisfying fully constrained locality", + stores: fourSingleStoreRacks, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 4, StoreID: 4, ReplicaID: 3}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 4: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + }, + spanConfig: &roachpb.SpanConfig{ + NumReplicas: 3, + Constraints: []roachpb.ConstraintsConjunction{ + { + NumReplicas: 1, + Constraints: []roachpb.Constraint{ + { + Type: roachpb.Constraint_REQUIRED, + Key: "rack", + Value: "1", + }, + }, + }, + { + NumReplicas: 1, + Constraints: []roachpb.Constraint{ + { + Type: roachpb.Constraint_REQUIRED, + Key: "rack", + Value: "2", + }, + }, + }, + { + NumReplicas: 1, + Constraints: []roachpb.Constraint{ + { + Type: roachpb.Constraint_REQUIRED, + Key: "rack", + Value: "4", + }, + }, + }, + }, + }, + expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter, + expectAllocatorErr: true, + expectedErrStr: "replicas must match constraints", + }, } { t.Run(tc.name, func(t *testing.T) { // Setup store pool based on store descriptors and configure test store. @@ -3659,11 +3876,17 @@ func TestAllocatorCheckRange(t *testing.T) { cfg := TestStoreConfig(clock) cfg.Gossip = g cfg.StorePool = sp - if tc.zoneConfig != nil { - // TODO(sarkesian): This override is not great. It would be much - // preferable to provide a SpanConfig if possible. See comment in - // createTestStoreWithoutStart. - cfg.SystemConfigProvider.GetSystemConfig().DefaultZoneConfig = tc.zoneConfig + if tc.spanConfig != nil { + mockSr := &mockSpanConfigReader{ + real: cfg.SystemConfigProvider.GetSystemConfig(), + overrides: map[string]roachpb.SpanConfig{ + "a": *tc.spanConfig, + }, + } + + cfg.TestingKnobs.ConfReaderInterceptor = func() spanconfig.StoreReader { + return mockSr + } } s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) @@ -3691,20 +3914,6 @@ func TestAllocatorCheckRange(t *testing.T) { storePoolOverride = storepool.NewOverrideStorePool(sp, livenessOverride, nodeCountOverride) } - // Check if our baseline action without overrides is a noop; i.e., the - // range is fully replicated as configured and needs no actions. - if tc.baselineExpNoop { - action, _, _, err := s.AllocatorCheckRange(ctx, desc, - false /* collectTraces */, nil, /* overrideStorePool */ - ) - require.NoError(t, err, "expected baseline check without error") - require.Containsf(t, []allocatorimpl.AllocatorAction{ - allocatorimpl.AllocatorNoop, - allocatorimpl.AllocatorConsiderRebalance, - }, action, "expected baseline noop, got %s", action) - //require.Equalf(t, allocatorimpl.AllocatorNoop, action, "expected baseline noop, got %s", action) - } - // Execute actual allocator range repair check. 
action, target, recording, err := s.AllocatorCheckRange(ctx, desc, true /* collectTraces */, storePoolOverride, @@ -3733,12 +3942,18 @@ func TestAllocatorCheckRange(t *testing.T) { } if tc.expectValidTarget { - require.NotEqualf(t, roachpb.ReplicationTarget{}, target, "expected valid target") + require.Falsef(t, roachpb.Empty(target), "expected valid target") + } + + if !roachpb.Empty(tc.expectedTarget) { + require.Equalf(t, tc.expectedTarget, target, "expected target %s, got %s", + tc.expectedTarget, target, + ) } if tc.expectedLogMessage != "" { _, ok := recording.FindLogMessage(tc.expectedLogMessage) - require.Truef(t, ok, "expected to find trace \"%s\"", tc.expectedLogMessage) + require.Truef(t, ok, "expected to find \"%s\" in trace:\n%s", tc.expectedLogMessage, recording) } }) } diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index e5ebda32f678..fb033789b283 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -412,6 +412,8 @@ type StoreTestingKnobs struct { // TODO(irfansharif): Get rid of this knob, maybe by first moving // DisableSpanConfigs into a testing knob instead of a server arg. UseSystemConfigSpanForQueues bool + // ConfReaderInterceptor intercepts calls to get a span config reader. + ConfReaderInterceptor func() spanconfig.StoreReader // IgnoreStrictGCEnforcement is used by tests to op out of strict GC // enforcement. IgnoreStrictGCEnforcement bool
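Illustrative note (not part of the patch): the replacement check added in allocator_scorer.go can be modeled with a small, self-contained Go sketch. The types below are hypothetical stand-ins for constraint.AnalyzedConstraints and roachpb.StoreDescriptor; only the decision rule mirrors replaceConstraintsCheck above, namely that a candidate which fails a necessary constraint satisfied only by the outgoing store is rejected outright.

package main

import "fmt"

// Hypothetical, simplified stand-ins for the allocator's store descriptors and
// analyzed constraint conjunctions; for illustration only.
type store struct {
	id    int
	attrs map[string]bool
}

type conjunction struct {
	attr        string // a single required attribute, standing in for a constraint conjunction
	numReplicas int
	satisfiedBy []int // IDs of existing replica stores matching this conjunction
}

func contains(ids []int, id int) bool {
	for _, x := range ids {
		if x == id {
			return true
		}
	}
	return false
}

// replaceCheck mirrors the shape of replaceConstraintsCheck: a candidate is
// valid if it matches some conjunction; a conjunction is necessary if it is
// unsatisfied or only just satisfied by the store being replaced; and a
// candidate that misses a necessary conjunction which the outgoing store
// satisfied is rejected outright.
func replaceCheck(candidate, existing store, constraints []conjunction) (valid, necessary bool) {
	if len(constraints) == 0 {
		return true, false
	}
	for _, c := range constraints {
		byExisting := contains(c.satisfiedBy, existing.id)
		byCandidate := candidate.attrs[c.attr]
		if byCandidate {
			valid = true
		}
		if len(c.satisfiedBy) < c.numReplicas ||
			(len(c.satisfiedBy) == c.numReplicas && byExisting) {
			necessary = true
		}
		if necessary && byExisting && !byCandidate {
			return false, necessary
		}
	}
	return valid, necessary
}

func main() {
	// s4 (rack=4) is decommissioning and is the only replica satisfying the
	// rack=4 conjunction; s3 (rack=3) cannot replace it, echoing the
	// "cannot allocate necessary voter on s3" trace in the new test cases.
	s3 := store{id: 3, attrs: map[string]bool{"rack=3": true}}
	s4 := store{id: 4, attrs: map[string]bool{"rack=4": true}}
	rack4 := conjunction{attr: "rack=4", numReplicas: 1, satisfiedBy: []int{4}}

	valid, necessary := replaceCheck(s3, s4, []conjunction{rack4})
	fmt.Printf("valid=%t necessary=%t\n", valid, necessary) // valid=false necessary=true
}

This only sketches the shape of the decision rule; the real implementation additionally handles analyzed.UnconstrainedReplicas and operates on the actual roachpb constraint conjunctions.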