From 83deaaab08e1ea4e25a2d5f63ce007142edd365f Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Tue, 29 Mar 2022 13:46:38 +0000
Subject: [PATCH] kvserver: check L0 sub-levels on allocation

Previously, the only store health signal used as a hard allocation and
rebalancing constraint was disk capacity. This patch introduces L0
sub-levels as an additional constraint, to avoid allocating and
rebalancing replicas to stores that are unhealthy, as indicated by a
high number of L0 sub-levels.

A store's sub-level count must exceed both (1) the threshold and (2)
the cluster average in order for the store to be considered unhealthy.
The average check ensures that a cluster in which every store has
moderately high read amplification can still make progress, while still
excluding the positive tail of a skewed distribution. A simulation of
the effect on candidate exclusion under different L0 sub-level
distributions, comparing the mean as an additional check against
percentiles, can be found here:
https://gist.github.com/kvoli/be27efd4662e89e8918430a9c7117858

The threshold corresponds to the cluster setting
`kv.allocator.l0_sublevels_threshold`: the number of L0 sub-levels
above which a candidate store may be excluded as a target for
rebalancing, or for both rebalancing and allocation of replicas.

Enforcement of this threshold can be applied at 4 different levels of
strictness, configured by the cluster setting
`kv.allocator.l0_sublevels_threshold_enforce`. The 4 levels are:

`block_none`: L0 sub-levels are ignored entirely.
`block_none_log`: L0 sub-levels are logged if the threshold is exceeded.

Both levels below also log as above.

`block_rebalance_to`: L0 sub-levels are considered when excluding
stores as rebalance targets.
`block_all`: L0 sub-levels are considered when excluding stores as both
rebalance and allocation targets.

By default, `kv.allocator.l0_sublevels_threshold` is `20`, which
corresponds to admission control's threshold, above which it begins
limiting admission of work to a store based on store health. The
default enforcement level of
`kv.allocator.l0_sublevels_threshold_enforce` is `block_none_log`.

resolves #73714

Release justification: low risk, high benefit during high read
amplification scenarios, where an operator may wish to stop rebalancing
toward stores with high read amplification, to stop fueling the flame.

Release note (ops change): introduce cluster settings
`kv.allocator.l0_sublevels_threshold` and
`kv.allocator.l0_sublevels_threshold_enforce`, which enable excluding
stores as targets for allocation and rebalancing of replicas when they
have high read amplification, indicated by the number of sub-levels in
level 0 (L0) of the store's LSM. When both
`kv.allocator.l0_sublevels_threshold` and the cluster average are
exceeded, the action corresponding to
`kv.allocator.l0_sublevels_threshold_enforce` is taken: `block_none`
will exclude no candidate stores, `block_none_log` will exclude no
candidates but log an event, `block_rebalance_to` will exclude
candidate stores from being targets of rebalance actions, and
`block_all` will exclude candidate stores from being targets of both
allocation and rebalancing. By default,
`kv.allocator.l0_sublevels_threshold` is set to `20` and
`kv.allocator.l0_sublevels_threshold_enforce` is set to
`block_none_log`. 
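As a simplified, self-contained illustration of the two-part check
described above (threshold AND watermarked cluster mean, gated by the
enforcement level), consider the Go sketch below. This is not the code
in this patch: the function and constant names are made up for
illustration; the default threshold of 20 and the 1.10 watermark over
the cluster mean are taken from the patch, and event logging is
omitted.

    package main

    import "fmt"

    type enforcement int

    const (
        blockNone enforcement = iota // ignore L0 sub-levels entirely
        blockNoneLog                 // log only
        blockRebalanceTo             // exclude as rebalance target
        blockAll                     // exclude as rebalance and allocation target
    )

    const (
        l0Threshold = 20   // default kv.allocator.l0_sublevels_threshold
        l0WaterMark = 1.10 // store must also exceed 110% of the cluster mean
    )

    // excludeAsAllocationTarget mirrors the allocation-side check: a store
    // is skipped only when its L0 sub-level count exceeds both the
    // threshold and the watermarked cluster mean, and enforcement is
    // block_all.
    func excludeAsAllocationTarget(l0 int64, clusterMean float64, level enforcement) bool {
        if level == blockNone || l0 < l0Threshold {
            return false
        }
        if float64(l0) < clusterMean*l0WaterMark {
            return false // above threshold, but below the cluster watermark
        }
        return level >= blockAll
    }

    // excludeAsRebalanceTarget is the rebalance-side check; it also
    // triggers at the weaker block_rebalance_to level.
    func excludeAsRebalanceTarget(l0 int64, clusterMean float64, level enforcement) bool {
        if level == blockNone || l0 < l0Threshold {
            return false
        }
        if float64(l0) < clusterMean*l0WaterMark {
            return false
        }
        return level >= blockRebalanceTo
    }

    func main() {
        // A store with 25 L0 sub-levels in a cluster averaging 10:
        fmt.Println(excludeAsRebalanceTarget(25, 10, blockRebalanceTo))  // true
        fmt.Println(excludeAsAllocationTarget(25, 10, blockRebalanceTo)) // false
    }

For example, under `block_rebalance_to` such a store is skipped as a
rebalance target but remains eligible to receive newly allocated
replicas; only `block_all` excludes it from allocation as well.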
--- pkg/kv/kvserver/allocator.go | 35 +- pkg/kv/kvserver/allocator_scorer.go | 290 +++++++++++++-- pkg/kv/kvserver/allocator_scorer_test.go | 37 +- pkg/kv/kvserver/allocator_test.go | 388 ++++++++++++++++++-- pkg/kv/kvserver/metrics.go | 14 + pkg/kv/kvserver/replica_command.go | 4 +- pkg/kv/kvserver/replicate_queue.go | 12 +- pkg/kv/kvserver/store.go | 6 +- pkg/kv/kvserver/store_pool.go | 5 + pkg/kv/kvserver/store_rebalancer.go | 7 +- pkg/kv/kvserver/store_rebalancer_test.go | 440 +++++++++++++++++++++-- pkg/ts/catalog/chart_catalog.go | 11 + pkg/ts/catalog/metrics.go | 1 + 13 files changed, 1122 insertions(+), 128 deletions(-) diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 83b1d2becc0c..7a42219721ac 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -19,6 +19,7 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -899,7 +900,7 @@ func (a *Allocator) allocateTarget( conf, existingVoters, existingNonVoters, - a.scorerOptions(), + a.scorerOptions(ctx), // When allocating a *new* replica, we explicitly disregard nodes with any // existing replicas. This is important for multi-store scenarios as // otherwise, stores on the nodes that have existing replicas are simply @@ -1122,6 +1123,7 @@ func (a Allocator) removeTarget( replicaSetForDiversityCalc := getReplicasForDiversityCalc(targetType, existingVoters, existingReplicas) rankedCandidates := candidateListForRemoval( + ctx, candidateStoreList, constraintsChecker, a.storePool.getLocalitiesByStore(replicaSetForDiversityCalc), @@ -1451,16 +1453,18 @@ func (a Allocator) RebalanceNonVoter( ) } -func (a *Allocator) scorerOptions() *rangeCountScorerOptions { +func (a *Allocator) scorerOptions(ctx context.Context) *rangeCountScorerOptions { return &rangeCountScorerOptions{ + storeHealthOptions: a.storeHealthOptions(ctx), deterministic: a.storePool.deterministic, rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV), } } -func (a *Allocator) scorerOptionsForScatter() *scatterScorerOptions { +func (a *Allocator) scorerOptionsForScatter(ctx context.Context) *scatterScorerOptions { return &scatterScorerOptions{ rangeCountScorerOptions: rangeCountScorerOptions{ + storeHealthOptions: a.storeHealthOptions(ctx), deterministic: a.storePool.deterministic, rangeRebalanceThreshold: 0, }, @@ -1588,6 +1592,24 @@ func (a *Allocator) leaseholderShouldMoveDueToPreferences( return true } +// storeHealthOptions returns the store health options, currently only +// considering the threshold for L0 sub-levels. This threshold is not +// considered in allocation or rebalancing decisions (excluding candidate +// stores as targets) when enforcementLevel is set to storeHealthNoAction or +// storeHealthLogOnly. By default storeHealthLogOnly is the action taken. When +// there is a mixed version cluster, storeHealthNoAction is set instead. 
+func (a *Allocator) storeHealthOptions(ctx context.Context) storeHealthOptions { + enforcementLevel := storeHealthNoAction + if a.storePool.st.Version.IsActive(ctx, clusterversion.AutoStatsTableSettings) { + enforcementLevel = storeHealthEnforcement(l0SublevelsThresholdEnforce.Get(&a.storePool.st.SV)) + } + + return storeHealthOptions{ + enforcementLevel: enforcementLevel, + l0SublevelThreshold: l0SublevelsThreshold.Get(&a.storePool.st.SV), + } +} + // TransferLeaseTarget returns a suitable replica to transfer the range lease // to from the provided list. It includes the current lease holder replica // unless asked to do otherwise by the excludeLeaseRepl parameter. @@ -1735,11 +1757,14 @@ func (a *Allocator) TransferLeaseTarget( // https://github.com/cockroachdb/cockroach/issues/75630. bestStore, noRebalanceReason := bestStoreToMinimizeQPSDelta( leaseReplQPS, - qpsRebalanceThreshold.Get(&a.storePool.st.SV), - minQPSDifferenceForTransfers.Get(&a.storePool.st.SV), leaseRepl.StoreID(), candidates, storeDescMap, + &qpsScorerOptions{ + storeHealthOptions: a.storeHealthOptions(ctx), + qpsRebalanceThreshold: qpsRebalanceThreshold.Get(&a.storePool.st.SV), + minRequiredQPSDiff: minQPSDifferenceForTransfers.Get(&a.storePool.st.SV), + }, ) switch noRebalanceReason { diff --git a/pkg/kv/kvserver/allocator_scorer.go b/pkg/kv/kvserver/allocator_scorer.go index 3c33358f74e8..1131cc52e358 100644 --- a/pkg/kv/kvserver/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator_scorer.go @@ -16,6 +16,7 @@ import ( "fmt" "math" "sort" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -70,6 +71,50 @@ const ( // consider a store full/empty if it's at least minRebalanceThreshold // away from the mean. minRangeRebalanceThreshold = 2 + + // maxL0SublevelThreshold is the number of L0 sub-levels of a store + // descriptor, that when greater than this value and in excees of the + // average L0 sub-levels in the cluster - will have the action defined by + // l0SublevelsThresholdEnforce taken. This value does not affect the + // allocator in deciding to remove replicas from it's store, only + // potentially block adding or moving replicas to other stores. + maxL0SublevelThreshold = 20 + + // l0SublevelInterval is the period over which to accumulate statistics on + // the number of L0 sublevels within a store. + l0SublevelInterval = time.Minute * 5 + + // l0SublevelMaxSampled is maximum number of L0 sub-levels that may exist + // in a sample. This setting limits the extreme skew that could occur by + // capping the highest possible value considered. + l0SublevelMaxSampled = 500 + + // l0SubLevelWaterMark is the percentage above the mean after which a store + // could be conisdered unhealthy if also exceeding the threshold. + l0SubLevelWaterMark = 1.10 +) + +// storeHealthEnforcement represents the level of action that may be taken or +// excluded when a candidate disk is considered unhealthy. +type storeHealthEnforcement int64 + +const ( + // storeHealthNoAction will take no action upon candidate stores when they + // exceed l0SublevelThreshold. + storeHealthNoAction storeHealthEnforcement = iota + // storeHealthLogOnly will take no action upon candidate stores when they + // exceed l0SublevelThreshold except log an event. + storeHealthLogOnly + // storeHealthBlockRebalanceTo will take action to exclude candidate stores + // when they exceed l0SublevelThreshold and mean from being considered + // targets for rebalance actions only. 
Allocation actions such as adding + // upreplicaing an range will not be affected. + storeHealthBlockRebalanceTo + // storeHealthBlockAll will take action to exclude candidate stores when + // they exceed l0SublevelThreshold and mean from being candidates for all + // replica allocation and rebalancing. When enabled and stores exceed the + // threshold, they will not receive any new replicas. + storeHealthBlockAll ) // rangeRebalanceThreshold is the minimum ratio of a store's range count to @@ -87,6 +132,55 @@ var rangeRebalanceThreshold = func() *settings.FloatSetting { return s }() +// l0SublevelsThreshold is the maximum number of sub-levels within level 0 that +// may exist on candidate store descriptors before they are considered +// unhealthy. Once considered unhealthy, the action taken will be dictated by +// l0SublevelsThresholdEnforce cluster setting defined below. The rationale for +// using L0 sub-levels as opposed to read amplification is that it is more +// generally the variable component that makes up read amplification. When +// L0 sub-levels is high, it is an indicator of poor LSM health as L0 is usually +// in memory and must be first visited before traversing any further level. See +// this issue for additional information: +// https://github.com/cockroachdb/pebble/issues/609 +var l0SublevelsThreshold = settings.RegisterIntSetting( + settings.SystemOnly, + "kv.allocator.l0_sublevels_threshold", + "the maximum number of l0 sublevels within a store that may exist "+ + "before the action defined in "+ + "`kv.allocator.l0_sublevels_threshold_enforce` will be taken "+ + "if also exceeding the cluster average", + maxL0SublevelThreshold, +) + +// l0SublevelsThresholdEnforce is the level of enforcement taken upon candidate +// stores when their L0-sublevels exceeds the threshold defined in +// l0SublevelThreshold. Under disabled and log enforcement, no action is taken +// to exclude the candidate store either as a potential allocation nor +// rebalance target by the replicate queue and store rebalancer. When the +// enforcement level is rebalance, candidate stores will be excluded as targets +// for rebalancing when exceeding the threshold, however will remain candidates +// for allocation of voters and non-voters. When allocate is set, candidates +// are excluded as targets for all rebalancing and also allocation of voters +// and non-voters. +var l0SublevelsThresholdEnforce = settings.RegisterEnumSetting( + settings.SystemOnly, + "kv.allocator.l0_sublevels_threshold_enforce", + "the level of enforcement when a candidate disk has L0 sub-levels "+ + "exceeding `kv.allocator.l0_sublevels_threshold` and above the "+ + "cluster average:`block_none` will exclude "+ + "no candidate stores, `block_none_log` will exclude no candidates but log an "+ + "event, `block_rebalance_to` will exclude candidates stores from being "+ + "targets of rebalance actions, `block_all` will exclude candidate stores "+ + "from being targets of both allocation and rebalancing", + "block_none_log", + map[int64]string{ + int64(storeHealthNoAction): "block_none", + int64(storeHealthLogOnly): "block_none_log", + int64(storeHealthBlockRebalanceTo): "block_rebalance_to", + int64(storeHealthBlockAll): "block_all", + }, +) + // CockroachDB has two heuristics that trigger replica rebalancing: range count // convergence and QPS convergence. scorerOptions defines the interface that // both of these heuristics must implement. 
@@ -140,6 +234,9 @@ type scorerOptions interface { // with the same QPS) that would converge the range's existing stores' QPS the // most. removalMaximallyConvergesScore(removalCandStoreList StoreList, existing roachpb.StoreDescriptor) int + // getStoreHealthOptions returns the scorer options for store health. It is + // used to inform scoring based on the health of a store. + getStoreHealthOptions() storeHealthOptions } func jittered(val float64, jitter float64, rand allocatorRand) float64 { @@ -181,6 +278,7 @@ func (o *scatterScorerOptions) maybeJitterStoreStats( // This means that the resulting rebalancing decisions will further the goal of // converging range counts across stores in the cluster. type rangeCountScorerOptions struct { + storeHealthOptions deterministic bool rangeRebalanceThreshold float64 } @@ -295,6 +393,7 @@ func (o *rangeCountScorerOptions) removalMaximallyConvergesScore( // queries-per-second. This means that the resulting rebalancing decisions will // further the goal of converging QPS across stores in the cluster. type qpsScorerOptions struct { + storeHealthOptions deterministic bool qpsRebalanceThreshold, minRequiredQPSDiff float64 @@ -443,13 +542,15 @@ func (o *qpsScorerOptions) removalMaximallyConvergesScore( return 0 } -// candidate store for allocation. +// candidate store for allocation. These are ordered by importance. type candidate struct { store roachpb.StoreDescriptor valid bool fullDisk bool necessary bool diversityScore float64 + highReadAmp bool + l0SubLevels int convergesScore int balanceScore balanceStatus rangeCount int @@ -457,9 +558,9 @@ type candidate struct { } func (c candidate) String() string { - str := fmt.Sprintf("s%d, valid:%t, fulldisk:%t, necessary:%t, diversity:%.2f, converges:%d, "+ + str := fmt.Sprintf("s%d, valid:%t, fulldisk:%t, necessary:%t, diversity:%.2f, highReadAmp: %t, l0SubLevels: %d, converges:%d, "+ "balance:%d, rangeCount:%d, queriesPerSecond:%.2f", - c.store.StoreID, c.valid, c.fullDisk, c.necessary, c.diversityScore, c.convergesScore, + c.store.StoreID, c.valid, c.fullDisk, c.necessary, c.diversityScore, c.highReadAmp, c.l0SubLevels, c.convergesScore, c.balanceScore, c.rangeCount, c.store.Capacity.QueriesPerSecond) if c.details != "" { return fmt.Sprintf("%s, details:(%s)", str, c.details) @@ -482,6 +583,12 @@ func (c candidate) compactString() string { if c.diversityScore != 0 { fmt.Fprintf(&buf, ", diversity:%.2f", c.diversityScore) } + if c.highReadAmp { + fmt.Fprintf(&buf, ", highReadAmp:%t", c.highReadAmp) + } + if c.l0SubLevels > 0 { + fmt.Fprintf(&buf, ", l0SubLevels:%d", c.l0SubLevels) + } fmt.Fprintf(&buf, ", converges:%d, balance:%d, rangeCount:%d", c.convergesScore, c.balanceScore, c.rangeCount) if c.details != "" { @@ -502,29 +609,43 @@ func (c candidate) less(o candidate) bool { // candidate is. func (c candidate) compare(o candidate) float64 { if !o.valid { - return 6 + return 60 } if !c.valid { - return -6 + return -60 } if o.fullDisk { - return 5 + return 50 } if c.fullDisk { - return -5 + return -50 } if c.necessary != o.necessary { if c.necessary { - return 4 + return 40 } - return -4 + return -40 } if !scoresAlmostEqual(c.diversityScore, o.diversityScore) { if c.diversityScore > o.diversityScore { - return 3 + return 30 } - return -3 + return -30 + } + // If both o and c have high read amplification, then we prefer the + // canidate with lower read amp. 
+ if o.highReadAmp && c.highReadAmp { + if o.l0SubLevels > c.l0SubLevels { + return 25 + } + } + if c.highReadAmp { + return -25 } + if o.highReadAmp { + return 25 + } + if c.convergesScore != o.convergesScore { if c.convergesScore > o.convergesScore { return 2 + float64(c.convergesScore-o.convergesScore)/10.0 @@ -587,6 +708,7 @@ func (c byScoreAndID) Less(i, j int) bool { c[i].rangeCount == c[j].rangeCount && c[i].necessary == c[j].necessary && c[i].fullDisk == c[j].fullDisk && + c[i].highReadAmp == c[j].highReadAmp && c[i].valid == c[j].valid { return c[i].store.StoreID < c[j].store.StoreID } @@ -594,11 +716,12 @@ func (c byScoreAndID) Less(i, j int) bool { } func (c byScoreAndID) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -// onlyValidAndNotFull returns all the elements in a sorted (by score reversed) -// candidate list that are valid and not nearly full. -func (cl candidateList) onlyValidAndNotFull() candidateList { +// onlyValidAndHealthyDisk returns all the elements in a sorted (by score +// reversed) candidate list that are valid and not nearly full or with high +// read amplification. +func (cl candidateList) onlyValidAndHealthyDisk() candidateList { for i := len(cl) - 1; i >= 0; i-- { - if cl[i].valid && !cl[i].fullDisk { + if cl[i].valid && !cl[i].fullDisk && !cl[i].highReadAmp { return cl[:i+1] } } @@ -608,7 +731,7 @@ func (cl candidateList) onlyValidAndNotFull() candidateList { // best returns all the elements in a sorted (by score reversed) candidate list // that share the highest constraint score and are valid. func (cl candidateList) best() candidateList { - cl = cl.onlyValidAndNotFull() + cl = cl.onlyValidAndHealthyDisk() if len(cl) <= 1 { return cl } @@ -650,6 +773,14 @@ func (cl candidateList) worst() candidateList { } } } + // Are there candidates with high read amplification? If so, pick those. + if cl[len(cl)-1].highReadAmp { + for i := len(cl) - 2; i >= 0; i-- { + if !cl[i].highReadAmp { + return cl[i+1:] + } + } + } // Find the worst constraint/locality/converges/balanceScore values. for i := len(cl) - 2; i >= 0; i-- { if cl[i].necessary == cl[len(cl)-1].necessary && @@ -774,7 +905,11 @@ func rankedCandidateListForAllocation( continue } - if !maxCapacityCheck(s) { + if !maxCapacityCheck(s) || !options.storeHealthOptions.readAmpIsHealthy( + ctx, + s, + candidateStores.candidateL0Sublevels.mean, + ) { continue } diversityScore := diversityAllocateScore(s, existingStoreLocalities) @@ -808,6 +943,7 @@ func rankedCandidateListForAllocation( // // Stores that are marked as not valid, are in violation of a required criteria. func candidateListForRemoval( + ctx context.Context, existingReplsStoreList StoreList, constraintsCheck constraintsCheckFn, existingStoreLocalities map[roachpb.StoreID]roachpb.Locality, @@ -827,10 +963,15 @@ func candidateListForRemoval( } diversityScore := diversityRemovalScore(s.StoreID, existingStoreLocalities) candidates = append(candidates, candidate{ - store: s, - valid: constraintsOK, - necessary: necessary, - fullDisk: !maxCapacityCheck(s), + store: s, + valid: constraintsOK, + necessary: necessary, + fullDisk: !maxCapacityCheck(s), + // When removing a replica from a store, we do not want to include + // high amplification in ranking stores. This would submit already + // high read amplification stores to additional load of moving a + // replica. 
+ highReadAmp: false, diversityScore: diversityScore, }) } @@ -980,10 +1121,11 @@ const ( // decision would minimize the QPS range between the existing store and the // coldest store in the equivalence class. func bestStoreToMinimizeQPSDelta( - replQPS, rebalanceThreshold, minRequiredQPSDiff float64, + replQPS float64, existing roachpb.StoreID, candidates []roachpb.StoreID, storeDescMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + options *qpsScorerOptions, ) (bestCandidate roachpb.StoreID, reason declineReason) { storeQPSMap := make(map[roachpb.StoreID]float64, len(candidates)+1) for _, store := range candidates { @@ -1027,7 +1169,7 @@ func bestStoreToMinimizeQPSDelta( // `bestCandidate` (not accounting for the replica under consideration) is // higher than `minQPSDifferenceForTransfers`. diffIgnoringRepl := existingQPSIgnoringRepl - bestCandQPS - if diffIgnoringRepl < minRequiredQPSDiff { + if diffIgnoringRepl < options.minRequiredQPSDiff { return 0, deltaNotSignificant } @@ -1035,7 +1177,7 @@ func bestStoreToMinimizeQPSDelta( // the equivalence class. mean := domainStoreList.candidateQueriesPerSecond.mean overfullThreshold := overfullQPSThreshold( - &qpsScorerOptions{qpsRebalanceThreshold: rebalanceThreshold}, + options, mean, ) if existingQPS < overfullThreshold { @@ -1093,11 +1235,10 @@ func (o *qpsScorerOptions) getRebalanceTargetToMinimizeDelta( } return bestStoreToMinimizeQPSDelta( o.qpsPerReplica, - o.qpsRebalanceThreshold, - o.minRequiredQPSDiff, eqClass.existing.StoreID, candidates, storeListToMap(domainStoreList), + o, ) } @@ -1127,6 +1268,7 @@ func rankedCandidateListForRebalancing( } valid, necessary := removalConstraintsChecker(store) fullDisk := !maxCapacityCheck(store) + if !valid { if !needRebalanceFrom { log.VEventf(ctx, 2, "s%d: should-rebalance(invalid): locality:%q", @@ -1142,10 +1284,15 @@ func rankedCandidateListForRebalancing( needRebalanceFrom = true } existingStores[store.StoreID] = candidate{ - store: store, - valid: valid, - necessary: necessary, - fullDisk: fullDisk, + store: store, + valid: valid, + necessary: necessary, + fullDisk: fullDisk, + // When rebalancing a replica away from a store, we do not want + // to include high amplification in ranking stores. This would + // submit already high read amplification stores to additional + // load of moving a replica. + highReadAmp: false, diversityScore: curDiversityScore, } } @@ -1215,15 +1362,19 @@ func rankedCandidateListForRebalancing( continue } + // NB: We construct equivalence classes based on locality hierarchies, + // the diversityScore must be the only thing that's populated at + // this stage, in additon to hard checks and validation. + // TODO(kvoli,ayushshah15): Refactor this to make it harder to + // inadvertently break the invariant above, constraintsOK, necessary := rebalanceConstraintsChecker(store, existing.store) - maxCapacityOK := maxCapacityCheck(store) diversityScore := diversityRebalanceFromScore( store, existing.store.StoreID, existingStoreLocalities) cand := candidate{ store: store, valid: constraintsOK, necessary: necessary, - fullDisk: !maxCapacityOK, + fullDisk: !maxCapacityCheck(store), diversityScore: diversityScore, } if !cand.less(existing) { @@ -1322,6 +1473,14 @@ func rankedCandidateListForRebalancing( // rebalance candidates. 
s := cand.store cand.fullDisk = !rebalanceToMaxCapacityCheck(s) + cand.l0SubLevels = int(s.Capacity.L0Sublevels) + cand.highReadAmp = !options.getStoreHealthOptions().rebalanceToReadAmpIsHealthy( + ctx, + s, + // We only wish to compare the read amplification to the + // comparable stores average and not the cluster. + comparable.candidateSL.candidateL0Sublevels.mean, + ) cand.balanceScore = options.balanceScore(comparable.candidateSL, s.Capacity) cand.convergesScore = options.rebalanceToConvergesScore(comparable, s) cand.rangeCount = int(s.Capacity.RangeCount) @@ -1856,6 +2015,73 @@ func convergesOnMean(oldVal, newVal, mean float64) bool { return math.Abs(newVal-mean) < math.Abs(oldVal-mean) } +type storeHealthOptions struct { + enforcementLevel storeHealthEnforcement + l0SublevelThreshold int64 +} + +func (o storeHealthOptions) getStoreHealthOptions() storeHealthOptions { + return o +} + +// readAmpIsHealthy returns true if the store read amplification does not exceed +// the cluster threshold and mean, or the enforcement level does not include +// excluding candidates from being allocation targets. +func (o storeHealthOptions) readAmpIsHealthy( + ctx context.Context, store roachpb.StoreDescriptor, avg float64, +) bool { + if o.enforcementLevel == storeHealthNoAction || + store.Capacity.L0Sublevels < o.l0SublevelThreshold { + return true + } + + // Still log an event when the L0 sub-levels exceeds the threshold, however + // does not exceed the cluster average. This is enabled to avoid confusion + // where candidate stores are still targets, despite exeeding the + // threshold. + if float64(store.Capacity.L0Sublevels) < avg*l0SubLevelWaterMark { + log.Eventf(ctx, "s%d, allocate check l0 sublevels %d exceeds threshold %d, but below average: %f, action enabled %d", + store.StoreID, store.Capacity.L0Sublevels, + o.l0SublevelThreshold, avg, o.enforcementLevel) + return true + } + + log.Eventf(ctx, "s%d, allocate check l0 sublevels %d exceeds threshold %d, above average: %f, action enabled %d", + store.StoreID, store.Capacity.L0Sublevels, + o.l0SublevelThreshold, avg, o.enforcementLevel) + + // The store is only considered unhealthy when the enforcement level is + // storeHealthBlockAll. + return o.enforcementLevel < storeHealthBlockAll +} + +// rebalanceToReadAmpIsHealthy returns true if the store read amplification does +// not exceed the cluster threshold and mean, or the enforcement level does not +// include excluding candidates from being rebalancing targets. +func (o storeHealthOptions) rebalanceToReadAmpIsHealthy( + ctx context.Context, store roachpb.StoreDescriptor, avg float64, +) bool { + if o.enforcementLevel == storeHealthNoAction || + store.Capacity.L0Sublevels < o.l0SublevelThreshold { + return true + } + + if float64(store.Capacity.L0Sublevels) < avg*l0SubLevelWaterMark { + log.Eventf(ctx, "s%d, allocate check l0 sublevels %d exceeds threshold %d, but below average watermark: %f, action enabled %d", + store.StoreID, store.Capacity.L0Sublevels, + o.l0SublevelThreshold, avg*l0SubLevelWaterMark, o.enforcementLevel) + return true + } + + log.Eventf(ctx, "s%d, allocate check l0 sublevels %d exceeds threshold %d, above average watermark: %f, action enabled %d", + store.StoreID, store.Capacity.L0Sublevels, + o.l0SublevelThreshold, avg*l0SubLevelWaterMark, o.enforcementLevel) + + // The store is only considered unhealthy when the enforcement level is + // storeHealthBlockRebalanceTo or storeHealthBlockAll. 
+ return o.enforcementLevel < storeHealthBlockRebalanceTo +} + // maxCapacityCheck returns true if the store has room for a new replica. func maxCapacityCheck(store roachpb.StoreDescriptor) bool { return store.Capacity.FractionUsed() < maxFractionUsedThreshold diff --git a/pkg/kv/kvserver/allocator_scorer_test.go b/pkg/kv/kvserver/allocator_scorer_test.go index bb48103d62d8..17817259b0c5 100644 --- a/pkg/kv/kvserver/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator_scorer_test.go @@ -44,22 +44,24 @@ func (s storeScores) Less(i, j int) bool { } func (s storeScores) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func TestOnlyValidAndNotFull(t *testing.T) { +func TestOnlyValidAndHealthyDisk(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) testCases := []struct { - valid, invalid int + valid, invalid, full, readAmpHigh int }{ - {0, 0}, - {1, 0}, - {0, 1}, - {1, 1}, - {2, 0}, - {2, 1}, - {2, 2}, - {1, 2}, - {0, 2}, + {0, 0, 0, 0}, + {1, 0, 0, 0}, + {0, 1, 0, 0}, + {0, 0, 1, 0}, + {0, 0, 0, 1}, + {1, 1, 1, 1}, + {2, 0, 0, 0}, + {2, 1, 1, 1}, + {2, 2, 2, 2}, + {1, 2, 3, 4}, + {0, 2, 4, 6}, } for _, tc := range testCases { @@ -72,13 +74,19 @@ func TestOnlyValidAndNotFull(t *testing.T) { for i := 0; i < tc.valid; i++ { cl = append(cl, candidate{valid: true}) } + for i := 0; i < tc.full; i++ { + cl = append(cl, candidate{fullDisk: true}) + } + for i := 0; i < tc.readAmpHigh; i++ { + cl = append(cl, candidate{highReadAmp: true}) + } sort.Sort(sort.Reverse(byScore(cl))) - valid := cl.onlyValidAndNotFull() + valid := cl.onlyValidAndHealthyDisk() if a, e := len(valid), tc.valid; a != e { t.Errorf("expected %d valid, actual %d", e, a) } - if a, e := len(cl)-len(valid), tc.invalid; a != e { + if a, e := len(cl)-len(valid), tc.invalid+tc.full+tc.readAmpHigh; a != e { t.Errorf("expected %d invalid, actual %d", e, a) } }) @@ -1046,13 +1054,14 @@ func TestRemoveConstraintsCheck(t *testing.T) { } }) } + } func TestShouldRebalanceDiversity(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - options := &rangeCountScorerOptions{} + options := &rangeCountScorerOptions{storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}} newStore := func(id int, locality roachpb.Locality) roachpb.StoreDescriptor { return roachpb.StoreDescriptor{ StoreID: roachpb.StoreID(id), diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 66df83cf0718..395c97328c1d 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -342,6 +342,93 @@ var oneStoreWithFullDisk = []*roachpb.StoreDescriptor{ }, } +var oneStoreHighReadAmp = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, L0Sublevels: maxL0SublevelThreshold - 5}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1800, L0Sublevels: maxL0SublevelThreshold - 5}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, L0Sublevels: maxL0SublevelThreshold + 5}, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, L0Sublevels: maxL0SublevelThreshold - 5}, + }, +} + +var allStoresHighReadAmp = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: 
roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, L0Sublevels: maxL0SublevelThreshold + 1}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 800, L0Sublevels: maxL0SublevelThreshold + 1}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, L0Sublevels: maxL0SublevelThreshold + 1}, + }, +} + +var allStoresHighReadAmpSkewed = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1200, L0Sublevels: maxL0SublevelThreshold + 1}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 800, L0Sublevels: maxL0SublevelThreshold + 50}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 600, L0Sublevels: maxL0SublevelThreshold + 55}, + }, +} + +var threeStoresHighReadAmpAscRangeCount = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 100, L0Sublevels: maxL0SublevelThreshold + 10}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 400, L0Sublevels: maxL0SublevelThreshold + 10}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 1600, L0Sublevels: maxL0SublevelThreshold + 10}, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 6400, L0Sublevels: maxL0SublevelThreshold - 10}, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Capacity: roachpb.StoreCapacity{Capacity: 200, Available: 200, RangeCount: 25000, L0Sublevels: maxL0SublevelThreshold - 10}, + }, +} + var oneStoreWithTooManyRanges = []*roachpb.StoreDescriptor{ { StoreID: 1, @@ -527,6 +614,101 @@ func TestAllocatorNoAvailableDisks(t *testing.T) { } } +func TestAllocatorReadAmpCheck(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + type testCase struct { + name string + stores []*roachpb.StoreDescriptor + conf roachpb.SpanConfig + expectedAddTarget roachpb.StoreID + enforcement storeHealthEnforcement + } + tests := []testCase{ + { + name: "ignore read amp on allocation when storeHealthNoAction enforcement", + // NB: All stores have high read amp, this should be ignored and + // allocate to the store with the lowest range count. + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(3), + enforcement: storeHealthNoAction, + }, + { + name: "ignore read amp on allocation when storeHealthLogOnly enforcement", + // NB: All stores have high read amp, this should be ignored and + // allocate to the store with the lowest range count. 
+ stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(3), + enforcement: storeHealthLogOnly, + }, + { + name: "ignore read amp on allocation when storeHealthBlockRebalanceTo enforcement", + // NB: All stores have high read amp, this should be ignored and + // allocate to the store with the lowest range count. + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(3), + enforcement: storeHealthBlockRebalanceTo, + }, + { + name: "don't allocate to stores when all have high read amp and storeHealthBlockAll", + // NB: All stores have high read amp (limit + 1), none are above the watermark, select the lowest range count. + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(3), + enforcement: storeHealthBlockAll, + }, + { + name: "allocate to store below the mean when all have high read amp and storeHealthBlockAll", + // NB: All stores have high read amp, however store 1 is below the watermark mean read amp. + stores: allStoresHighReadAmpSkewed, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(1), + enforcement: storeHealthBlockAll, + }, + { + name: "allocate to lowest range count store without high read amp when storeHealthBlockAll enforcement", + // NB: Store 1, 2 and 3 have high read amp and are above the watermark, the lowest range count (4) + // should be selected. + stores: threeStoresHighReadAmpAscRangeCount, + conf: emptySpanConfig(), + expectedAddTarget: roachpb.StoreID(4), + enforcement: storeHealthBlockAll, + }, + } + + chk := func(target roachpb.ReplicationTarget, expectedTarget roachpb.StoreID) bool { + return target.StoreID == expectedTarget + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { + stopper, g, _, a, _ := createTestAllocator(ctx, 10, false /* deterministic */) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(test.stores, t) + + // Enable read disk health checking in candidate exclusion. + l0SublevelsThresholdEnforce.Override(ctx, &a.storePool.st.SV, int64(test.enforcement)) + add, _, err := a.AllocateVoter( + ctx, + test.conf, + nil, + nil, + ) + require.NoError(t, err) + require.Truef(t, + chk(add, test.expectedAddTarget), + "the addition target %+v from AllocateVoter doesn't match expectation", + add) + }) + } +} + func TestAllocatorTwoDatacenters(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -728,7 +910,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if e, a := tc.expectTargetRebalance, ok; e != a { t.Errorf( @@ -802,7 +984,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if ok { // Update the descriptor. 
@@ -843,7 +1025,7 @@ func TestAllocatorMultipleStoresPerNodeLopsided(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.False(t, ok) } @@ -915,7 +1097,7 @@ func TestAllocatorRebalanceBasedOnRangeCount(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if !ok { i-- // loop until we find 10 candidates @@ -939,7 +1121,7 @@ func TestAllocatorRebalanceBasedOnRangeCount(t *testing.T) { t.Fatalf("%d: unable to get store %d descriptor", i, store.StoreID) } eqClass.existing = desc - result := a.scorerOptions().shouldRebalanceBasedOnThresholds( + result := a.scorerOptions(ctx).shouldRebalanceBasedOnThresholds( ctx, eqClass, a.metrics, @@ -1077,7 +1259,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if ok { t.Fatalf("expected no rebalance, but got target s%d; details: %s", result.StoreID, details) @@ -1102,7 +1284,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if ok { t.Fatalf("expected no rebalance, but got target s%d; details: %s", target.StoreID, details) @@ -1121,7 +1303,7 @@ func TestAllocatorRebalanceTarget(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) expTo := stores[1].StoreID expFrom := stores[0].StoreID @@ -1200,7 +1382,7 @@ func TestAllocatorRebalanceDeadNodes(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if c.expected > 0 { if !ok { @@ -1347,7 +1529,7 @@ func TestAllocatorRebalanceThrashing(t *testing.T) { t.Fatalf("[store %d]: unable to get store %d descriptor", j, store.StoreID) } eqClass.existing = desc - if a, e := a.scorerOptions().shouldRebalanceBasedOnThresholds( + if a, e := a.scorerOptions(ctx).shouldRebalanceBasedOnThresholds( context.Background(), eqClass, a.metrics, @@ -1471,6 +1653,7 @@ func TestAllocatorRebalanceByQPS(t *testing.T) { gossiputil.NewStoreGossiper(g).GossipStores(subtest.testStores, t) var rangeUsageInfo RangeUsageInfo options := &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}, qpsPerReplica: 100, qpsRebalanceThreshold: 0.2, } @@ -1584,6 +1767,7 @@ func TestAllocatorRemoveBasedOnQPS(t *testing.T) { defer stopper.Stop(ctx) gossiputil.NewStoreGossiper(g).GossipStores(subtest.testStores, t) options := &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}, qpsRebalanceThreshold: 0.1, } remove, _, err := a.RemoveVoter( @@ -1647,7 +1831,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if ok && result.StoreID != 4 { t.Errorf("expected store 4; got %d", result.StoreID) @@ -1665,7 +1849,7 @@ func TestAllocatorRebalanceByCount(t *testing.T) { existing: desc, candidateSL: sl, } - result := a.scorerOptions().shouldRebalanceBasedOnThresholds( + result := a.scorerOptions(ctx).shouldRebalanceBasedOnThresholds( ctx, eqClass, a.metrics, @@ -2227,7 +2411,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) var resultID roachpb.StoreID if ok { @@ -2299,7 +2483,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { nil, rangeUsageInfo, 
storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) var gotExpected bool if !ok { @@ -2844,7 +3028,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) { c.existingVoters, /* voterCandidates */ c.existingVoters, c.existingNonVoters, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.NoError(t, err) @@ -2863,7 +3047,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) { c.existingVoters, c.existingVoters, nil, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.NoError(t, err) require.Truef(t, checkReplExists(targetVoter, c.expVoterRemovals), @@ -2876,7 +3060,7 @@ func TestAllocatorRemoveBasedOnDiversity(t *testing.T) { c.existingNonVoters, /* nonVoterCandidates */ c.existingVoters, c.existingNonVoters, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.NoError(t, err) require.True(t, checkReplExists(targetNonVoter, c.expNonVoterRemovals)) @@ -3152,7 +3336,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if !ok { t.Fatalf("%d: RebalanceVoter(%v) returned no target store; details: %s", i, c.existing, details) @@ -3384,7 +3568,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { a.storePool.getLocalitiesByStore(existingRepls), a.storePool.isStoreReadyForRoutineReplicaTransfer, false, /* allowMultipleReplsPerNode */ - a.scorerOptions(), + a.scorerOptions(ctx), ) if !expectedStoreIDsMatch(tc.expected, candidates) { @@ -3403,7 +3587,7 @@ func TestAllocateCandidatesExcludeNonReadyNodes(t *testing.T) { nil, a.storePool.getLocalitiesByStore(existingRepls), a.storePool.isStoreReadyForRoutineReplicaTransfer, - a.scorerOptions(), + a.scorerOptions(ctx), a.metrics, ) if len(tc.expected) > 0 { @@ -3726,7 +3910,7 @@ func TestAllocateCandidatesNumReplicasConstraints(t *testing.T) { a.storePool.getLocalitiesByStore(existingRepls), func(context.Context, roachpb.StoreID) bool { return true }, false, /* allowMultipleReplsPerNode */ - a.scorerOptions(), + a.scorerOptions(ctx), ) best := candidates.best() match := true @@ -3948,10 +4132,11 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) { // Check behavior in a span config where `voter_constraints` are empty. checkFn := voterConstraintsCheckerForRemoval(analyzed, constraint.EmptyAnalyzedConstraints) - candidates := candidateListForRemoval(sl, + candidates := candidateListForRemoval(ctx, + sl, checkFn, a.storePool.getLocalitiesByStore(existingRepls), - a.scorerOptions()) + a.scorerOptions(ctx)) if !expectedStoreIDsMatch(tc.expected, candidates.worst()) { t.Errorf("%d (with `constraints`): expected candidateListForRemoval(%v)"+ " = %v, but got %v\n for candidates %v", testIdx, tc.existing, tc.expected, @@ -3961,10 +4146,11 @@ func TestRemoveCandidatesNumReplicasConstraints(t *testing.T) { // Check that we'd see the same result if the same constraints were // specified as `voter_constraints`. 
checkFn = voterConstraintsCheckerForRemoval(constraint.EmptyAnalyzedConstraints, analyzed) - candidates = candidateListForRemoval(sl, + candidates = candidateListForRemoval(ctx, + sl, checkFn, a.storePool.getLocalitiesByStore(existingRepls), - a.scorerOptions()) + a.scorerOptions(ctx)) if !expectedStoreIDsMatch(tc.expected, candidates.worst()) { t.Errorf("%d (with `voter_constraints`): expected candidateListForRemoval(%v)"+ " = %v, but got %v\n for candidates %v", testIdx, tc.existing, tc.expected, @@ -4108,6 +4294,7 @@ func TestAllocatorRebalanceNonVoters(t *testing.T) { defer stopper.Stop(ctx) sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(test.stores, t) + // Enable read disk health checking in candidate exclusion. add, remove, _, ok := a.RebalanceNonVoter( ctx, test.conf, @@ -4116,7 +4303,7 @@ func TestAllocatorRebalanceNonVoters(t *testing.T) { test.existingNonVoters, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if test.expectNoAction { require.True(t, !ok) @@ -4135,6 +4322,134 @@ func TestAllocatorRebalanceNonVoters(t *testing.T) { } } +// TestAllocatorRebalanceReadAmpCheck ensures that rebalancing voters: +// (1) Respects storeHealthEnforcement setting, by ignoring L0 Sublevels in +// rebalancing decisions when disabled or set to log only. +// (2) Considers L0 sublevels when set to rebalanceOnly or allocate in +// conjunction with the mean. +// (3) Does not attempt to rebalance off of the store when read amplification +// is high, as this setting is only used for filtering candidates. +func TestAllocatorRebalanceReadAmpCheck(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + type testCase struct { + name string + stores []*roachpb.StoreDescriptor + conf roachpb.SpanConfig + existingVoters []roachpb.ReplicaDescriptor + expectNoAction bool + expectedRemoveTargets, expectedAddTargets []roachpb.StoreID + enforcement storeHealthEnforcement + } + tests := []testCase{ + { + name: "don't move off of nodes with high read amp when storeHealthBlockRebalanceTo", + // NB: Store 1,2, 4 have okay read amp. Store 3 has high read amp. + // We expect high read amplifaction to only be considered for + // exlcuding targets, not for triggering rebalancing. + stores: threeStoresHighReadAmpAscRangeCount, + conf: emptySpanConfig(), + existingVoters: replicas(3, 1), + expectNoAction: true, + enforcement: storeHealthBlockRebalanceTo, + }, + { + name: "don't move off of nodes with high read amp when storeHealthBlockAll", + // NB: Store 1,2, 4 have okay read amp. Store 3 has high read amp. + // We expect high read amplifaction to only be considered for + // exlcuding targets, not for triggering rebalancing. + stores: threeStoresHighReadAmpAscRangeCount, + conf: emptySpanConfig(), + existingVoters: replicas(3, 1), + expectNoAction: true, + enforcement: storeHealthBlockAll, + }, + { + name: "don't take action when enforcement is not storeHealthNoAction", + // NB: Store 3 has L0Sublevels > threshold. Store 2 has 3 x higher + // ranges as other stores. Should move to candidate to 4, however + // enforcement for rebalancing is not enabled so will pick + // candidate 3 which has a lower range count. 
+ stores: oneStoreHighReadAmp, + conf: emptySpanConfig(), + existingVoters: replicas(1, 2), + expectedRemoveTargets: []roachpb.StoreID{2}, + expectedAddTargets: []roachpb.StoreID{3}, + enforcement: storeHealthNoAction, + }, + { + name: "don't rebalance to nodes with high read amp when storeHealthBlockRebalanceTo enforcement", + // NB: Store 3 has L0Sublevels > threshold. Store 2 has 3 x higher + // ranges as other stores. Should move to candidate to 4, which + // doesn't have high read amp. + stores: oneStoreHighReadAmp, + conf: emptySpanConfig(), + existingVoters: replicas(1, 2), + expectedRemoveTargets: []roachpb.StoreID{2}, + expectedAddTargets: []roachpb.StoreID{4}, + enforcement: storeHealthBlockRebalanceTo, + }, + { + name: "don't rebalance to nodes with high read amp when storeHealthBlockAll enforcement", + // NB: Store 3 has L0Sublevels > threshold. Store 2 has 3 x higher + // ranges as other stores. Should move to candidate to 4, which + // doesn't have high read amp. + stores: oneStoreHighReadAmp, + conf: emptySpanConfig(), + existingVoters: replicas(1, 2), + expectedRemoveTargets: []roachpb.StoreID{2}, + expectedAddTargets: []roachpb.StoreID{4}, + enforcement: storeHealthBlockAll, + }, + } + + var rangeUsageInfo RangeUsageInfo + chk := func(target roachpb.ReplicationTarget, expectedCandidates []roachpb.StoreID) bool { + for _, candidate := range expectedCandidates { + if target.StoreID == candidate { + return true + } + } + return false + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { + stopper, g, _, a, _ := createTestAllocator(ctx, 10, true /* deterministic */) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(test.stores, t) + // Enable read disk health checking in candidate exclusion. + options := a.scorerOptions(ctx) + options.storeHealthOptions = storeHealthOptions{enforcementLevel: test.enforcement, l0SublevelThreshold: 20} + add, remove, _, ok := a.RebalanceVoter( + ctx, + test.conf, + nil, + test.existingVoters, + []roachpb.ReplicaDescriptor{}, + rangeUsageInfo, + storeFilterThrottled, + options, + ) + if test.expectNoAction { + require.True(t, !ok) + } else { + require.Truef(t, ok, "no action taken on range") + require.Truef(t, + chk(add, test.expectedAddTargets), + "the addition target %+v from RebalanceVoter doesn't match expectation", + add) + require.Truef(t, + chk(remove, test.expectedRemoveTargets), + "the removal target %+v from RebalanceVoter doesn't match expectation", + remove) + } + }) + } +} + // TestVotersCanRebalanceToNonVoterStores ensures that rebalancing of voting // replicas considers stores that have non-voters as feasible candidates. 
func TestVotersCanRebalanceToNonVoterStores(t *testing.T) { @@ -4173,7 +4488,7 @@ func TestVotersCanRebalanceToNonVoterStores(t *testing.T) { existingNonVoters, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.Truef(t, ok, "no action taken") @@ -4232,7 +4547,7 @@ func TestNonVotersCannotRebalanceToVoterStores(t *testing.T) { existingNonVoters, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.Falsef( @@ -5035,7 +5350,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { nil, a.storePool.getLocalitiesByStore(existingRepls), func(context.Context, roachpb.StoreID) bool { return true }, - a.scorerOptions(), + a.scorerOptions(ctx), a.metrics, ) match := true @@ -5067,7 +5382,7 @@ func TestRebalanceCandidatesNumReplicasConstraints(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) var found bool if !ok && len(tc.validTargets) == 0 { @@ -5456,7 +5771,7 @@ func TestAllocatorRemoveTargetBasedOnCapacity(t *testing.T) { replicas, replicas, nil, - a.scorerOptions(), + a.scorerOptions(ctx), ) if err != nil { t.Fatal(err) @@ -7296,7 +7611,7 @@ func TestAllocatorRebalanceWithScatter(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) require.False(t, ok) @@ -7309,7 +7624,7 @@ func TestAllocatorRebalanceWithScatter(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptionsForScatter(), + a.scorerOptionsForScatter(ctx), ) require.True(t, ok) } @@ -7423,7 +7738,7 @@ func TestAllocatorRebalanceAway(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - a.scorerOptions(), + a.scorerOptions(ctx), ) if tc.expected == nil && ok { @@ -7601,7 +7916,7 @@ func TestAllocatorFullDisks(t *testing.T) { nil, rangeUsageInfo, storeFilterThrottled, - alloc.scorerOptions(), + alloc.scorerOptions(ctx), ) if ok { if log.V(1) { @@ -7647,7 +7962,7 @@ func Example_rangeCountRebalancing() { nil, rangeUsageInfo, storeFilterThrottled, - alloc.scorerOptions(), + alloc.scorerOptions(ctx), ) if ok { log.Infof(ctx, "rebalancing to %v; details: %s", target, details) @@ -7741,6 +8056,7 @@ func qpsBasedRebalanceFn( avgQPS := candidate.Capacity.QueriesPerSecond / float64(candidate.Capacity.RangeCount) jitteredQPS := avgQPS * (1 + alloc.randGen.Float64()) opts := &qpsScorerOptions{ + storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction}, qpsPerReplica: jitteredQPS, qpsRebalanceThreshold: 0.2, } diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 6ca38491c386..23b5820e6311 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -293,6 +293,12 @@ var ( Measurement: "Keys/Sec", Unit: metric.Unit_COUNT, } + metaL0SubLevelHistogram = metric.Metadata{ + Name: "rebalancing.l0_sublevels_histogram", + Help: "The summary view of sub levels in level 0 of the stores LSM", + Measurement: "Storage", + Unit: metric.Unit_COUNT, + } // Metric for tracking follower reads. metaFollowerReadsCount = metric.Metadata{ @@ -1363,6 +1369,7 @@ type StoreMetrics struct { // Rebalancing metrics. AverageQueriesPerSecond *metric.GaugeFloat64 AverageWritesPerSecond *metric.GaugeFloat64 + L0SubLevelsHistogram *metric.Histogram // Follower read metrics. FollowerReadsCount *metric.Counter @@ -1809,6 +1816,12 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { // Rebalancing metrics. 
AverageQueriesPerSecond: metric.NewGaugeFloat64(metaAverageQueriesPerSecond), AverageWritesPerSecond: metric.NewGaugeFloat64(metaAverageWritesPerSecond), + L0SubLevelsHistogram: metric.NewHistogram( + metaL0SubLevelHistogram, + l0SublevelInterval, + l0SublevelMaxSampled, + 1, /* sig figures (integer) */ + ), // Follower reads metrics. FollowerReadsCount: metric.NewCounter(metaFollowerReadsCount), @@ -2064,6 +2077,7 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { sm.RdbPendingCompaction.Update(int64(m.Compact.EstimatedDebt)) sm.RdbMarkedForCompactionFiles.Update(int64(m.Compact.MarkedFiles)) sm.RdbL0Sublevels.Update(int64(m.Levels[0].Sublevels)) + sm.L0SubLevelsHistogram.RecordValue(int64(m.Levels[0].Sublevels)) sm.RdbL0NumFiles.Update(m.Levels[0].NumFiles) sm.RdbNumSSTables.Update(m.NumSSTables()) sm.RdbWriteStalls.Update(m.WriteStallCount) diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index b1a14c47c724..3d925dc99ae7 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -2990,7 +2990,7 @@ func (r *Replica) relocateOne( conf, existingVoters, existingNonVoters, - r.store.allocator.scorerOptions(), + r.store.allocator.scorerOptions(ctx), // NB: Allow the allocator to return target stores that might be on the // same node as an existing replica. This is to ensure that relocations // that require "lateral" movement of replicas within a node can succeed. @@ -3060,7 +3060,7 @@ func (r *Replica) relocateOne( existingVoters, existingNonVoters, args.targetType, - r.store.allocator.scorerOptions(), + r.store.allocator.scorerOptions(ctx), ) if err != nil { return nil, nil, errors.Wrapf( diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index 77abb638c573..eb4d69aca1d2 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -431,7 +431,7 @@ func (rq *replicateQueue) shouldQueue( nonVoterReplicas, rangeUsageInfo, storeFilterThrottled, - rq.allocator.scorerOptions(), + rq.allocator.scorerOptions(ctx), ) if ok { log.VEventf(ctx, 2, "rebalance target found for voter, enqueuing") @@ -445,7 +445,7 @@ func (rq *replicateQueue) shouldQueue( nonVoterReplicas, rangeUsageInfo, storeFilterThrottled, - rq.allocator.scorerOptions(), + rq.allocator.scorerOptions(ctx), ) if ok { log.VEventf(ctx, 2, "rebalance target found for non-voter, enqueuing") @@ -1004,7 +1004,7 @@ func (rq *replicateQueue) findRemoveVoter( candidates, existingVoters, existingNonVoters, - rq.allocator.scorerOptions(), + rq.allocator.scorerOptions(ctx), ) } @@ -1119,7 +1119,7 @@ func (rq *replicateQueue) removeNonVoter( existingNonVoters, existingVoters, existingNonVoters, - rq.allocator.scorerOptions(), + rq.allocator.scorerOptions(ctx), ) if err != nil { return false, err @@ -1301,9 +1301,9 @@ func (rq *replicateQueue) considerRebalance( desc, conf := repl.DescAndSpanConfig() rebalanceTargetType := voterTarget - scorerOpts := scorerOptions(rq.allocator.scorerOptions()) + scorerOpts := scorerOptions(rq.allocator.scorerOptions(ctx)) if scatter { - scorerOpts = rq.allocator.scorerOptionsForScatter() + scorerOpts = rq.allocator.scorerOptionsForScatter(ctx) } if !rq.store.TestingKnobs().DisableReplicaRebalancing { rangeUsageInfo := rangeUsageInfoForRepl(repl) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e9b1e588dc18..5cab1f373ae7 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2966,7 +2966,11 @@ func (s *Store) Capacity(ctx 
context.Context, useCached bool) (roachpb.StoreCapa capacity.LogicalBytes = logicalBytes capacity.QueriesPerSecond = totalQueriesPerSecond capacity.WritesPerSecond = totalWritesPerSecond - capacity.L0Sublevels = s.metrics.RdbL0Sublevels.Value() + // We gossip the maximum number of L0 sub-levels that have been seen in + // past 2 windows. The recording length may vary between 5 and 10 minutes + // accordingly. + windowedL0Sublevels, _ := s.metrics.L0SubLevelsHistogram.Windowed() + capacity.L0Sublevels = windowedL0Sublevels.Max() capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica) capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica) s.recordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond) diff --git a/pkg/kv/kvserver/store_pool.go b/pkg/kv/kvserver/store_pool.go index 751f59264aae..4ee6e95fd8c2 100644 --- a/pkg/kv/kvserver/store_pool.go +++ b/pkg/kv/kvserver/store_pool.go @@ -765,6 +765,10 @@ type StoreList struct { // candidateWritesPerSecond tracks writes-per-second stats for stores that are // eligible to be rebalance targets. candidateWritesPerSecond stat + + // candidateWritesPerSecond tracks L0 sub-level stats for stores that are + // eligible to be rebalance targets. + candidateL0Sublevels stat } // Generates a new store list based on the passed in descriptors. It will @@ -779,6 +783,7 @@ func makeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { sl.candidateLogicalBytes.update(float64(desc.Capacity.LogicalBytes)) sl.candidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond) sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond) + sl.candidateL0Sublevels.update(float64(desc.Capacity.L0Sublevels)) } return sl } diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index fdf0f048ea1f..2079287cb4a2 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -249,8 +249,9 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { // `scorerOptions` here, which sets the range count rebalance threshold. // Instead, we use our own implementation of `scorerOptions` that promotes QPS // balance. 
-func (sr *StoreRebalancer) scorerOptions() *qpsScorerOptions { +func (sr *StoreRebalancer) scorerOptions(ctx context.Context) *qpsScorerOptions { return &qpsScorerOptions{ + storeHealthOptions: sr.rq.allocator.storeHealthOptions(ctx), deterministic: sr.rq.allocator.storePool.deterministic, qpsRebalanceThreshold: qpsRebalanceThreshold.Get(&sr.st.SV), minRequiredQPSDiff: minQPSDifferenceForTransfers.Get(&sr.st.SV), @@ -270,7 +271,7 @@ func (sr *StoreRebalancer) scorerOptions() *qpsScorerOptions { func (sr *StoreRebalancer) rebalanceStore( ctx context.Context, mode LBRebalancingMode, allStoresList StoreList, ) { - options := sr.scorerOptions() + options := sr.scorerOptions(ctx) var localDesc *roachpb.StoreDescriptor for i := range allStoresList.stores { if allStoresList.stores[i].StoreID == sr.rq.store.StoreID() { @@ -360,7 +361,7 @@ func (sr *StoreRebalancer) rebalanceStore( &replicasToMaybeRebalance, localDesc, allStoresList, - sr.scorerOptions(), + sr.scorerOptions(ctx), ) if replWithStats.repl == nil { log.Infof(ctx, diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 1f6e4e5f37da..ba3eed4b5a5c 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -53,6 +53,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 3000, + L0Sublevels: maxL0SublevelThreshold - 10, }, }, { @@ -70,6 +71,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2800, + L0Sublevels: maxL0SublevelThreshold - 5, }, }, { @@ -87,6 +89,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2600, + L0Sublevels: maxL0SublevelThreshold + 2, }, }, { @@ -104,6 +107,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2400, + L0Sublevels: maxL0SublevelThreshold - 10, }, }, { @@ -121,6 +125,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2200, + L0Sublevels: maxL0SublevelThreshold - 3, }, }, { @@ -138,6 +143,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2000, + L0Sublevels: maxL0SublevelThreshold + 2, }, }, { @@ -155,6 +161,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1800, + L0Sublevels: maxL0SublevelThreshold - 10, }, }, { @@ -172,6 +179,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1600, + L0Sublevels: maxL0SublevelThreshold - 5, }, }, { @@ -189,6 +197,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1400, + L0Sublevels: maxL0SublevelThreshold + 3, }, }, } @@ -233,6 +242,188 @@ var ( }, }, } + + // noLocalityAscendingReadAmpStores specifies a set of stores identical to + // noLocalityStores, however they have ascending read + // amplification. Where store 1, store 2 and store 3 are below the + // threshold, whilst store 4 and store 5 are above. 
+ noLocalityAscendingReadAmpStores = []*roachpb.StoreDescriptor{
+ {
+ StoreID: 1,
+ Node: roachpb.NodeDescriptor{NodeID: 1},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1500,
+ L0Sublevels: maxL0SublevelThreshold - 15,
+ },
+ },
+ {
+ StoreID: 2,
+ Node: roachpb.NodeDescriptor{NodeID: 2},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1300,
+ L0Sublevels: maxL0SublevelThreshold - 10,
+ },
+ },
+ {
+ StoreID: 3,
+ Node: roachpb.NodeDescriptor{NodeID: 3},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1000,
+ L0Sublevels: maxL0SublevelThreshold - 5,
+ },
+ },
+ {
+ StoreID: 4,
+ Node: roachpb.NodeDescriptor{NodeID: 4},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 900,
+ L0Sublevels: maxL0SublevelThreshold + 20,
+ },
+ },
+ {
+ StoreID: 5,
+ Node: roachpb.NodeDescriptor{NodeID: 5},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 500,
+ L0Sublevels: maxL0SublevelThreshold + 25,
+ },
+ },
+ }
+
+ // noLocalityUniformQPSHighReadAmp specifies a set of stores that are
+ // identical, except that stores 1, 3 and 5 have high read amp.
+ noLocalityUniformQPSHighReadAmp = []*roachpb.StoreDescriptor{
+ {
+ StoreID: 1,
+ Node: roachpb.NodeDescriptor{NodeID: 1},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1000,
+ L0Sublevels: maxL0SublevelThreshold + 100,
+ },
+ },
+ {
+ StoreID: 2,
+ Node: roachpb.NodeDescriptor{NodeID: 2},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1000,
+ L0Sublevels: maxL0SublevelThreshold - 15,
+ },
+ },
+ {
+ StoreID: 3,
+ Node: roachpb.NodeDescriptor{NodeID: 3},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1000,
+ L0Sublevels: maxL0SublevelThreshold + 100,
+ },
+ },
+ {
+ StoreID: 4,
+ Node: roachpb.NodeDescriptor{NodeID: 4},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1000,
+ L0Sublevels: maxL0SublevelThreshold - 15,
+ },
+ },
+ {
+ StoreID: 5,
+ Node: roachpb.NodeDescriptor{NodeID: 5},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1000,
+ L0Sublevels: maxL0SublevelThreshold + 100,
+ },
+ },
+ }
+ // noLocalityHighReadAmpStores specifies a set of stores identical to
+ // noLocalityStores, except that they all have read amplification that
+ // exceeds the threshold.
+ noLocalityHighReadAmpStores = []*roachpb.StoreDescriptor{
+ {
+ StoreID: 1,
+ Node: roachpb.NodeDescriptor{NodeID: 1},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1500,
+ L0Sublevels: maxL0SublevelThreshold + 1,
+ },
+ },
+ {
+ StoreID: 2,
+ Node: roachpb.NodeDescriptor{NodeID: 2},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1300,
+ L0Sublevels: maxL0SublevelThreshold + 1,
+ },
+ },
+ {
+ StoreID: 3,
+ Node: roachpb.NodeDescriptor{NodeID: 3},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1000,
+ L0Sublevels: maxL0SublevelThreshold + 1,
+ },
+ },
+ {
+ StoreID: 4,
+ Node: roachpb.NodeDescriptor{NodeID: 4},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 900,
+ L0Sublevels: maxL0SublevelThreshold + 1,
+ },
+ },
+ {
+ StoreID: 5,
+ Node: roachpb.NodeDescriptor{NodeID: 5},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 500,
+ L0Sublevels: maxL0SublevelThreshold + 1,
+ },
+ },
+ }
+ // noLocalityHighReadAmpSkewedStores specifies a set of stores identical to
+ // noLocalityStores, except that they all have read amplification that
+ // exceeds the threshold, increasing in severity from store 1 to store 5.
+ noLocalityHighReadAmpSkewedStores = []*roachpb.StoreDescriptor{
+ {
+ StoreID: 1,
+ Node: roachpb.NodeDescriptor{NodeID: 1},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1500,
+ L0Sublevels: maxL0SublevelThreshold + 1,
+ },
+ },
+ {
+ StoreID: 2,
+ Node: roachpb.NodeDescriptor{NodeID: 2},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1300,
+ L0Sublevels: maxL0SublevelThreshold + 10,
+ },
+ },
+ {
+ StoreID: 3,
+ Node: roachpb.NodeDescriptor{NodeID: 3},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 1000,
+ L0Sublevels: maxL0SublevelThreshold + 50,
+ },
+ },
+ {
+ StoreID: 4,
+ Node: roachpb.NodeDescriptor{NodeID: 4},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 900,
+ L0Sublevels: maxL0SublevelThreshold + 100,
+ },
+ },
+ {
+ StoreID: 5,
+ Node: roachpb.NodeDescriptor{NodeID: 5},
+ Capacity: roachpb.StoreCapacity{
+ QueriesPerSecond: 500,
+ L0Sublevels: maxL0SublevelThreshold + 100,
+ },
+ },
+ }
)
type testRange struct {
@@ -632,6 +823,7 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) {
&localDesc,
storeList,
&qpsScorerOptions{
+ storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction},
deterministic: false,
qpsRebalanceThreshold: qpsRebalanceThreshold,
},
@@ -725,21 +917,24 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) {
expRebalancedVoters, expRebalancedNonVoters []roachpb.StoreID
}{
// All the replicas are already on the best possible stores. No
- // rebalancing should be attempted.
+ // rebalancing should be attempted. Note that the high read amp of
+ // the current stores is ignored, since read amp is not considered
+ // when moving a replica away from a store.
{
name: "no rebalance",
voters: []roachpb.StoreID{3, 6, 9},
constraints: oneReplicaPerRegion,
expRebalancedVoters: []roachpb.StoreID{},
},
- // A replica is in a heavily loaded region, on a relatively heavily loaded
- // store. We expect it to be moved to a less busy store within the same
- // region.
+ // A replica is in a heavily loaded region, on a relatively heavily
+ // loaded store. We expect it to be moved to a less busy store
+ // within the same region. However, it cannot be the least busy
+ // store (store 3), as that store has high read amp.
{
name: "rebalance one replica within heavy region",
voters: []roachpb.StoreID{1, 6, 9},
constraints: oneReplicaPerRegion,
- expRebalancedVoters: []roachpb.StoreID{9, 6, 3},
+ expRebalancedVoters: []roachpb.StoreID{9, 6, 2},
},
// Two replicas are in the hot region, both on relatively heavily loaded
// nodes. We expect one of those replicas to be moved to a less busy store
// maximized, replicas on hot stores are rebalanced to cooler stores within
// the same region.
{
- // Within the hottest region, expect rebalance from the hottest node (n1)
- // to the coolest node (n3). Within the lease hot region, we don't expect
- // a rebalance from n8 to n9 because the qps difference between the two
+ // Within the hottest region, expect rebalance from the hottest
+ // node (n1) to the coolest node (n3); however, since n3 has high
+ // read amp, the replica should instead rebalance to n2. Within
+ // the least hot region, we don't expect a rebalance from n8 to
+ // n9 because the qps difference between the two stores is too
+ // small.
name: "QPS balance without constraints", voters: []roachpb.StoreID{1, 5, 8}, - expRebalancedVoters: []roachpb.StoreID{8, 5, 3}, + expRebalancedVoters: []roachpb.StoreID{8, 5, 2}, }, { - // Within the second hottest region, expect rebalance from the hottest - // node (n4) to the coolest node (n6). Within the lease hot region, we - // don't expect a rebalance from n8 to n9 because the qps difference - // between the two stores is too small. + // Within the second hottest region, expect rebalance from the + // hottest node (n4) to the coolest node (n6), however since n6 + // has high read amp instead expect n5 to be selected. Within + // the lease hot region, we don't expect a rebalance from n8 to + // n9 because the qps difference between the two stores is too + // small. name: "QPS balance without constraints", voters: []roachpb.StoreID{8, 4, 3}, - expRebalancedVoters: []roachpb.StoreID{8, 6, 3}, + expRebalancedVoters: []roachpb.StoreID{8, 5, 3}, }, // Multi-region database configurations. @@ -790,9 +989,10 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { constraints: oneReplicaPerRegion, expRebalancedVoters: []roachpb.StoreID{3, 2, 1}, - // NB: Expect the non-voter on node 4 (hottest node in region B) to move - // to node 6 (least hot region in region B). - expRebalancedNonVoters: []roachpb.StoreID{6, 9}, + // NB: Expect the non-voter on node 4 (hottest node in region B) to + // move to node 5 (least hot region in region B), the least hot + // node without high read amp. + expRebalancedNonVoters: []roachpb.StoreID{5, 9}, }, { name: "primary region with second highest QPS, region survival, one voter on sub-optimal node", @@ -812,19 +1012,22 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { // constraints require at least one replica per each region. voterConstraints: twoReplicasInHotRegion, constraints: oneReplicaPerRegion, - // NB: We've got 3 voters in the hottest region, but we only need 2. We - // expect that one of the voters from the hottest region will be moved to - // the least hot region. Additionally, in region B, we've got one replica - // on store 4 (which is the hottest store in that region). We expect that - // replica to be moved to store 6. - expRebalancedVoters: []roachpb.StoreID{9, 2, 6, 8, 3}, + // NB: We've got 3 voters in the hottest region, but we only need + // 2. We expect that one of the voters from the hottest region + // will be moved to the least hot region. Additionally, in + // region B, we've got one replica on store 4 (which is the + // hottest store in that region). We expect that replica to be + // moved to store 5, which is the least hot node without high + // read amp. + expRebalancedVoters: []roachpb.StoreID{9, 2, 5, 8, 3}, }, { name: "one voter on sub-optimal node in the coldest region", voters: []roachpb.StoreID{5, 6, 7}, constraints: append(twoReplicasInSecondHottestRegion, oneReplicaInColdestRegion...), - // NB: Expect replica from node 7 to move to node 9. 
- expRebalancedVoters: []roachpb.StoreID{9, 5, 6},
+ // NB: Expect the replica on node 7 to move to node 8, despite node 9
+ // having lower qps, because node 9 exceeds the L0 sub-level threshold.
+ expRebalancedVoters: []roachpb.StoreID{8, 5, 6},
},
}
for _, tc := range testCases {
@@ -883,7 +1086,11 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) {
&hottestRanges,
&localDesc,
storeList,
- &qpsScorerOptions{deterministic: true, qpsRebalanceThreshold: 0.05},
+ &qpsScorerOptions{
+ storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthBlockRebalanceTo},
+ deterministic: true,
+ qpsRebalanceThreshold: 0.05,
+ },
)
require.Len(t, voterTargets, len(tc.expRebalancedVoters))
@@ -955,7 +1162,10 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) {
loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{localDesc.StoreID}, qps: 100}})
hottestRanges := rr.topQPS()
sr.chooseRangeToRebalance(
- ctx, &hottestRanges, &localDesc, storeList, &qpsScorerOptions{qpsRebalanceThreshold: 0.05},
+ ctx, &hottestRanges, &localDesc, storeList, &qpsScorerOptions{
+ storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction},
+ qpsRebalanceThreshold: 0.05,
+ },
)
trace := finishAndGetRecording()
require.Regexpf(
@@ -1117,7 +1327,11 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) {
&hottestRanges,
&localDesc,
storeList,
- &qpsScorerOptions{deterministic: true, qpsRebalanceThreshold: tc.rebalanceThreshold},
+ &qpsScorerOptions{
+ storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction},
+ deterministic: true,
+ qpsRebalanceThreshold: tc.rebalanceThreshold,
+ },
)
require.Len(t, voterTargets, len(tc.expRebalancedVoters))
@@ -1210,7 +1424,11 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) {
&hottestRanges,
&localDesc,
storeList,
- &qpsScorerOptions{deterministic: true, qpsRebalanceThreshold: 0.05},
+ &qpsScorerOptions{
+ storeHealthOptions: storeHealthOptions{enforcementLevel: storeHealthNoAction},
+ deterministic: true,
+ qpsRebalanceThreshold: 0.05,
+ },
)
expectTargets := []roachpb.ReplicationTarget{
{NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5},
@@ -1220,3 +1438,167 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) {
targets, sr.getRaftStatusFn(repl), expectTargets)
}
}
+
+// TestStoreRebalancerReadAmpCheck checks that:
+// - Under (1) storeHealthNoAction and (2) storeHealthLogOnly, rebalancing
+// decisions are unaffected by high read amplification.
+// - Under (3) storeHealthBlockRebalanceTo and (4) storeHealthBlockAll,
+// rebalance decisions exclude stores with high read amplification as
+// candidate targets.
+func TestStoreRebalancerReadAmpCheck(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+ type testCase struct {
+ name string
+ stores []*roachpb.StoreDescriptor
+ conf roachpb.SpanConfig
+ expectedTargets []roachpb.ReplicationTarget
+ enforcement storeHealthEnforcement
+ }
+ tests := []testCase{
+ {
+ name: "ignore read amp on allocation when storeHealthNoAction enforcement",
+ // NB: All stores have high read amp; this should be ignored.
+ stores: noLocalityHighReadAmpStores,
+ conf: emptySpanConfig(),
+ expectedTargets: []roachpb.ReplicationTarget{
+ {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5},
+ },
+ enforcement: storeHealthNoAction,
+ },
+ {
+ name: "ignore read amp on allocation when storeHealthLogOnly enforcement",
+ // NB: All stores have high read amp; this should be ignored.
+ stores: noLocalityHighReadAmpStores,
+ conf: emptySpanConfig(),
+ expectedTargets: []roachpb.ReplicationTarget{
+ {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5},
+ },
+ enforcement: storeHealthLogOnly,
+ },
+ {
+ name: "don't stop rebalancing when read amp uniformly above threshold and storeHealthBlockRebalanceTo enforcement",
+ // NB: All stores have uniformly high read amp (threshold+1); this should be ignored.
+ stores: noLocalityHighReadAmpStores,
+ conf: emptySpanConfig(),
+ expectedTargets: []roachpb.ReplicationTarget{
+ {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5},
+ },
+ enforcement: storeHealthBlockRebalanceTo,
+ },
+ {
+ name: "don't stop rebalancing when read amp uniformly above threshold and storeHealthBlockAll enforcement",
+ // NB: All stores have uniformly high read amp (threshold+1); this should be ignored.
+ stores: noLocalityHighReadAmpStores,
+ conf: emptySpanConfig(),
+ expectedTargets: []roachpb.ReplicationTarget{
+ {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5},
+ },
+ enforcement: storeHealthBlockAll,
+ },
+ {
+ name: "rebalance should ignore stores with high read amp that are also above the mean when storeHealthBlockAll enforcement",
+ // NB: All stores have high read amp; however, store 2 is below the mean read amp, so it is a viable candidate.
+ stores: noLocalityHighReadAmpSkewedStores,
+ conf: emptySpanConfig(),
+ expectedTargets: []roachpb.ReplicationTarget{
+ {NodeID: 2, StoreID: 2}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5},
+ },
+ enforcement: storeHealthBlockAll,
+ },
+ {
+ name: "rebalance should ignore stores with high read amp that are also above the mean when storeHealthBlockRebalanceTo enforcement",
+ // NB: All stores have high read amp; however, store 2 is below the mean read amp, so it is a viable candidate.
+ stores: noLocalityHighReadAmpSkewedStores,
+ conf: emptySpanConfig(),
+ expectedTargets: []roachpb.ReplicationTarget{
+ {NodeID: 2, StoreID: 2}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5},
+ },
+ enforcement: storeHealthBlockRebalanceTo,
+ },
+ {
+ name: "rebalance should ignore stores with high read amp when storeHealthBlockRebalanceTo enforcement",
+ // NB: Stores 4 and 5 have high read amp, so they should not be
+ // rebalance targets. Only stores 1, 2 and 3 are valid targets, and
+ // of those only store 2 is not already a voter. Store 1 should
+ // transfer its lease to store 2. Store 5 could also have
+ // transferred its lease to store 2; however, high read amp only
+ // blocks adding new replicas and does not affect removing
+ // replicas from stores.
+ stores: noLocalityAscendingReadAmpStores,
+ conf: emptySpanConfig(),
+ expectedTargets: []roachpb.ReplicationTarget{
+ {NodeID: 2, StoreID: 2}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5},
+ },
+ enforcement: storeHealthBlockRebalanceTo,
+ },
+ {
+ name: "rebalance should ignore stores with high read amp when storeHealthBlockAll enforcement",
+ // NB: This scenario and result should be identical to storeHealthBlockRebalanceTo.
+ stores: noLocalityAscendingReadAmpStores,
+ conf: emptySpanConfig(),
+ expectedTargets: []roachpb.ReplicationTarget{
+ {NodeID: 2, StoreID: 2}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5},
+ },
+ enforcement: storeHealthBlockAll,
+ },
+ {
+ name: "rebalance should not rebalance away from stores with high read amp when storeHealthBlockAll enforcement",
+ // NB: Nodes 1, 3 and 5 all have extremely high read amp. However,
+ // since read amp only blocks a store from being a rebalance target
+ // and does not trigger rebalancing away from it, this should be
+ // ignored and no action taken.
+ stores: noLocalityUniformQPSHighReadAmp,
+ conf: emptySpanConfig(),
+ expectedTargets: nil,
+ enforcement: storeHealthBlockAll,
+ },
+ {
+ name: "rebalance should not rebalance away from stores with high read amp when storeHealthBlockRebalanceTo enforcement",
+ // NB: Nodes 1, 3 and 5 all have extremely high read amp. However,
+ // since read amp only blocks a store from being a rebalance target
+ // and does not trigger rebalancing away from it, this should be
+ // ignored and no action taken.
+ stores: noLocalityUniformQPSHighReadAmp,
+ conf: emptySpanConfig(),
+ expectedTargets: nil,
+ enforcement: storeHealthBlockRebalanceTo,
+ },
+ }
+
+ for i, test := range tests {
+ t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) {
+ stopper, g, _, a, _ := createTestAllocator(ctx, 10, false /* deterministic */)
+ defer stopper.Stop(ctx)
+ storeList, _, _ := a.storePool.getStoreList(storeFilterThrottled)
+
+ localDesc := *noLocalityStores[0]
+ cfg := TestStoreConfig(nil)
+ cfg.Gossip = g
+ cfg.StorePool = a.storePool
+ s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)
+ gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(test.stores, t)
+ s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID}
+ rq := newReplicateQueue(s, a)
+ rr := newReplicaRankings()
+
+ sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr)
+
+ // Load in a range with replicas on stores 1, 3 and 5.
+ loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100}})
+ hottestRanges := rr.topQPS()
+
+ _, targetVoters, _ := sr.chooseRangeToRebalance(
+ ctx,
+ &hottestRanges,
+ &localDesc,
+ storeList,
+ &qpsScorerOptions{
+ storeHealthOptions: storeHealthOptions{enforcementLevel: test.enforcement, l0SublevelThreshold: maxL0SublevelThreshold},
+ deterministic: true,
+ qpsRebalanceThreshold: 0.05,
+ },
+ )
+ require.Equal(t, test.expectedTargets, targetVoters)
+ })
+ }
+}
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 3f6710fe4e8d..5ef8be6020e1 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -648,6 +648,17 @@ var charts = []sectionDescription{
},
},
},
+ {
+ Organization: [][]string{
+ {DistributionLayer, "Rebalancing"},
+ },
+ Charts: []chartDescription{
+ {
+ Title: "L0 sub-level rebalancing",
+ Metrics: []string{"rebalancing.l0_sublevels_histogram"},
+ },
+ },
+ },
{
Organization: [][]string{
{DistributionLayer, "Rebalancing"},
diff --git a/pkg/ts/catalog/metrics.go b/pkg/ts/catalog/metrics.go
index 301f662c4608..8162ba4ede98 100644
--- a/pkg/ts/catalog/metrics.go
+++ b/pkg/ts/catalog/metrics.go
@@ -91,6 +91,7 @@ var histogramMetricsNames = map[string]struct{}{
"txnwaitqueue.query.wait_time": {},
"raft.process.applycommitted.latency": {},
"sql.stats.txn_stats_collection.duration": {},
+ "rebalancing.l0_sublevels_histogram": {},
}
func allInternalTSMetricsNames() []string {
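The TestStoreRebalancerReadAmpCheck cases above encode the intended candidate-exclusion behaviour: a store only counts as unhealthy when its L0 sub-level count exceeds both the configured threshold and the cluster mean, and whether that actually removes it from consideration depends on the enforcement level. The standalone Go sketch below illustrates that decision only; excludeAsTarget, its signature, and its parameters are illustrative assumptions, not the allocator_scorer.go implementation (which is not part of the hunks shown here).

package main

import "fmt"

// storeHealthEnforcement mirrors the four enforcement levels exercised by the
// test above. The constant names match the test; everything else in this file
// is an illustrative sketch.
type storeHealthEnforcement int

const (
	storeHealthNoAction storeHealthEnforcement = iota
	storeHealthLogOnly
	storeHealthBlockRebalanceTo
	storeHealthBlockAll
)

// excludeAsTarget sketches the two-part health check: a store is excluded as
// a target only when its L0 sub-level count exceeds both the configured
// threshold and the cluster mean, and only when the enforcement level blocks
// the kind of action being considered.
func excludeAsTarget(
	l0Sublevels, clusterMean, threshold float64,
	enforcement storeHealthEnforcement,
	isRebalanceTarget bool,
) bool {
	if l0Sublevels <= threshold || l0Sublevels <= clusterMean {
		// Either the store is healthy, or the whole cluster is roughly as
		// unhealthy; in the latter case excluding everyone would stall
		// rebalancing entirely.
		return false
	}
	switch enforcement {
	case storeHealthBlockAll:
		return true
	case storeHealthBlockRebalanceTo:
		return isRebalanceTarget
	default:
		// storeHealthNoAction and storeHealthLogOnly never exclude a store;
		// the latter would only log that the threshold was exceeded.
		return false
	}
}

func main() {
	// Uniformly high read amp: no store exceeds the mean, so none is excluded.
	fmt.Println(excludeAsTarget(21, 21, 20, storeHealthBlockAll, true)) // false
	// Skewed distribution: a store in the positive tail is excluded from
	// being a rebalance target.
	fmt.Println(excludeAsTarget(120, 72, 20, storeHealthBlockRebalanceTo, true)) // true
}

The first call mirrors the "uniformly above threshold" test cases, where rebalancing must not stall; the second mirrors the skewed-store cases, where only the positive tail is blocked.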
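Separately, the Store.Capacity change earlier in this patch gossips the maximum L0 sub-level count observed over the histogram's last two windows rather than the instantaneous gauge, so a brief compaction backlog stays visible to the allocator for a while instead of disappearing on the next sample. The following standalone Go sketch illustrates that two-window-maximum idea only; the windowedMax type and its methods are hypothetical and are not the metric package's windowed histogram.

package main

import (
	"fmt"
	"time"
)

// windowedMax is a toy stand-in for the windowed histogram used above: it
// keeps the maximum value seen in the current and previous windows, so the
// reported value reflects the past one-to-two window lengths.
type windowedMax struct {
	window      time.Duration
	windowStart time.Time
	cur, prev   int64
}

func (w *windowedMax) record(now time.Time, v int64) {
	if now.Sub(w.windowStart) > w.window {
		// Rotate: the current window becomes the previous one.
		w.prev, w.cur = w.cur, 0
		w.windowStart = now
	}
	if v > w.cur {
		w.cur = v
	}
}

// max returns the largest value seen across the two retained windows, which
// is what a store would place into its gossiped capacity.
func (w *windowedMax) max() int64 {
	if w.prev > w.cur {
		return w.prev
	}
	return w.cur
}

func main() {
	w := &windowedMax{window: 5 * time.Minute, windowStart: time.Now()}
	w.record(time.Now(), 25) // brief spike in L0 sub-levels
	w.record(time.Now(), 4)  // compaction catches up
	fmt.Println(w.max())     // still reports 25 until the spike ages out
}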