diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 83b1d2becc0c..7a42219721ac 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -19,6 +19,7 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -899,7 +900,7 @@ func (a *Allocator) allocateTarget( conf, existingVoters, existingNonVoters, - a.scorerOptions(), + a.scorerOptions(ctx), // When allocating a *new* replica, we explicitly disregard nodes with any // existing replicas. This is important for multi-store scenarios as // otherwise, stores on the nodes that have existing replicas are simply @@ -1122,6 +1123,7 @@ func (a Allocator) removeTarget( replicaSetForDiversityCalc := getReplicasForDiversityCalc(targetType, existingVoters, existingReplicas) rankedCandidates := candidateListForRemoval( + ctx, candidateStoreList, constraintsChecker, a.storePool.getLocalitiesByStore(replicaSetForDiversityCalc), @@ -1451,16 +1453,18 @@ func (a Allocator) RebalanceNonVoter( ) } -func (a *Allocator) scorerOptions() *rangeCountScorerOptions { +func (a *Allocator) scorerOptions(ctx context.Context) *rangeCountScorerOptions { return &rangeCountScorerOptions{ + storeHealthOptions: a.storeHealthOptions(ctx), deterministic: a.storePool.deterministic, rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV), } } -func (a *Allocator) scorerOptionsForScatter() *scatterScorerOptions { +func (a *Allocator) scorerOptionsForScatter(ctx context.Context) *scatterScorerOptions { return &scatterScorerOptions{ rangeCountScorerOptions: rangeCountScorerOptions{ + storeHealthOptions: a.storeHealthOptions(ctx), deterministic: a.storePool.deterministic, rangeRebalanceThreshold: 0, }, @@ -1588,6 +1592,24 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences( return true } +// storeHealthOptions returns the store health options, currently only +// considering the threshold for L0 sub-levels. This threshold is not +// considered in allocation or rebalancing decisions (excluding candidate +// stores as targets) when enforcementLevel is set to storeHealthNoAction or +// storeHealthLogOnly. By default storeHealthLogOnly is the action taken. When +// there is a mixed version cluster, storeHealthNoAction is set instead. +func (a *Allocator) storeHealthOptions(ctx context.Context) storeHealthOptions { + enforcementLevel := storeHealthNoAction + if a.storePool.st.Version.IsActive(ctx, clusterversion.AutoStatsTableSettings) { + enforcementLevel = storeHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.storePool.st.SV)) + } + + return storeHealthOptions{ + enforcementLevel: enforcementLevel, + l0SublevelThreshold: l0SublevelsThreshold.Get(&a.storePool.st.SV), + } +} + // TransferLeaseTarget returns a suitable replica to transfer the range lease // to from the provided list. It includes the current lease holder replica // unless asked to do otherwise by the excludeLeaseRepl parameter. @@ -1735,11 +1757,14 @@ func (a *Allocator) TransferLeaseTarget( // https://github.com/cockroachdb/cockroach/issues/75630. bestStore, noRebalanceReason := bestStoreToMinimizeQPSDelta( leaseReplQPS, - qpsRebalanceThreshold.Get(&a.storePool.st.SV), - minQPSDifferenceForTransfers.Get(&a.storePool.st.SV), leaseRepl.StoreID(), candidates, storeDescMap, + &qpsScorerOptions{ + storeHealthOptions: a.storeHealthOptions(ctx), + qpsRebalanceThreshold: qpsRebalanceThreshold.Get(&a.storePool.st.SV), + minRequiredQPSDiff: minQPSDifferenceForTransfers.Get(&a.storePool.st.SV), + }, ) switch noRebalanceReason { diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go index 3c33358f74e8..1131cc52e358 100644 --- a/pkg/kv/kvserver/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator_scorer.go @@ -16,6 +16,7 @@ import ( "fmt" "math" "sort" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -70,6 +71,50 @@ const ( // consider a store full/empty if it's at least minRebalanceThreshold // away from the mean. minRangeRebalanceThreshold = 2 + + // maxL0SublevelThreshold is the number of L0 sub-levels of a store + // descriptor, that when greater than this value and in excees of the + // average L0 sub-levels in the cluster - will have the action defined by + // l0SublevelsThresholdEnforce taken. This value does not affect the + // allocator in deciding to remove replicas from it's store, only + // potentially block adding or moving replicas to other stores. + maxL0SublevelThreshold = 20 + + // l0SublevelInterval is the period over which to accumulate statistics on + // the number of L0 sublevels within a store. + l0SublevelInterval = time.Minute * 5 + + // l0SublevelMaxSampled is maximum number of L0 sub-levels that may exist + // in a sample. This setting limits the extreme skew that could occur by + // capping the highest possible value considered. + l0SublevelMaxSampled = 500 + + // l0SubLevelWaterMark is the percentage above the mean after which a store + // could be conisdered unhealthy if also exceeding the threshold. + l0SubLevelWaterMark = 1.10 +) + +// storeHealthEnforcement represents the level of action that may be taken or +// excluded when a candidate disk is considered unhealthy. +type storeHealthEnforcement int64 + +const ( + // storeHealthNoAction will take no action upon candidate stores when they + // exceed l0SublevelThreshold. + storeHealthNoAction storeHealthEnforcement = iota + // storeHealthLogOnly will take no action upon candidate stores when they + // exceed l0SublevelThreshold except log an event. + storeHealthLogOnly + // storeHealthBlockRebalanceTo will take action to exclude candidate stores + // when they exceed l0SublevelThreshold and mean from being considered + // targets for rebalance actions only. Allocation actions such as adding + // upreplicaing an range will not be affected. + storeHealthBlockRebalanceTo + // storeHealthBlockAll will take action to exclude candidate stores when + // they exceed l0SublevelThreshold and mean from being candidates for all + // replica allocation and rebalancing. When enabled and stores exceed the + // threshold, they will not receive any new replicas. + storeHealthBlockAll ) // rangeRebalanceThreshold is the minimum ratio of a store's range count to @@ -87,6 +132,55 @@ var rangeRebalanceThreshold = func() *settings.FloatSetting { return s }() +// l0SublevelsThreshold is the maximum number of sub-levels within level 0 that +// may exist on candidate store descriptors before they are considered +// unhealthy. Once considered unhealthy, the action taken will be dictated by +// l0SublevelsThresholdEnforce cluster setting defined below. The rationale for +// using L0 sub-levels as opposed to read amplification is that it is more +// generally the variable component that makes up read amplification. When +// L0 sub-levels is high, it is an indicator of poor LSM health as L0 is usually +// in memory and must be first visited before traversing any further level. See +// this issue for additional information: +// https://github.com/cockroachdb/pebble/issues/609 +var l0SublevelsThreshold = settings.RegisterIntSetting( + settings.SystemOnly, + "kv.allocator.l0_sublevels_threshold", + "the maximum number of l0 sublevels within a store that may exist "+ + "before the action defined in "+ + "`kv.allocator.l0_sublevels_threshold_enforce` will be taken "+ + "if also exceeding the cluster average", + maxL0SublevelThreshold, +) + +// l0SublevelsThresholdEnforce is the level of enforcement taken upon candidate +// stores when their L0-sublevels exceeds the threshold defined in +// l0SublevelThreshold. Under disabled and log enforcement, no action is taken +// to exclude the candidate store either as a potential allocation nor +// rebalance target by the replicate queue and store rebalancer. When the +// enforcement level is rebalance, candidate stores will be excluded as targets +// for rebalancing when exceeding the threshold, however will remain candidates +// for allocation of voters and non-voters. When allocate is set, candidates +// are excluded as targets for all rebalancing and also allocation of voters +// and non-voters. +var l0SublevelsThresholdEnforce = settings.RegisterEnumSetting( + settings.SystemOnly, + "kv.allocator.l0_sublevels_threshold_enforce", + "the level of enforcement when a candidate disk has L0 sub-levels "+ + "exceeding `kv.allocator.l0_sublevels_threshold` and above the "+ + "cluster average:`block_none` will exclude "+ + "no candidate stores, `block_none_log` will exclude no candidates but log an "+ + "event, `block_rebalance_to` will exclude candidates stores from being "+ + "targets of rebalance actions, `block_all` will exclude candidate stores "+ + "from being targets of both allocation and rebalancing", + "block_none_log", + map[int64]string{ + int64(storeHealthNoAction): "block_none", + int64(storeHealthLogOnly): "block_none_log", + int64(storeHealthBlockRebalanceTo): "block_rebalance_to", + int64(storeHealthBlockAll): "block_all", + }, +) + // CockroachDB has two heuristics that trigger replica rebalancing: range count // convergence and QPS convergence. scorerOptions defines the interface that // both of these heuristics must implement. @@ -140,6 +234,9 @@ type scorerOptions interface { // with the same QPS) that would converge the range's existing stores' QPS the // most. removalMaximallyConvergesScore(removalCandStoreList StoreList, existing roachpb.StoreDescriptor) int + // getStoreHealthOptions returns the scorer options for store health. It is + // used to inform scoring based on the health of a store. + getStoreHealthOptions() storeHealthOptions } func jittered(val float64, jitter float64, rand allocatorRand) float64 { @@ -181,6 +278,7 @@ func (o *scatterScorerOptions) maybeJitterStoreStats( // This means that the resulting rebalancing decisions will further the goal of // converging range counts across stores in the cluster. type rangeCountScorerOptions struct { + storeHealthOptions deterministic bool rangeRebalanceThreshold float64 } @@ -295,6 +393,7 @@ func (o *rangeCountScorerOptions) removalMaximallyConvergesScore( // queries-per-second. This means that the resulting rebalancing decisions will // further the goal of converging QPS across stores in the cluster. type qpsScorerOptions struct { + storeHealthOptions deterministic bool qpsRebalanceThreshold, minRequiredQPSDiff float64 @@ -443,13 +542,15 @@ func (o *qpsScorerOptions) removalMaximallyConvergesScore( return 0 } -// candidate store for allocation. +// candidate store for allocation. These are ordered by importance. type candidate struct { store roachpb.StoreDescriptor valid bool fullDisk bool necessary bool diversityScore float64 + highReadAmp bool + l0SubLevels int convergesScore int balanceScore balanceStatus rangeCount int @@ -457,9 +558,9 @@ type candidate struct { } func (c candidate) String() string { - str := fmt.Sprintf("s%d, valid:%t, fulldisk:%t, necessary:%t, diversity:%.2f, converges:%d, "+ + str := fmt.Sprintf("s%d, valid:%t, fulldisk:%t, necessary:%t, diversity:%.2f, highReadAmp: %t, l0SubLevels: %d, converges:%d, "+ "balance:%d, rangeCount:%d, queriesPerSecond:%.2f", - c.store.StoreID, c.valid, c.fullDisk, c.necessary, c.diversityScore, c.convergesScore, + c.store.StoreID, c.valid, c.fullDisk, c.necessary, c.diversityScore, c.highReadAmp, c.l0SubLevels, c.convergesScore, c.balanceScore, c.rangeCount, c.store.Capacity.QueriesPerSecond) if c.details != "" { return fmt.Sprintf("%s, details:(%s)", str, c.details) @@ -482,6 +583,12 @@ func (c candidate) compactString() string { if c.diversityScore != 0 { fmt.Fprintf(&buf, ", diversity:%.2f", c.diversityScore) } + if c.highReadAmp { + fmt.Fprintf(&buf, ", highReadAmp:%t", c.highReadAmp) + } + if c.l0SubLevels > 0 { + fmt.Fprintf(&buf, ", l0SubLevels:%d", c.l0SubLevels) + } fmt.Fprintf(&buf, ", converges:%d, balance:%d, rangeCount:%d", c.convergesScore, c.balanceScore, c.rangeCount) if c.details != "" { @@ -502,29 +609,43 @@ func (c candidate) less(o candidate) bool { // candidate is. func (c candidate) compare(o candidate) float64 { if !o.valid { - return 6 + return 60 } if !c.valid { - return -6 + return -60 } if o.fullDisk { - return 5 + return 50 } if c.fullDisk { - return -5 + return -50 } if c.necessary != o.necessary { if c.necessary { - return 4 + return 40 } - return -4 + return -40 } if !scoresAlmostEqual(c.diversityScore, o.diversityScore) { if c.diversityScore > o.diversityScore { - return 3 + return 30 } - return -3 + return -30 + } + // If both o and c have high read amplification, then we prefer the + // canidate with lower read amp. + if o.highReadAmp && c.highReadAmp { + if o.l0SubLevels > c.l0SubLevels { + return 25 + } + } + if c.highReadAmp { + return -25 } + if o.highReadAmp { + return 25 + } + if c.convergesScore != o.convergesScore { if c.convergesScore > o.convergesScore { return 2 + float64(c.convergesScore-o.convergesScore)/10.0 @@ -587,6 +708,7 @@ func (c byScoreAndID) Less(i, j int) bool { c[i].rangeCount == c[j].rangeCount && c[i].necessary == c[j].necessary && c[i].fullDisk == c[j].fullDisk && + c[i].highReadAmp == c[j].highReadAmp && c[i].valid == c[j].valid { return c[i].store.StoreID < c[j].store.StoreID } @@ -594,11 +716,12 @@ func (c byScoreAndID) Less(i, j int) bool { } func (c byScoreAndID) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -// onlyValidAndNotFull returns all the elements in a sorted (by score reversed) -// candidate list that are valid and not nearly full. -func (cl candidateList) onlyValidAndNotFull() candidateList { +// onlyValidAndHealthyDisk returns all the elements in a sorted (by score +// reversed) candidate list that are valid and not nearly full or with high +// read amplification. +func (cl candidateList) onlyValidAndHealthyDisk() candidateList { for i := len(cl) - 1; i >= 0; i-- { - if cl[i].valid && !cl[i].fullDisk { + if cl[i].valid && !cl[i].fullDisk && !cl[i].highReadAmp { return cl[:i+1] } } @@ -608,7 +731,7 @@ func (cl candidateList) onlyValidAndNotFull() candidateList { // best returns all the elements in a sorted (by score reversed) candidate list // that share the highest constraint score and are valid. func (cl candidateList) best() candidateList { - cl = cl.onlyValidAndNotFull() + cl = cl.onlyValidAndHealthyDisk() if len(cl) <= 1 { return cl } @@ -650,6 +773,14 @@ func (cl candidateList) worst() candidateList { } } } + // Are there candidates with high read amplification? If so, pick those. + if cl[len(cl)-1].highReadAmp { + for i := len(cl) - 2; i >= 0; i-- { + if !cl[i].highReadAmp { + return cl[i+1:] + } + } + } // Find the worst constraint/locality/converges/balanceScore values. for i := len(cl) - 2; i >= 0; i-- { if cl[i].necessary == cl[len(cl)-1].necessary && @@ -774,7 +905,11 @@ func rankedCandidateListForAllocation( continue } - if !maxCapacityCheck(s) { + if !maxCapacityCheck(s) || !options.storeHealthOptions.readAmpIsHealthy( + ctx, + s, + candidateStores.candidateL0Sublevels.mean, + ) { continue } diversityScore := diversityAllocateScore(s, existingStoreLocalities) @@ -808,6 +943,7 @@ func rankedCandidateListForAllocation( // // Stores that are marked as not valid, are in violation of a required criteria. func candidateListForRemoval( + ctx context.Context, existingReplsStoreList StoreList, constraintsCheck constraintsCheckFn, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, @@ -827,10 +963,15 @@ func candidateListForRemoval( } diversityScore := diversityRemovalScore(s.StoreID, existingStoreLocalities) candidates = append(candidates, candidate{ - store: s, - valid: constraintsOK, - necessary: necessary, - fullDisk: !maxCapacityCheck(s), + store: s, + valid: constraintsOK, + necessary: necessary, + fullDisk: !maxCapacityCheck(s), + // When removing a replica from a store, we do not want to include + // high amplification in ranking stores. This would submit already + // high read amplification stores to additional load of moving a + // replica. + highReadAmp: false, diversityScore: diversityScore, }) } @@ -980,10 +1121,11 @@ const ( // decision would minimize the QPS range between the existing store and the // coldest store in the equivalence class. func bestStoreToMinimizeQPSDelta( - replQPS, rebalanceThreshold, minRequiredQPSDiff float64, + replQPS float64, existing roachpb.StoreID, candidates []roachpb.StoreID, storeDescMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + options *qpsScorerOptions, ) (bestCandidate roachpb.StoreID, reason declineReason) { storeQPSMap := make(map[roachpb.StoreID]float64, len(candidates)+1) for _, store := range candidates { @@ -1027,7 +1169,7 @@ func bestStoreToMinimizeQPSDelta( // `bestCandidate` (not accounting for the replica under consideration) is // higher than `minQPSDifferenceForTransfers`. diffIgnoringRepl := existingQPSIgnoringRepl - bestCandQPS - if diffIgnoringRepl < minRequiredQPSDiff { + if diffIgnoringRepl < options.minRequiredQPSDiff { return 0, deltaNotSignificant } @@ -1035,7 +1177,7 @@ func bestStoreToMinimizeQPSDelta( // the equivalence class. mean := domainStoreList.candidateQueriesPerSecond.mean overfullThreshold := overfullQPSThreshold( - &qpsScorerOptions{qpsRebalanceThreshold: rebalanceThreshold}, + options, mean, ) if existingQPS < overfullThreshold { @@ -1093,11 +1235,10 @@ func (o *qpsScorerOptions) getRebalanceTargetToMinimizeDelta( } return bestStoreToMinimizeQPSDelta( o.qpsPerReplica, - o.qpsRebalanceThreshold, - o.minRequiredQPSDiff, eqClass.existing.StoreID, candidates, storeListToMap(domainStoreList), + o, ) } @@ -1127,6 +1268,7 @@ func rankedCandidateListForRebalancing( } valid, necessary := removalConstraintsChecker(store) fullDisk := !maxCapacityCheck(store) + if !valid { if !needRebalanceFrom { log.VEventf(ctx, 2, "s%d: should-rebalance(invalid): locality:%q", @@ -1142,10 +1284,15 @@ func rankedCandidateListForRebalancing( needRebalanceFrom = true } existingStores[store.StoreID] = candidate{ - store: store, - valid: valid, - necessary: necessary, - fullDisk: fullDisk, + store: store, + valid: valid, + necessary: necessary, + fullDisk: fullDisk, + // When rebalancing a replica away from a store, we do not want + // to include high amplification in ranking stores. This would + // submit already high read amplification stores to additional + // load of moving a replica. + highReadAmp: false, diversityScore: curDiversityScore, } } @@ -1215,15 +1362,19 @@ func rankedCandidateListForRebalancing( continue } + // NB: We construct equivalence classes based on locality hierarchies, + // the diversityScore must be the only thing that's populated at + // this stage, in additon to hard checks and validation. + // TODO(kvoli,ayushshah15): Refactor this to make it harder to + // inadvertently break the invariant above, constraintsOK, necessary := rebalanceConstraintsChecker(store, existing.store) - maxCapacityOK := maxCapacityCheck(store) diversityScore := diversityRebalanceFromScore( store, existing.store.StoreID, existingStoreLocalities) cand := candidate{ store: store, valid: constraintsOK, necessary: necessary, - fullDisk: !maxCapacityOK, + fullDisk: !maxCapacityCheck(store), diversityScore: diversityScore, } if !cand.less(existing) { @@ -1322,6 +1473,14 @@ func rankedCandidateListForRebalancing( // rebalance candidates. s := cand.store cand.fullDisk = !rebalanceToMaxCapacityCheck(s) + cand.l0SubLevels = int(s.Capacity.L0Sublevels) + cand.highReadAmp = !options.getStoreHealthOptions().rebalanceToReadAmpIsHealthy( + ctx, + s, + // We only wish to compare the read amplification to the + // comparable stores average and not the cluster. + comparable.candidateSL.candidateL0Sublevels.mean, + ) cand.balanceScore = options.balanceScore(comparable.candidateSL, s.Capacity) cand.convergesScore = options.rebalanceToConvergesScore(comparable, s) cand.rangeCount = int(s.Capacity.RangeCount) @@ -1856,6 +2015,73 @@ func convergesOnMean(oldVal, newVal, mean float64) bool { return math.Abs(newVal-mean) < math.Abs(oldVal-mean) } +type storeHealthOptions struct { + enforcementLevel storeHealthEnforcement + l0SublevelThreshold int64 +} + +func (o storeHealthOptions) getStoreHealthOptions() storeHealthOptions { + return o +} + +// readAmpIsHealthy returns true if the store read amplification does not exceed +// the cluster threshold and mean, or the enforcement level does not include +// excluding candidates from being allocation targets. +func (o storeHealthOptions) readAmpIsHealthy( + ctx context.Context, store roachpb.StoreDescriptor, avg float64, +) bool { + if o.enforcementLevel == storeHealthNoAction || + store.Capacity.L0Sublevels < o.l0SublevelThreshold { + return true + } + + // Still log an event when the L0 sub-levels exceeds the threshold, however + // does not exceed the cluster average. This is enabled to avoid confusion + // where candidate stores are still targets, despite exeeding the + // threshold. + if float64(store.Capacity.L0Sublevels) < avg*l0SubLevelWaterMark { + log.Eventf(ctx, "s%d, allocate check l0 sublevels %d exceeds threshold %d, but below average: %f, action enabled %d", + store.StoreID, store.Capacity.L0Sublevels, + o.l0SublevelThreshold, avg, o.enforcementLevel) + return true + } + + log.Eventf(ctx, "s%d, allocate check l0 sublevels %d exceeds threshold %d, above average: %f, action enabled %d", + store.StoreID, store.Capacity.L0Sublevels, + o.l0SublevelThreshold, avg, o.enforcementLevel) + + // The store is only considered unhealthy when the enforcement level is + // storeHealthBlockAll. + return o.enforcementLevel < storeHealthBlockAll +} + +// rebalanceToReadAmpIsHealthy returns true if the store read amplification does +// not exceed the cluster threshold and mean, or the enforcement level does not +// include excluding candidates from being rebalancing targets. +func (o storeHealthOptions) rebalanceToReadAmpIsHealthy( + ctx context.Context, store roachpb.StoreDescriptor, avg float64, +) bool { + if o.enforcementLevel == storeHealthNoAction || + store.Capacity.L0Sublevels < o.l0SublevelThreshold { + return true + } + + if float64(store.Capacity.L0Sublevels) < avg*l0SubLevelWaterMark { + log.Eventf(ctx, "s%d, allocate check l0 sublevels %d exceeds threshold %d, but below average watermark: %f, action enabled %d", + store.StoreID, store.Capacity.L0Sublevels, + o.l0SublevelThreshold, avg*l0SubLevelWaterMark, o.enforcementLevel) + return true + } + + log.Eventf(ctx, "s%d, allocate check l0 sublevels %d exceeds threshold %d, above average watermark: %f, action enabled %d", + store.StoreID, store.Capacity.L0Sublevels, + o.l0SublevelThreshold, avg*l0SubLevelWaterMark, o.enforcementLevel) + + // The store is only considered unhealthy when the enforcement level is + // storeHealthBlockRebalanceTo or storeHealthBlockAll. + return o.enforcementLevel < storeHealthBlockRebalanceTo +} + // maxCapacityCheck returns true if the store has room for a new replica. func maxCapacityCheck(store roachpb.StoreDescriptor) bool { return store.Capacity.FractionUsed() < maxFractionUsedThreshold diff --git a/pkg/kv/kvserver/allocator_scorer_test.go b/pkg/kv/kvserver/allocator_scorer_test.go index bb48103d62d8..17817259b0c5 100644 --- a/pkg/kv/kvserver/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator_scorer_test.go @@ -44,22 +44,24 @@ func (s storeScores) Less(i, j int) bool { } func (s storeScores) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func TestOnlyValidAndNotFull(t *testing.T) { +func TestOnlyValidAndHealthyDisk(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) testCases := []struct { - valid, invalid int + valid, invalid, full, readAmpHigh int }{ - {0, 0}, - {1, 0}, - {0, 1}, - {1, 1}, - {2, 0}, - {2, 1}, - {2, 2}, - {1, 2}, - {0, 2}, + {0, 0, 0, 0}, + {1, 0, 0, 0}, + {0, 1, 0, 0}, + {0, 0, 1, 0}, + {0, 0, 0, 1}, + {1, 1, 1, 1}, + {2, 0, 0, 0}, + {2, 1, 1, 1}, + {2, 2, 2, 2}, + {1, 2, 3, 4}, + {0, 2, 4, 6}, } for _, tc := range testCases { @@ -72,13 +74,19 @@ func TestOnlyValidAndNotFull(t *testing.T) { for i := 0; i < tc.valid; i++ { cl = append(cl, candidate{valid: true}) } + for i := 0; i < tc.full; i++ { + cl = append(cl, candidate{fullDisk: true}) + } + for i := 0; i < tc.readAmpHigh; i++ { + cl = append(cl, candidate{highReadAmp: true}) + } sort.Sort(sort.Reverse(byScore(cl))) - valid := cl.onlyValidAndNotFull() + valid := cl.onlyValidAndHealthyDisk() if a, e := len(valid), tc.valid; a != e { t.Errorf("expected %d valid, actual %d", e, a) } - if a, e := len(cl)-len(valid), tc.invalid; a != e { + if a, e := len(cl)-len(valid), tc.invalid+tc.full+tc.readAmpHigh; a != e { t.Errorf("expected %d invalid, actual %d", e, a) } }) @@ -1046,13 +1054,14 @@ func TestRemoveConstraintsCheck(t *testing.T) { } }) } + } func TestShouldRebalanceDiversity(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - options := &rangeCountScorerOptions{} + options := &rangeCountScorerOptions{storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}} newStore := func(id int, locality roachpb.Locality) roachpb.StoreDescriptor { return roachpb.StoreDescriptor{ StoreID: roachpb.StoreID(id), diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 66df83cf0718..395c97328c1d 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -342,6 +342,93 @@ var oneStoreWithFullDisk = []*roachpb.StoreDescriptor{ }, } +var oneStoreHighReadAmp = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, L0Sublevels: maxL0SublevelThreshold - 5}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1800, L0Sublevels: maxL0SublevelThreshold - 5}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, L0Sublevels: maxL0SublevelThreshold + 5}, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, L0Sublevels: maxL0SublevelThreshold - 5}, + }, +} + +var allStoresHighReadAmp = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, L0Sublevels: maxL0SublevelThreshold + 1}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 800, L0Sublevels: maxL0SublevelThreshold + 1}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, L0Sublevels: maxL0SublevelThreshold + 1}, + }, +} + +var allStoresHighReadAmpSkewed = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, L0Sublevels: maxL0SublevelThreshold + 1}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 800, L0Sublevels: maxL0SublevelThreshold + 50}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, L0Sublevels: maxL0SublevelThreshold + 55}, + }, +} + +var threeStoresHighReadAmpAscRangeCount = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 100, L0Sublevels: maxL0SublevelThreshold + 10}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 400, L0Sublevels: maxL0SublevelThreshold + 10}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1600, L0Sublevels: maxL0SublevelThreshold + 10}, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 6400, L0Sublevels: maxL0SublevelThreshold - 10}, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 25000, L0Sublevels: maxL0SublevelThreshold - 10}, + }, +} + var oneStoreWithTooManyRanges = []*roachpb.StoreDescriptor{ { StoreID: 1, @@ -527,6 +614,101 @@ func TestAllocatorNoAvailableDisks(t *testing.T) { } } +func TestAllocatorReadAmpCheck(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + type testCase struct { + name string + stores []*roachpb.StoreDescriptor + conf roachpb.SpanConfig + expectedAddTarget roachpb.StoreID + enforcement storeHealthEnforcement + } + tests := []testCase{ + { + name: "ignore read amp on allocation when storeHealthNoAction enforcement", + // NB: All stores have high read amp, this should be ignored and + // allocate to the store with the lowest range count. + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(3), + enforcement: storeHealthNoAction, + }, + { + name: "ignore read amp on allocation when storeHealthLogOnly enforcement", + // NB: All stores have high read amp, this should be ignored and + // allocate to the store with the lowest range count. + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(3), + enforcement: storeHealthLogOnly, + }, + { + name: "ignore read amp on allocation when storeHealthBlockRebalanceTo enforcement", + // NB: All stores have high read amp, this should be ignored and + // allocate to the store with the lowest range count. + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(3), + enforcement: storeHealthBlockRebalanceTo, + }, + { + name: "don't allocate to stores when all have high read amp and storeHealthBlockAll", + // NB: All stores have high read amp (limit + 1), none are above the watermark, select the lowest range count. + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(3), + enforcement: storeHealthBlockAll, + }, + { + name: "allocate to store below the mean when all have high read amp and storeHealthBlockAll", + // NB: All stores have high read amp, however store 1 is below the watermark mean read amp. + stores: allStoresHighReadAmpSkewed, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(1), + enforcement: storeHealthBlockAll, + }, + { + name: "allocate to lowest range count store without high read amp when storeHealthBlockAll enforcement", + // NB: Store 1, 2 and 3 have high read amp and are above the watermark, the lowest range count (4) + // should be selected. + stores: threeStoresHighReadAmpAscRangeCount, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(4), + enforcement: storeHealthBlockAll, + }, + } + + chk := func(target roachpb.ReplicationTarget, expectedTarget roachpb.StoreID) bool { + return target.StoreID == expectedTarget + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { + stopper, g, _, a, _ := createTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(test.stores, t) + + // Enable read disk health checking in candidate exclusion. + l0SublevelsThresholdEnforce.Override(ctx, &a.storePool.st.SV, int64(test.enforcement)) + add, _, err := a.AllocateVoter( + ctx, + test.conf, + nil, + nil, + ) + require.NoError(t, err) + require.Truef(t, + chk(add, test.expectedAddTarget), + "the addition target %+v from AllocateVoter doesn't match expectation", + add) + }) + } +} + func TestAllocatorTwoDatacenters(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -728,7 +910,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if e, a := tc.expectTargetRebalance, ok; e != a { t.Errorf( @@ -802,7 +984,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if ok { // Update the descriptor. @@ -843,7 +1025,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.False(t, ok) } @@ -915,7 +1097,7 @@ func TestAllocatorRebalanceBasedOnRangeCount(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if !ok { i-- // loop until we find 10 candidates @@ -939,7 +1121,7 @@ func TestAllocatorRebalanceBasedOnRangeCount(t *testing.T) { t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID) } eqClass.existing = desc - result := a.scorerOptions().shouldRebalanceBasedOnThresholds( + result := a.scorerOptions(ctx).shouldRebalanceBasedOnThresholds( ctx, eqClass, a.metrics, @@ -1077,7 +1259,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if ok { t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) @@ -1102,7 +1284,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if ok { t.Fatalf("expected no rebalance, but got target s%d; details: %s", target.StoreID, details) @@ -1121,7 +1303,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) expTo := stores[1].StoreID expFrom := stores[0].StoreID @@ -1200,7 +1382,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if c.expected > 0 { if !ok { @@ -1347,7 +1529,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { t.Fatalf("[store %d]: unable to get store %d descriptor", j, store.StoreID) } eqClass.existing = desc - if a, e := a.scorerOptions().shouldRebalanceBasedOnThresholds( + if a, e := a.scorerOptions(ctx).shouldRebalanceBasedOnThresholds( context.Background(), eqClass, a.metrics, @@ -1471,6 +1653,7 @@ func TestAllocatorRebalanceByQPS(t *testing.T) { gossiputil.NewStoreGossiper(g).GossipStores(subtest.testStores, t) var rangeUsageInfo RangeUsageInfo options := &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}, qpsPerReplica: 100, qpsRebalanceThreshold: 0.2, } @@ -1584,6 +1767,7 @@ func TestAllocatorRemoveBasedOnQPS(t *testing.T) { defer stopper.Stop(ctx) gossiputil.NewStoreGossiper(g).GossipStores(subtest.testStores, t) options := &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}, qpsRebalanceThreshold: 0.1, } remove, _, err := a.RemoveVoter( @@ -1647,7 +1831,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if ok && result.StoreID != 4 { t.Errorf("expected store 4; got %d", result.StoreID) @@ -1665,7 +1849,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) { existing: desc, candidateSL: sl, } - result := a.scorerOptions().shouldRebalanceBasedOnThresholds( + result := a.scorerOptions(ctx).shouldRebalanceBasedOnThresholds( ctx, eqClass, a.metrics, @@ -2227,7 +2411,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) var resultID roachpb.StoreID if ok { @@ -2299,7 +2483,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) var gotExpected bool if !ok { @@ -2844,7 +3028,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) { c.existingVoters, /* voterCandidates */ c.existingVoters, c.existingNonVoters, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.NoError(t, err) @@ -2863,7 +3047,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) { c.existingVoters, c.existingVoters, nil, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.NoError(t, err) require.Truef(t, checkReplExists(targetVoter, c.expVoterRemovals), @@ -2876,7 +3060,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) { c.existingNonVoters, /* nonVoterCandidates */ c.existingVoters, c.existingNonVoters, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.NoError(t, err) require.True(t, checkReplExists(targetNonVoter, c.expNonVoterRemovals)) @@ -3152,7 +3336,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if !ok { t.Fatalf("%d: RebalanceVoter(%v) returned no target store; details: %s", i, c.existing, details) @@ -3384,7 +3568,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { a.storePool.getLocalitiesByStore(existingRepls), a.storePool.isStoreReadyForRoutineReplicaTransfer, false, /* allowMultipleReplsPerNode */ - a.scorerOptions(), + a.scorerOptions(ctx), ) if !expectedStoreIDsMatch(tc.expected, candidates) { @@ -3403,7 +3587,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { nil, a.storePool.getLocalitiesByStore(existingRepls), a.storePool.isStoreReadyForRoutineReplicaTransfer, - a.scorerOptions(), + a.scorerOptions(ctx), a.metrics, ) if len(tc.expected) > 0 { @@ -3726,7 +3910,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) { a.storePool.getLocalitiesByStore(existingRepls), func(context.Context, roachpb.StoreID) bool { return true }, false, /* allowMultipleReplsPerNode */ - a.scorerOptions(), + a.scorerOptions(ctx), ) best := candidates.best() match := true @@ -3948,10 +4132,11 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) { // Check behavior in a span config where `voter_constraints` are empty. checkFn := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints) - candidates := candidateListForRemoval(sl, + candidates := candidateListForRemoval(ctx, + sl, checkFn, a.storePool.getLocalitiesByStore(existingRepls), - a.scorerOptions()) + a.scorerOptions(ctx)) if !expectedStoreIDsMatch(tc.expected, candidates.worst()) { t.Errorf("%d (with `constraints`): expected candidateListForRemoval(%v)"+ " = %v, but got %v\n for candidates %v", testIdx, tc.existing, tc.expected, @@ -3961,10 +4146,11 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) { // Check that we'd see the same result if the same constraints were // specified as `voter_constraints`. checkFn = voterConstraintsCheckerForRemoval(constraint.EmptyAnalyzedConstraints, analyzed) - candidates = candidateListForRemoval(sl, + candidates = candidateListForRemoval(ctx, + sl, checkFn, a.storePool.getLocalitiesByStore(existingRepls), - a.scorerOptions()) + a.scorerOptions(ctx)) if !expectedStoreIDsMatch(tc.expected, candidates.worst()) { t.Errorf("%d (with `voter_constraints`): expected candidateListForRemoval(%v)"+ " = %v, but got %v\n for candidates %v", testIdx, tc.existing, tc.expected, @@ -4108,6 +4294,7 @@ func TestAllocatorRebalanceNonVoters(t *testing.T) { defer stopper.Stop(ctx) sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(test.stores, t) + // Enable read disk health checking in candidate exclusion. add, remove, _, ok := a.RebalanceNonVoter( ctx, test.conf, @@ -4116,7 +4303,7 @@ func TestAllocatorRebalanceNonVoters(t *testing.T) { test.existingNonVoters, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if test.expectNoAction { require.True(t, !ok) @@ -4135,6 +4322,134 @@ func TestAllocatorRebalanceNonVoters(t *testing.T) { } } +// TestAllocatorRebalanceReadAmpCheck ensures that rebalancing voters: +// (1) Respects storeHealthEnforcement setting, by ignoring L0 Sublevels in +// rebalancing decisions when disabled or set to log only. +// (2) Considers L0 sublevels when set to rebalanceOnly or allocate in +// conjunction with the mean. +// (3) Does not attempt to rebalance off of the store when read amplification +// is high, as this setting is only used for filtering candidates. +func TestAllocatorRebalanceReadAmpCheck(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + type testCase struct { + name string + stores []*roachpb.StoreDescriptor + conf roachpb.SpanConfig + existingVoters []roachpb.ReplicaDescriptor + expectNoAction bool + expectedRemoveTargets, expectedAddTargets []roachpb.StoreID + enforcement storeHealthEnforcement + } + tests := []testCase{ + { + name: "don't move off of nodes with high read amp when storeHealthBlockRebalanceTo", + // NB: Store 1,2, 4 have okay read amp. Store 3 has high read amp. + // We expect high read amplifaction to only be considered for + // exlcuding targets, not for triggering rebalancing. + stores: threeStoresHighReadAmpAscRangeCount, + conf: emptySpanConfig(), + existingVoters: replicas(3, 1), + expectNoAction: true, + enforcement: storeHealthBlockRebalanceTo, + }, + { + name: "don't move off of nodes with high read amp when storeHealthBlockAll", + // NB: Store 1,2, 4 have okay read amp. Store 3 has high read amp. + // We expect high read amplifaction to only be considered for + // exlcuding targets, not for triggering rebalancing. + stores: threeStoresHighReadAmpAscRangeCount, + conf: emptySpanConfig(), + existingVoters: replicas(3, 1), + expectNoAction: true, + enforcement: storeHealthBlockAll, + }, + { + name: "don't take action when enforcement is not storeHealthNoAction", + // NB: Store 3 has L0Sublevels > threshold. Store 2 has 3 x higher + // ranges as other stores. Should move to candidate to 4, however + // enforcement for rebalancing is not enabled so will pick + // candidate 3 which has a lower range count. + stores: oneStoreHighReadAmp, + conf: emptySpanConfig(), + existingVoters: replicas(1, 2), + expectedRemoveTargets: []roachpb.StoreID{2}, + expectedAddTargets: []roachpb.StoreID{3}, + enforcement: storeHealthNoAction, + }, + { + name: "don't rebalance to nodes with high read amp when storeHealthBlockRebalanceTo enforcement", + // NB: Store 3 has L0Sublevels > threshold. Store 2 has 3 x higher + // ranges as other stores. Should move to candidate to 4, which + // doesn't have high read amp. + stores: oneStoreHighReadAmp, + conf: emptySpanConfig(), + existingVoters: replicas(1, 2), + expectedRemoveTargets: []roachpb.StoreID{2}, + expectedAddTargets: []roachpb.StoreID{4}, + enforcement: storeHealthBlockRebalanceTo, + }, + { + name: "don't rebalance to nodes with high read amp when storeHealthBlockAll enforcement", + // NB: Store 3 has L0Sublevels > threshold. Store 2 has 3 x higher + // ranges as other stores. Should move to candidate to 4, which + // doesn't have high read amp. + stores: oneStoreHighReadAmp, + conf: emptySpanConfig(), + existingVoters: replicas(1, 2), + expectedRemoveTargets: []roachpb.StoreID{2}, + expectedAddTargets: []roachpb.StoreID{4}, + enforcement: storeHealthBlockAll, + }, + } + + var rangeUsageInfo RangeUsageInfo + chk := func(target roachpb.ReplicationTarget, expectedCandidates []roachpb.StoreID) bool { + for _, candidate := range expectedCandidates { + if target.StoreID == candidate { + return true + } + } + return false + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { + stopper, g, _, a, _ := createTestAllocator(ctx, 10, true /* deterministic */) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(test.stores, t) + // Enable read disk health checking in candidate exclusion. + options := a.scorerOptions(ctx) + options.storeHealthOptions = storeHealthOptions{enforcementLevel: test.enforcement, l0SublevelThreshold: 20} + add, remove, _, ok := a.RebalanceVoter( + ctx, + test.conf, + nil, + test.existingVoters, + []roachpb.ReplicaDescriptor{}, + rangeUsageInfo, + storeFilterThrottled, + options, + ) + if test.expectNoAction { + require.True(t, !ok) + } else { + require.Truef(t, ok, "no action taken on range") + require.Truef(t, + chk(add, test.expectedAddTargets), + "the addition target %+v from RebalanceVoter doesn't match expectation", + add) + require.Truef(t, + chk(remove, test.expectedRemoveTargets), + "the removal target %+v from RebalanceVoter doesn't match expectation", + remove) + } + }) + } +} + // TestVotersCanRebalanceToNonVoterStores ensures that rebalancing of voting // replicas considers stores that have non-voters as feasible candidates. func TestVotersCanRebalanceToNonVoterStores(t *testing.T) { @@ -4173,7 +4488,7 @@ func TestVotersCanRebalanceToNonVoterStores(t *testing.T) { existingNonVoters, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.Truef(t, ok, "no action taken") @@ -4232,7 +4547,7 @@ func TestNonVotersCannotRebalanceToVoterStores(t *testing.T) { existingNonVoters, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.Falsef( @@ -5035,7 +5350,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { nil, a.storePool.getLocalitiesByStore(existingRepls), func(context.Context, roachpb.StoreID) bool { return true }, - a.scorerOptions(), + a.scorerOptions(ctx), a.metrics, ) match := true @@ -5067,7 +5382,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) var found bool if !ok && len(tc.validTargets) == 0 { @@ -5456,7 +5771,7 @@ func TestAllocatorRemoveTargetBasedOnCapacity(t *testing.T) { replicas, replicas, nil, - a.scorerOptions(), + a.scorerOptions(ctx), ) if err != nil { t.Fatal(err) @@ -7296,7 +7611,7 @@ func TestAllocatorRebalanceWithScatter(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.False(t, ok) @@ -7309,7 +7624,7 @@ func TestAllocatorRebalanceWithScatter(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptionsForScatter(), + a.scorerOptionsForScatter(ctx), ) require.True(t, ok) } @@ -7423,7 +7738,7 @@ func TestAllocatorRebalanceAway(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if tc.expected == nil && ok { @@ -7601,7 +7916,7 @@ func TestAllocatorFullDisks(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - alloc.scorerOptions(), + alloc.scorerOptions(ctx), ) if ok { if log.V(1) { @@ -7647,7 +7962,7 @@ func Example_rangeCountRebalancing() { nil, rangeUsageInfo, storeFilterThrottled, - alloc.scorerOptions(), + alloc.scorerOptions(ctx), ) if ok { log.Infof(ctx, "rebalancing to %v; details: %s", target, details) @@ -7741,6 +8056,7 @@ func qpsBasedRebalanceFn( avgQPS := candidate.Capacity.QueriesPerSecond / float64(candidate.Capacity.RangeCount) jitteredQPS := avgQPS * (1 + alloc.randGen.Float64()) opts := &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}, qpsPerReplica: jitteredQPS, qpsRebalanceThreshold: 0.2, } diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 6ca38491c386..23b5820e6311 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -293,6 +293,12 @@ var ( Measurement: "Keys/Sec", Unit: metric.Unit_COUNT, } + metaL0SubLevelHistogram = metric.Metadata{ + Name: "rebalancing.l0_sublevels_histogram", + Help: "The summary view of sub levels in level 0 of the stores LSM", + Measurement: "Storage", + Unit: metric.Unit_COUNT, + } // Metric for tracking follower reads. metaFollowerReadsCount = metric.Metadata{ @@ -1363,6 +1369,7 @@ type StoreMetrics struct { // Rebalancing metrics. AverageQueriesPerSecond *metric.GaugeFloat64 AverageWritesPerSecond *metric.GaugeFloat64 + L0SubLevelsHistogram *metric.Histogram // Follower read metrics. FollowerReadsCount *metric.Counter @@ -1809,6 +1816,12 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { // Rebalancing metrics. AverageQueriesPerSecond: metric.NewGaugeFloat64(metaAverageQueriesPerSecond), AverageWritesPerSecond: metric.NewGaugeFloat64(metaAverageWritesPerSecond), + L0SubLevelsHistogram: metric.NewHistogram( + metaL0SubLevelHistogram, + l0SublevelInterval, + l0SublevelMaxSampled, + 1, /* sig figures (integer) */ + ), // Follower reads metrics. FollowerReadsCount: metric.NewCounter(metaFollowerReadsCount), @@ -2064,6 +2077,7 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { sm.RdbPendingCompaction.Update(int64(m.Compact.EstimatedDebt)) sm.RdbMarkedForCompactionFiles.Update(int64(m.Compact.MarkedFiles)) sm.RdbL0Sublevels.Update(int64(m.Levels[0].Sublevels)) + sm.L0SubLevelsHistogram.RecordValue(int64(m.Levels[0].Sublevels)) sm.RdbL0NumFiles.Update(m.Levels[0].NumFiles) sm.RdbNumSSTables.Update(m.NumSSTables()) sm.RdbWriteStalls.Update(m.WriteStallCount) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index b1a14c47c724..3d925dc99ae7 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2990,7 +2990,7 @@ func (r *Replica) relocateOne( conf, existingVoters, existingNonVoters, - r.store.allocator.scorerOptions(), + r.store.allocator.scorerOptions(ctx), // NB: Allow the allocator to return target stores that might be on the // same node as an existing replica. This is to ensure that relocations // that require "lateral" movement of replicas within a node can succeed. @@ -3060,7 +3060,7 @@ func (r *Replica) relocateOne( existingVoters, existingNonVoters, args.targetType, - r.store.allocator.scorerOptions(), + r.store.allocator.scorerOptions(ctx), ) if err != nil { return nil, nil, errors.Wrapf( diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 77abb638c573..eb4d69aca1d2 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -431,7 +431,7 @@ func (rq *replicateQueue) shouldQueue( nonVoterReplicas, rangeUsageInfo, storeFilterThrottled, - rq.allocator.scorerOptions(), + rq.allocator.scorerOptions(ctx), ) if ok { log.VEventf(ctx, 2, "rebalance target found for voter, enqueuing") @@ -445,7 +445,7 @@ func (rq *replicateQueue) shouldQueue( nonVoterReplicas, rangeUsageInfo, storeFilterThrottled, - rq.allocator.scorerOptions(), + rq.allocator.scorerOptions(ctx), ) if ok { log.VEventf(ctx, 2, "rebalance target found for non-voter, enqueuing") @@ -1004,7 +1004,7 @@ func (rq *replicateQueue) findRemoveVoter( candidates, existingVoters, existingNonVoters, - rq.allocator.scorerOptions(), + rq.allocator.scorerOptions(ctx), ) } @@ -1119,7 +1119,7 @@ func (rq *replicateQueue) removeNonVoter( existingNonVoters, existingVoters, existingNonVoters, - rq.allocator.scorerOptions(), + rq.allocator.scorerOptions(ctx), ) if err != nil { return false, err @@ -1301,9 +1301,9 @@ func (rq *replicateQueue) considerRebalance( desc, conf := repl.DescAndSpanConfig() rebalanceTargetType := voterTarget - scorerOpts := scorerOptions(rq.allocator.scorerOptions()) + scorerOpts := scorerOptions(rq.allocator.scorerOptions(ctx)) if scatter { - scorerOpts = rq.allocator.scorerOptionsForScatter() + scorerOpts = rq.allocator.scorerOptionsForScatter(ctx) } if !rq.store.TestingKnobs().DisableReplicaRebalancing { rangeUsageInfo := rangeUsageInfoForRepl(repl) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e9b1e588dc18..5cab1f373ae7 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2966,7 +2966,11 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa capacity.LogicalBytes = logicalBytes capacity.QueriesPerSecond = totalQueriesPerSecond capacity.WritesPerSecond = totalWritesPerSecond - capacity.L0Sublevels = s.metrics.RdbL0Sublevels.Value() + // We gossip the maximum number of L0 sub-levels that have been seen in + // past 2 windows. The recording length may vary between 5 and 10 minutes + // accordingly. + windowedL0Sublevels, _ := s.metrics.L0SubLevelsHistogram.Windowed() + capacity.L0Sublevels = windowedL0Sublevels.Max() capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica) capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica) s.recordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond) diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index 751f59264aae..4ee6e95fd8c2 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -765,6 +765,10 @@ type StoreList struct { // candidateWritesPerSecond tracks writes-per-second stats for stores that are // eligible to be rebalance targets. candidateWritesPerSecond stat + + // candidateWritesPerSecond tracks L0 sub-level stats for stores that are + // eligible to be rebalance targets. + candidateL0Sublevels stat } // Generates a new store list based on the passed in descriptors. It will @@ -779,6 +783,7 @@ func makeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { sl.candidateLogicalBytes.update(float64(desc.Capacity.LogicalBytes)) sl.candidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond) sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond) + sl.candidateL0Sublevels.update(float64(desc.Capacity.L0Sublevels)) } return sl } diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index fdf0f048ea1f..2079287cb4a2 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -249,8 +249,9 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { // `scorerOptions` here, which sets the range count rebalance threshold. // Instead, we use our own implementation of `scorerOptions` that promotes QPS // balance. -func (sr *StoreRebalancer) scorerOptions() *qpsScorerOptions { +func (sr *StoreRebalancer) scorerOptions(ctx context.Context) *qpsScorerOptions { return &qpsScorerOptions{ + storeHealthOptions: sr.rq.allocator.storeHealthOptions(ctx), deterministic: sr.rq.allocator.storePool.deterministic, qpsRebalanceThreshold: qpsRebalanceThreshold.Get(&sr.st.SV), minRequiredQPSDiff: minQPSDifferenceForTransfers.Get(&sr.st.SV), @@ -270,7 +271,7 @@ func (sr *StoreRebalancer) scorerOptions() *qpsScorerOptions { func (sr *StoreRebalancer) rebalanceStore( ctx context.Context, mode LBRebalancingMode, allStoresList StoreList, ) { - options := sr.scorerOptions() + options := sr.scorerOptions(ctx) var localDesc *roachpb.StoreDescriptor for i := range allStoresList.stores { if allStoresList.stores[i].StoreID == sr.rq.store.StoreID() { @@ -360,7 +361,7 @@ func (sr *StoreRebalancer) rebalanceStore( &replicasToMaybeRebalance, localDesc, allStoresList, - sr.scorerOptions(), + sr.scorerOptions(ctx), ) if replWithStats.repl == nil { log.Infof(ctx, diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 1f6e4e5f37da..ba3eed4b5a5c 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -53,6 +53,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 3000, + L0Sublevels: maxL0SublevelThreshold - 10, }, }, { @@ -70,6 +71,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2800, + L0Sublevels: maxL0SublevelThreshold - 5, }, }, { @@ -87,6 +89,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2600, + L0Sublevels: maxL0SublevelThreshold + 2, }, }, { @@ -104,6 +107,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2400, + L0Sublevels: maxL0SublevelThreshold - 10, }, }, { @@ -121,6 +125,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2200, + L0Sublevels: maxL0SublevelThreshold - 3, }, }, { @@ -138,6 +143,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2000, + L0Sublevels: maxL0SublevelThreshold + 2, }, }, { @@ -155,6 +161,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1800, + L0Sublevels: maxL0SublevelThreshold - 10, }, }, { @@ -172,6 +179,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1600, + L0Sublevels: maxL0SublevelThreshold - 5, }, }, { @@ -189,6 +197,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1400, + L0Sublevels: maxL0SublevelThreshold + 3, }, }, } @@ -233,6 +242,188 @@ var ( }, }, } + + // noLocalityAscendingReadAmpStores specifies a set of stores identical to + // noLocalityStores, however they have ascending read + // amplification. Where store 1, store 2 and store 3 are below the + // threshold, whilst store 4 and store 5 are above. + noLocalityAscendingReadAmpStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1500, + L0Sublevels: maxL0SublevelThreshold - 15, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1300, + L0Sublevels: maxL0SublevelThreshold - 10, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + L0Sublevels: maxL0SublevelThreshold - 5, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 900, + L0Sublevels: maxL0SublevelThreshold + 20, + }, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 500, + L0Sublevels: maxL0SublevelThreshold + 25, + }, + }, + } + + // noLocalityUniformQPSHighReadAmp specifies a set of stores that are + // identical, except store 1 and 2 have high read amp. + noLocalityUniformQPSHighReadAmp = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + L0Sublevels: maxL0SublevelThreshold + 100, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + L0Sublevels: maxL0SublevelThreshold - 15, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + L0Sublevels: maxL0SublevelThreshold + 100, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + L0Sublevels: maxL0SublevelThreshold - 15, + }, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + L0Sublevels: maxL0SublevelThreshold + 100, + }, + }, + } + // noLocalityHighReadAmpStores specifies a set of stores identical to + // noLocalityStores, however they all have read amplification that exceeds + // the threshold. + noLocalityHighReadAmpStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1500, + L0Sublevels: maxL0SublevelThreshold + 1, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1300, + L0Sublevels: maxL0SublevelThreshold + 1, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + L0Sublevels: maxL0SublevelThreshold + 1, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 900, + L0Sublevels: maxL0SublevelThreshold + 1, + }, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 500, + L0Sublevels: maxL0SublevelThreshold + 1, + }, + }, + } + // noLocalityHighReadAmpSkewedStores specifies a set of stores identical to + // noLocalityStores, however they all have read amplification that exceeds + // the threshold in ascending order. + noLocalityHighReadAmpSkewedStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1500, + L0Sublevels: maxL0SublevelThreshold + 1, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1300, + L0Sublevels: maxL0SublevelThreshold + 10, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + L0Sublevels: maxL0SublevelThreshold + 50, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 900, + L0Sublevels: maxL0SublevelThreshold + 100, + }, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 500, + L0Sublevels: maxL0SublevelThreshold + 100, + }, + }, + } ) type testRange struct { @@ -632,6 +823,7 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { &localDesc, storeList, &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}, deterministic: false, qpsRebalanceThreshold: qpsRebalanceThreshold, }, @@ -725,21 +917,24 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { expRebalancedVoters, expRebalancedNonVoters []roachpb.StoreID }{ // All the replicas are already on the best possible stores. No - // rebalancing should be attempted. + // rebalancing should be attempted, note here that the high read + // amp of the current stores is ignored as it is not considered in + // moving a replica away from a store. { name: "no rebalance", voters: []roachpb.StoreID{3, 6, 9}, constraints: oneReplicaPerRegion, expRebalancedVoters: []roachpb.StoreID{}, }, - // A replica is in a heavily loaded region, on a relatively heavily loaded - // store. We expect it to be moved to a less busy store within the same - // region. + // A replica is in a heavily loaded region, on a relatively heavily + // loaded store. We expect it to be moved to a less busy store + // within the same region. However, it cannot be the least busy + // store as it has high read amp (3). { name: "rebalance one replica within heavy region", voters: []roachpb.StoreID{1, 6, 9}, constraints: oneReplicaPerRegion, - expRebalancedVoters: []roachpb.StoreID{9, 6, 3}, + expRebalancedVoters: []roachpb.StoreID{9, 6, 2}, }, // Two replicas are in the hot region, both on relatively heavily loaded // nodes. We expect one of those replicas to be moved to a less busy store @@ -761,22 +956,26 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { // maximized, replicas on hot stores are rebalanced to cooler stores within // the same region. { - // Within the hottest region, expect rebalance from the hottest node (n1) - // to the coolest node (n3). Within the lease hot region, we don't expect - // a rebalance from n8 to n9 because the qps difference between the two + // Within the hottest region, expect rebalance from the hottest + // node (n1) to the coolest node (n3), however since n3 has + // high read amp it should instead rebalance to n2. Within the + // lease hot region, we don't expect a rebalance from n8 to n9 + // because the qps difference between the two // stores is too small. name: "QPS balance without constraints", voters: []roachpb.StoreID{1, 5, 8}, - expRebalancedVoters: []roachpb.StoreID{8, 5, 3}, + expRebalancedVoters: []roachpb.StoreID{8, 5, 2}, }, { - // Within the second hottest region, expect rebalance from the hottest - // node (n4) to the coolest node (n6). Within the lease hot region, we - // don't expect a rebalance from n8 to n9 because the qps difference - // between the two stores is too small. + // Within the second hottest region, expect rebalance from the + // hottest node (n4) to the coolest node (n6), however since n6 + // has high read amp instead expect n5 to be selected. Within + // the lease hot region, we don't expect a rebalance from n8 to + // n9 because the qps difference between the two stores is too + // small. name: "QPS balance without constraints", voters: []roachpb.StoreID{8, 4, 3}, - expRebalancedVoters: []roachpb.StoreID{8, 6, 3}, + expRebalancedVoters: []roachpb.StoreID{8, 5, 3}, }, // Multi-region database configurations. @@ -790,9 +989,10 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { constraints: oneReplicaPerRegion, expRebalancedVoters: []roachpb.StoreID{3, 2, 1}, - // NB: Expect the non-voter on node 4 (hottest node in region B) to move - // to node 6 (least hot region in region B). - expRebalancedNonVoters: []roachpb.StoreID{6, 9}, + // NB: Expect the non-voter on node 4 (hottest node in region B) to + // move to node 5 (least hot region in region B), the least hot + // node without high read amp. + expRebalancedNonVoters: []roachpb.StoreID{5, 9}, }, { name: "primary region with second highest QPS, region survival, one voter on sub-optimal node", @@ -812,19 +1012,22 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { // constraints require at least one replica per each region. voterConstraints: twoReplicasInHotRegion, constraints: oneReplicaPerRegion, - // NB: We've got 3 voters in the hottest region, but we only need 2. We - // expect that one of the voters from the hottest region will be moved to - // the least hot region. Additionally, in region B, we've got one replica - // on store 4 (which is the hottest store in that region). We expect that - // replica to be moved to store 6. - expRebalancedVoters: []roachpb.StoreID{9, 2, 6, 8, 3}, + // NB: We've got 3 voters in the hottest region, but we only need + // 2. We expect that one of the voters from the hottest region + // will be moved to the least hot region. Additionally, in + // region B, we've got one replica on store 4 (which is the + // hottest store in that region). We expect that replica to be + // moved to store 5, which is the least hot node without high + // read amp. + expRebalancedVoters: []roachpb.StoreID{9, 2, 5, 8, 3}, }, { name: "one voter on sub-optimal node in the coldest region", voters: []roachpb.StoreID{5, 6, 7}, constraints: append(twoReplicasInSecondHottestRegion, oneReplicaInColdestRegion...), - // NB: Expect replica from node 7 to move to node 9. - expRebalancedVoters: []roachpb.StoreID{9, 5, 6}, + // NB: Expect replica from node 7 to move to node 8, despite node 9 + // having lower qps because node 9 exceeds the l0 sub-level threshold, + expRebalancedVoters: []roachpb.StoreID{8, 5, 6}, }, } for _, tc := range testCases { @@ -883,7 +1086,11 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { &hottestRanges, &localDesc, storeList, - &qpsScorerOptions{deterministic: true, qpsRebalanceThreshold: 0.05}, + &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthBlockRebalanceTo}, + deterministic: true, + qpsRebalanceThreshold: 0.05, + }, ) require.Len(t, voterTargets, len(tc.expRebalancedVoters)) @@ -955,7 +1162,10 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) { loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{localDesc.StoreID}, qps: 100}}) hottestRanges := rr.topQPS() sr.chooseRangeToRebalance( - ctx, &hottestRanges, &localDesc, storeList, &qpsScorerOptions{qpsRebalanceThreshold: 0.05}, + ctx, &hottestRanges, &localDesc, storeList, &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}, + qpsRebalanceThreshold: 0.05, + }, ) trace := finishAndGetRecording() require.Regexpf( @@ -1117,7 +1327,11 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { &hottestRanges, &localDesc, storeList, - &qpsScorerOptions{deterministic: true, qpsRebalanceThreshold: tc.rebalanceThreshold}, + &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}, + deterministic: true, + qpsRebalanceThreshold: tc.rebalanceThreshold, + }, ) require.Len(t, voterTargets, len(tc.expRebalancedVoters)) @@ -1210,7 +1424,11 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { &hottestRanges, &localDesc, storeList, - &qpsScorerOptions{deterministic: true, qpsRebalanceThreshold: 0.05}, + &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}, + deterministic: true, + qpsRebalanceThreshold: 0.05, + }, ) expectTargets := []roachpb.ReplicationTarget{ {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, @@ -1220,3 +1438,167 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { targets, sr.getRaftStatusFn(repl), expectTargets) } } + +// TestStoreRebalancerReadAmpCheck checks that: +// - Under (1) disabled and (2) log that rebalancing decisions are unaffected +// by high read amplification. +// - Under (3) rebalanceOnly and (4) allocate that rebalance decisions exclude +// stores with high readamplification as candidate targets. +func TestStoreRebalancerReadAmpCheck(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + type testCase struct { + name string + stores []*roachpb.StoreDescriptor + conf roachpb.SpanConfig + expectedTargets []roachpb.ReplicationTarget + enforcement storeHealthEnforcement + } + tests := []testCase{ + { + name: "ignore read amp on allocation when storeHealthNoAction enforcement", + // NB: All stores have high read amp, this should be ignored. + stores: noLocalityHighReadAmpStores, + conf: emptySpanConfig(), + expectedTargets: []roachpb.ReplicationTarget{ + {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, + }, + enforcement: storeHealthNoAction, + }, + { + name: "ignore read amp on allocation when storeHealthLogOnly enforcement", + // NB: All stores have high read amp, this should be ignored. + stores: noLocalityHighReadAmpStores, + conf: emptySpanConfig(), + expectedTargets: []roachpb.ReplicationTarget{ + {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, + }, + enforcement: storeHealthLogOnly, + }, + { + name: "don't stop rebalancing when read amp uniformly above threshold and storeHealthBlockRebalanceTo enforcement", + // NB: All stores have high uniformly high read (threshold+1) this should be ignored. + stores: noLocalityHighReadAmpStores, + conf: emptySpanConfig(), + expectedTargets: []roachpb.ReplicationTarget{ + {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, + }, + enforcement: storeHealthBlockRebalanceTo, + }, + { + name: "don't stop rebalancing when read amp uniformly above threshold and storeHealthBlockRebalanceTo enforcement", + // NB: All stores have high uniformly high read (threshold+1) this should be ignored. + stores: noLocalityHighReadAmpStores, + conf: emptySpanConfig(), + expectedTargets: []roachpb.ReplicationTarget{ + {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, + }, + enforcement: storeHealthBlockAll, + }, + { + name: "rebalance should ignore stores with high read amp that are also above the mean when storeHealthBlockAll enforcement", + // NB: All stores have high read amp, however store 2 is below the mean read amp so is a viable candidate. + stores: noLocalityHighReadAmpSkewedStores, + conf: emptySpanConfig(), + expectedTargets: []roachpb.ReplicationTarget{ + {NodeID: 2, StoreID: 2}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, + }, + enforcement: storeHealthBlockAll, + }, + { + name: "rebalance should ignore stores with high read amp that are also above the mean when storeHealthBlockRebalanceTo enforcement", + // NB: All stores have high read amp, however store 2 is below the mean read amp so is a viable candidate. + stores: noLocalityHighReadAmpSkewedStores, + conf: emptySpanConfig(), + expectedTargets: []roachpb.ReplicationTarget{ + {NodeID: 2, StoreID: 2}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, + }, + enforcement: storeHealthBlockRebalanceTo, + }, + { + name: "rebalance should ignore stores with high read amp when storeHealthBlockRebalanceTo enforcement", + // NB: Store 4, 5 have high read amp, they should not be rebalance + // targets. Only 1,2,3 are valid targets, yet only 2 is not already + // a voter. 1 should transfer it's lease to 2. 5 could also have + // transferred its lease to 2, However, high read amp does not + // affect removing replicas from stores, only in blocking new + // replicas. + stores: noLocalityAscendingReadAmpStores, + conf: emptySpanConfig(), + expectedTargets: []roachpb.ReplicationTarget{ + {NodeID: 2, StoreID: 2}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, + }, + enforcement: storeHealthBlockRebalanceTo, + }, + { + name: "rebalance should ignore stores with high read amp when storeHealthBlockAll enforcement", + // NB: This scenario and result should be identical to storeHealthBlockRebalanceTo. + stores: noLocalityAscendingReadAmpStores, + conf: emptySpanConfig(), + expectedTargets: []roachpb.ReplicationTarget{ + {NodeID: 2, StoreID: 2}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, + }, + enforcement: storeHealthBlockAll, + }, + { + name: "rebalance should not rebalance away from stores with high read amp when storeHealthBlockAll enforcement", + // NB: Node 1,3,5 all have extremely high read amp. However, since + // read amp does not trigger rebalancing away, only blocking + // rebalancing to this should be ignored and no action taken. + stores: noLocalityUniformQPSHighReadAmp, + conf: emptySpanConfig(), + expectedTargets: nil, + enforcement: storeHealthBlockAll, + }, + { + name: "rebalance should not rebalance away from stores with high read amp when storeHealthBlockRebalanceTo enforcement", + // NB: Node 1,3,5 all have extremely high read amp. However, since + // read amp does not trigger rebalancing away, only blocking + // rebalancing to this should be ignored and no action taken. + stores: noLocalityUniformQPSHighReadAmp, + conf: emptySpanConfig(), + expectedTargets: nil, + enforcement: storeHealthBlockRebalanceTo, + }, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { + stopper, g, _, a, _ := createTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(ctx) + storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled) + + localDesc := *noLocalityStores[0] + cfg := TestStoreConfig(nil) + cfg.Gossip = g + cfg.StorePool = a.storePool + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(test.stores, t) + s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} + rq := newReplicateQueue(s, a) + rr := newReplicaRankings() + + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + + // Load in a range with replicas on an overfull node, a slightly underfull + // node, and a very underfull node. + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100}}) + hottestRanges := rr.topQPS() + + _, targetVoters, _ := sr.chooseRangeToRebalance( + ctx, + &hottestRanges, + &localDesc, + storeList, + &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: test.enforcement, l0SublevelThreshold: maxL0SublevelThreshold}, + deterministic: true, + qpsRebalanceThreshold: 0.05, + }, + ) + require.Equal(t, test.expectedTargets, targetVoters) + }) + } +} diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 3f6710fe4e8d..5ef8be6020e1 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -648,6 +648,17 @@ var charts = []sectionDescription{ }, }, }, + { + Organization: [][]string{ + {DistributionLayer, "Rebalancing"}, + }, + Charts: []chartDescription{ + { + Title: "L0 sub-level rebalancing", + Metrics: []string{"rebalancing.l0_sublevels_histogram"}, + }, + }, + }, { Organization: [][]string{ {DistributionLayer, "Rebalancing"}, diff --git a/pkg/ts/catalog/metrics.go b/pkg/ts/catalog/metrics.go index 301f662c4608..8162ba4ede98 100644 --- a/pkg/ts/catalog/metrics.go +++ b/pkg/ts/catalog/metrics.go @@ -91,6 +91,7 @@ var histogramMetricsNames = map[string]struct{}{ "txnwaitqueue.query.wait_time": {}, "raft.process.applycommitted.latency": {}, "sql.stats.txn_stats_collection.duration": {}, + "rebalancing.l0_sublevels_histogram": {}, } func allInternalTSMetricsNames() []string {