From 40afa04147db37e6e2205201738667316b89e5d9 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Sat, 7 Jul 2018 13:32:46 -0500 Subject: [PATCH] storage: make lease rebalancing decisions at the store level In order to better balance the QPS being served by each store to avoid overloaded nodes. Fixes #21419 Release note (performance improvement): Range leases will be automatically rebalanced throughout the cluster to even out the amount of QPS being handled by each node. --- docs/generated/settings/settings.html | 2 +- pkg/storage/allocator.go | 87 +++++++-- pkg/storage/allocator_scorer.go | 2 +- pkg/storage/allocator_test.go | 4 +- pkg/storage/replicate_queue.go | 38 ++-- pkg/storage/store.go | 8 +- pkg/storage/store_rebalancer.go | 256 ++++++++++++++++++++++++++ pkg/storage/store_rebalancer_test.go | 168 +++++++++++++++++ 8 files changed, 529 insertions(+), 36 deletions(-) create mode 100644 pkg/storage/store_rebalancer.go create mode 100644 pkg/storage/store_rebalancer_test.go diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 3e45f4d21c69..2e22496b8a8e 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -18,7 +18,7 @@ kv.allocator.lease_rebalancing_aggressivenessfloat1set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases kv.allocator.load_based_lease_rebalancing.enabledbooleantrueset to enable rebalancing of range leases based on load and latency kv.allocator.range_rebalance_thresholdfloat0.05minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull -kv.allocator.stat_based_rebalancing.enabledbooleanfalseset to enable rebalancing of range replicas based on write load and disk usage +kv.allocator.stat_based_rebalancing.enabledbooleantrueset to enable rebalancing of range replicas based on write load and disk usage kv.allocator.stat_rebalance_thresholdfloat0.2minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull kv.bulk_io_write.concurrent_export_requestsinteger5number of export requests a store will handle concurrently before queuing kv.bulk_io_write.concurrent_import_requestsinteger1number of import requests a store will handle concurrently before queuing diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index ea1cbbdbcccc..53738336ea6f 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -35,10 +35,16 @@ import ( ) const ( - // baseLeaseRebalanceThreshold is the minimum ratio of a store's lease surplus + // leaseRebalanceThreshold is the minimum ratio of a store's lease surplus // to the mean range/lease count that permits lease-transfers away from that // store. - baseLeaseRebalanceThreshold = 0.05 + leaseRebalanceThreshold = 0.05 + + // baseLoadBasedLeaseRebalanceThreshold is the equivalent of + // leaseRebalanceThreshold for load-based lease rebalance decisions (i.e. + // "follow-the-workload"). Its the base threshold for decisions that gets + // adjusted based on the load and latency of the involved ranges/nodes. + baseLoadBasedLeaseRebalanceThreshold = 2 * leaseRebalanceThreshold // minReplicaWeight sets a floor for how low a replica weight can be. This is // needed because a weight of zero doesn't work in the current lease scoring @@ -695,7 +701,7 @@ func (a *Allocator) TransferLeaseTarget( // whether we actually should be transferring the lease. The transfer // decision is only needed if we've been asked to check the source. transferDec, repl := a.shouldTransferLeaseUsingStats( - ctx, sl, source, existing, stats, + ctx, sl, source, existing, stats, nil, ) if checkTransferLeaseSource { switch transferDec { @@ -795,7 +801,7 @@ func (a *Allocator) ShouldTransferLease( return false } - transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats) + transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats, nil) var result bool switch transferDec { case shouldNotTransfer: @@ -812,12 +818,36 @@ func (a *Allocator) ShouldTransferLease( return result } +func (a Allocator) followTheWorkloadPrefersLocal( + ctx context.Context, + sl StoreList, + source roachpb.StoreDescriptor, + candidate roachpb.StoreID, + existing []roachpb.ReplicaDescriptor, + stats *replicaStats, +) bool { + adjustments := make(map[roachpb.StoreID]float64) + decision, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats, adjustments) + if decision == decideWithoutStats { + return false + } + adjustment := adjustments[candidate] + if adjustment > baseLoadBasedLeaseRebalanceThreshold { + log.VEventf(ctx, 3, + "s%d is a better fit than s%d due to follow-the-workload (score: %.2f; threshold: %.2f)", + source.StoreID, candidate, adjustment, baseLoadBasedLeaseRebalanceThreshold) + return true + } + return false +} + func (a Allocator) shouldTransferLeaseUsingStats( ctx context.Context, sl StoreList, source roachpb.StoreDescriptor, existing []roachpb.ReplicaDescriptor, stats *replicaStats, + rebalanceAdjustments map[roachpb.StoreID]float64, ) (transferDecision, roachpb.ReplicaDescriptor) { // Only use load-based rebalancing if it's enabled and we have both // stats and locality information to base our decision on. @@ -884,7 +914,7 @@ func (a Allocator) shouldTransferLeaseUsingStats( } addr, err := a.storePool.gossip.GetNodeIDAddress(repl.NodeID) if err != nil { - log.Errorf(ctx, "missing address for node %d: %s", repl.NodeID, err) + log.Errorf(ctx, "missing address for n%d: %s", repl.NodeID, err) continue } remoteLatency, ok := a.nodeLatencyFn(addr.String()) @@ -893,20 +923,24 @@ func (a Allocator) shouldTransferLeaseUsingStats( } remoteWeight := math.Max(minReplicaWeight, replicaWeights[repl.NodeID]) - score := loadBasedLeaseRebalanceScore( + replScore, rebalanceAdjustment := loadBasedLeaseRebalanceScore( ctx, a.storePool.st, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, sl.candidateLeases.mean) - if score > bestReplScore { - bestReplScore = score + if replScore > bestReplScore { + bestReplScore = replScore bestRepl = repl } + if rebalanceAdjustments != nil { + rebalanceAdjustments[repl.StoreID] = rebalanceAdjustment + } } - // Return the best replica even in cases where transferring is not advised in - // order to support forced lease transfers, such as when removing a replica or - // draining all leases before shutdown. if bestReplScore > 0 { return shouldTransfer, bestRepl } + + // Return the best replica even in cases where transferring is not advised in + // order to support forced lease transfers, such as when removing a replica or + // draining all leases before shutdown. return shouldNotTransfer, bestRepl } @@ -929,7 +963,7 @@ func (a Allocator) shouldTransferLeaseUsingStats( // logic behind each part of the formula is as follows: // // * LeaseRebalancingAggressiveness: Allow the aggressiveness to be tuned via -// an environment variable. +// a cluster setting. // * 0.1: Constant factor to reduce aggressiveness by default // * math.Log10(remoteWeight/sourceWeight): Comparison of the remote replica's // weight to the local replica's weight. Taking the log of the ratio instead @@ -944,6 +978,18 @@ func (a Allocator) shouldTransferLeaseUsingStats( // of the ideal number of leases on each store. We then calculate these to // compare how close each node is to its ideal state and use the differences // from the ideal state on each node to compute a final score. +// +// Returns a total score for the replica that takes into account the number of +// leases already on each store. Also returns the raw "adjustment" value that's +// purely based on replica weights and latency in order for the caller to +// determine how large a role the user's workload played in the decision. The +// adjustment value is positive if the remote store is preferred for load-based +// reasons or negative if the local store is preferred. The magnitude depends +// on the difference in load and the latency between the nodes. +// +// TODO(a-robinson): Should this be changed to avoid even thinking about lease +// counts now that we try to spread leases and replicas based on QPS? As is it +// may fight back a little bit against store-level QPS--based rebalancing. func loadBasedLeaseRebalanceScore( ctx context.Context, st *cluster.Settings, @@ -953,14 +999,14 @@ func loadBasedLeaseRebalanceScore( sourceWeight float64, source roachpb.StoreDescriptor, meanLeases float64, -) int32 { +) (int32, float64) { remoteLatencyMillis := float64(remoteLatency) / float64(time.Millisecond) rebalanceAdjustment := leaseRebalancingAggressiveness.Get(&st.SV) * 0.1 * math.Log10(remoteWeight/sourceWeight) * math.Log1p(remoteLatencyMillis) // Start with twice the base rebalance threshold in order to fight more // strongly against thrashing caused by small variances in the distribution // of request weights. - rebalanceThreshold := (2 * baseLeaseRebalanceThreshold) - rebalanceAdjustment + rebalanceThreshold := baseLoadBasedLeaseRebalanceThreshold - rebalanceAdjustment overfullLeaseThreshold := int32(math.Ceil(meanLeases * (1 + rebalanceThreshold))) overfullScore := source.Capacity.LeaseCount - overfullLeaseThreshold @@ -976,7 +1022,7 @@ func loadBasedLeaseRebalanceScore( rebalanceThreshold, meanLeases, source.Capacity.LeaseCount, overfullLeaseThreshold, remoteStore.Capacity.LeaseCount, underfullLeaseThreshold, totalScore, ) - return totalScore + return totalScore, rebalanceAdjustment } func (a Allocator) shouldTransferLeaseWithoutStats( @@ -985,9 +1031,14 @@ func (a Allocator) shouldTransferLeaseWithoutStats( source roachpb.StoreDescriptor, existing []roachpb.ReplicaDescriptor, ) bool { + // TODO(a-robinson): Should we disable this behavior when load-based lease + // rebalancing is enabled? In happy cases it's nice to keep this working + // to even out the number of leases in addition to the number of replicas, + // but it's certainly a blunt instrument that could undo what we want. + // Allow lease transfer if we're above the overfull threshold, which is - // mean*(1+baseLeaseRebalanceThreshold). - overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + baseLeaseRebalanceThreshold))) + // mean*(1+leaseRebalanceThreshold). + overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + leaseRebalanceThreshold))) minOverfullThreshold := int32(math.Ceil(sl.candidateLeases.mean + 5)) if overfullLeaseThreshold < minOverfullThreshold { overfullLeaseThreshold = minOverfullThreshold @@ -997,7 +1048,7 @@ func (a Allocator) shouldTransferLeaseWithoutStats( } if float64(source.Capacity.LeaseCount) > sl.candidateLeases.mean { - underfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 - baseLeaseRebalanceThreshold))) + underfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 - leaseRebalanceThreshold))) minUnderfullThreshold := int32(math.Ceil(sl.candidateLeases.mean - 5)) if underfullLeaseThreshold > minUnderfullThreshold { underfullLeaseThreshold = minUnderfullThreshold diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go index 9d24fadef684..7dbde1a7ded0 100644 --- a/pkg/storage/allocator_scorer.go +++ b/pkg/storage/allocator_scorer.go @@ -60,7 +60,7 @@ const ( var EnableStatsBasedRebalancing = settings.RegisterBoolSetting( "kv.allocator.stat_based_rebalancing.enabled", "set to enable rebalancing of range replicas based on write load and disk usage", - false, + false, // TODO(a-robinson): switch to true for v2.1 once the store-rebalancer is done ) // rangeRebalanceThreshold is the minimum ratio of a store's range count to diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index 6fb8fb9688e9..f20491fc94e8 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -1248,6 +1248,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) + EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false) ctx := context.Background() defer stopper.Stop(ctx) @@ -2040,6 +2041,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) + EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false) defer stopper.Stop(context.Background()) stores := []*roachpb.StoreDescriptor{ @@ -3817,7 +3819,7 @@ func TestLoadBasedLeaseRebalanceScore(t *testing.T) { for _, c := range testCases { remoteStore.Capacity.LeaseCount = c.remoteLeases sourceStore.Capacity.LeaseCount = c.sourceLeases - score := loadBasedLeaseRebalanceScore( + score, _ := loadBasedLeaseRebalanceScore( context.Background(), st, c.remoteWeight, diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index 6ae97ae6bc5a..bc712c5cfe79 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -374,7 +374,7 @@ func (rq *replicateQueue) processOneChange( // out of situations where this store is overfull and yet holds all the // leases. The fullness checks need to be ignored for cases where // a replica needs to be removed for constraint violations. - transferred, err := rq.transferLease( + transferred, err := rq.findTargetAndTransferLease( ctx, repl, desc, @@ -419,7 +419,7 @@ func (rq *replicateQueue) processOneChange( if dryRun { return false, nil } - transferred, err := rq.transferLease( + transferred, err := rq.findTargetAndTransferLease( ctx, repl, desc, @@ -503,7 +503,7 @@ func (rq *replicateQueue) processOneChange( if canTransferLease() { // We require the lease in order to process replicas, so // repl.store.StoreID() corresponds to the lease-holder's store ID. - transferred, err := rq.transferLease( + transferred, err := rq.findTargetAndTransferLease( ctx, repl, desc, @@ -537,7 +537,7 @@ type transferLeaseOptions struct { dryRun bool } -func (rq *replicateQueue) transferLease( +func (rq *replicateQueue) findTargetAndTransferLease( ctx context.Context, repl *Replica, desc *roachpb.RangeDescriptor, @@ -556,25 +556,35 @@ func (rq *replicateQueue) transferLease( opts.checkCandidateFullness, false, /* alwaysAllowDecisionWithoutStats */ ); target != (roachpb.ReplicaDescriptor{}) { - rq.metrics.TransferLeaseCount.Inc(1) - log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID) if opts.dryRun { + log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID) return false, nil } - avgQPS, qpsMeasurementDur := repl.leaseholderStats.avgQPS() - if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil { - return false, errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID) - } - rq.lastLeaseTransfer.Store(timeutil.Now()) - if qpsMeasurementDur >= MinStatsDuration { - rq.allocator.storePool.updateLocalStoresAfterLeaseTransfer( - repl.store.StoreID(), target.StoreID, avgQPS) + if err := rq.transferLease(ctx, repl, target); err != nil { + return false, err } return true, nil } return false, nil } +func (rq *replicateQueue) transferLease( + ctx context.Context, repl *Replica, target roachpb.ReplicaDescriptor, +) error { + rq.metrics.TransferLeaseCount.Inc(1) + log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID) + avgQPS, qpsMeasurementDur := repl.leaseholderStats.avgQPS() + if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil { + return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID) + } + rq.lastLeaseTransfer.Store(timeutil.Now()) + if qpsMeasurementDur >= MinStatsDuration { + rq.allocator.storePool.updateLocalStoresAfterLeaseTransfer( + repl.store.StoreID(), target.StoreID, avgQPS) + } + return nil +} + func (rq *replicateQueue) addReplica( ctx context.Context, repl *Replica, diff --git a/pkg/storage/store.go b/pkg/storage/store.go index e9fcfa26616b..855713a4b21e 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -361,6 +361,7 @@ type Store struct { tsCache tscache.Cache // Most recent timestamps for keys / key ranges allocator Allocator // Makes allocation decisions replRankings *replicaRankings + storeRebalancer *StoreRebalancer rangeIDAlloc *idalloc.Allocator // Range ID allocator gcQueue *gcQueue // Garbage collection queue mergeQueue *mergeQueue // Range merging queue @@ -987,6 +988,9 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript } } + s.storeRebalancer = NewStoreRebalancer( + s.cfg.AmbientCtx, cfg.Settings, s.replicateQueue, s.replRankings) + if cfg.TestingKnobs.DisableGCQueue { s.setGCQueueActive(false) } @@ -1119,7 +1123,7 @@ func (s *Store) SetDraining(drain bool) { log.Errorf(ctx, "could not get zone config for key %s when draining: %s", desc.StartKey, err) } } - leaseTransferred, err := s.replicateQueue.transferLease( + leaseTransferred, err := s.replicateQueue.findTargetAndTransferLease( ctx, r, desc, @@ -1509,6 +1513,8 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { s.startLeaseRenewer(ctx) } + s.storeRebalancer.Start(ctx, s.stopper, s.StoreID()) + // Start the storage engine compactor. if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) { s.compactor.Start(s.AnnotateCtx(context.Background()), s.stopper) diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go new file mode 100644 index 000000000000..a7cead998ceb --- /dev/null +++ b/pkg/storage/store_rebalancer.go @@ -0,0 +1,256 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package storage + +import ( + "context" + "math" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" +) + +const ( + // storeRebalancerTimerDuration is how frequently to check the store-level + // balance of the cluster. + storeRebalancerTimerDuration = time.Minute + + // minQPSThresholdDifference is the minimum QPS difference from the cluster + // mean that this system should care about. In other words, we won't worry + // about rebalancing for QPS reasons if a store's QPS differs from the mean + // by less than this amount even if the amount is greater than the percentage + // threshold. This avoids too many lease transfers in lightly loaded clusters. + minQPSThresholdDifference = 100 +) + +// StoreRebalancer is responsible for examining how the associated store's load +// compares to the load on other stores in the cluster and transferring leases +// or replicas away if the local store is overloaded. +type StoreRebalancer struct { + log.AmbientContext + st *cluster.Settings + rq *replicateQueue + replRankings *replicaRankings +} + +// NewStoreRebalancer creates a StoreRebalancer to work in tandem with the +// provided replicateQueue. +func NewStoreRebalancer( + ambientCtx log.AmbientContext, + st *cluster.Settings, + rq *replicateQueue, + replRankings *replicaRankings, +) *StoreRebalancer { + ambientCtx.AddLogTag("store-rebalancer", nil) + return &StoreRebalancer{ + AmbientContext: ambientCtx, + st: st, + rq: rq, + replRankings: replRankings, + } +} + +// Start runs an infinite loop in a goroutine which regularly checks whether +// the store is overloaded along any important dimension (e.g. range count, +// QPS, disk usage), and if so attempts to correct that by moving leases or +// replicas elsewhere. +// +// This worker acts on store-level imbalances, whereas the replicate queue +// makes decisions based on the zone config constraints and diversity of +// individual ranges. This means that there are two different workers that +// could potentially be making decisions about a given range, so they have to +// be careful to avoid stepping on each others' toes. +// +// TODO(a-robinson): Expose metrics to make this understandable without having +// to dive into logspy. +func (sr *StoreRebalancer) Start( + ctx context.Context, stopper *stop.Stopper, storeID roachpb.StoreID, +) { + ctx = sr.AnnotateCtx(ctx) + + // Start a goroutine that watches and proactively renews certain + // expiration-based leases. + stopper.RunWorker(ctx, func(ctx context.Context) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + // Wait out the first tick before doing anything since the store is still + // starting up and we might as well wait for some qps/wps stats to + // accumulate. + select { + case <-stopper.ShouldQuiesce(): + return + case <-ticker.C: + } + + if !EnableStatsBasedRebalancing.Get(&sr.st.SV) { + continue + } + + localDesc, found := sr.rq.allocator.storePool.getStoreDescriptor(storeID) + if !found { + log.Warningf(ctx, "StorePool missing descriptor for local store") + continue + } + storelist, _, _ := sr.rq.allocator.storePool.getStoreList(roachpb.RangeID(0), storeFilterNone) + sr.rebalanceStore(ctx, localDesc, storelist) + } + }) +} + +func (sr *StoreRebalancer) rebalanceStore( + ctx context.Context, localDesc roachpb.StoreDescriptor, storelist StoreList, +) { + + statThreshold := statRebalanceThreshold.Get(&sr.st.SV) + + // First check if we should transfer leases away to better balance QPS. + qpsMinThreshold := math.Min(storelist.candidateQueriesPerSecond.mean*(1-statThreshold), + storelist.candidateQueriesPerSecond.mean-minQPSThresholdDifference) + qpsMaxThreshold := math.Max(storelist.candidateQueriesPerSecond.mean*(1+statThreshold), + storelist.candidateQueriesPerSecond.mean+minQPSThresholdDifference) + + for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold { + log.Infof(ctx, "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%2.f)", + localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storelist.candidateQueriesPerSecond.mean, qpsMaxThreshold) + storeMap := storeListToMap(storelist) + replWithStats, target := sr.chooseLeaseToTransfer( + ctx, localDesc, storelist, storeMap, qpsMinThreshold, qpsMaxThreshold) + if replWithStats.repl == nil { + log.Infof(ctx, + "ran out of leases worth transferring; qps (%.2f) still above desired threshold (%.2f)", + localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) + break + } + log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load", + replWithStats.repl.RangeID, replWithStats.qps, target.StoreID) + replCtx := replWithStats.repl.AnnotateCtx(ctx) + if err := sr.rq.transferLease(replCtx, replWithStats.repl, target); err != nil { + log.Errorf(replCtx, "unable to transfer lease to s%d: %v", target.StoreID, err) + return + } + localDesc.Capacity.QueriesPerSecond -= replWithStats.qps + } +} + +// TODO(a-robinson): Should we take the number of leases on each store into +// account here or just continue to let that happen in allocator.go? +func (sr *StoreRebalancer) chooseLeaseToTransfer( + ctx context.Context, + localDesc roachpb.StoreDescriptor, + storelist StoreList, + storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + minQPS float64, + maxQPS float64, +) (replicaWithStats, roachpb.ReplicaDescriptor) { + sysCfg, cfgOk := sr.rq.allocator.storePool.gossip.GetSystemConfig() + if !cfgOk { + log.VEventf(ctx, 1, "no system config available, unable to choose a lease transfer target") + return replicaWithStats{}, roachpb.ReplicaDescriptor{} + } + + now := sr.rq.store.Clock().Now() + for { + replWithStats := sr.replRankings.topQPS() + + // We're all out of replicas. + if replWithStats.repl == nil { + return replicaWithStats{}, roachpb.ReplicaDescriptor{} + } + + if !replWithStats.repl.OwnsValidLease(now) { + log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) + continue + } + + if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { + log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", + replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) + continue + } + + // TODO: Don't bother moving leases whose QPS is below some fraction of the + // store's QPS. It's just unnecessary churn with no benefit to move leases + // responsible for, for example, 1 qps on a store with 5000 qps. + + desc := replWithStats.repl.Desc() + log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps", desc.RangeID, replWithStats.qps) + // TODO(a-robinson): Should we sort these to first examine the stores with lower QPS? + for _, candidate := range desc.Replicas { + if candidate.StoreID == localDesc.StoreID { + continue + } + storeDesc, ok := storeMap[candidate.StoreID] + if !ok { + log.VEventf(ctx, 3, "missing store descriptor for s%d", candidate.StoreID) + continue + } + + newCandidateQPS := storeDesc.Capacity.QueriesPerSecond + replWithStats.qps + if storeDesc.Capacity.QueriesPerSecond < minQPS { + if newCandidateQPS > maxQPS { + log.VEventf(ctx, 3, + "r%d's %.2f qps would push s%d over the max threshold (%.2f) with %.2f qps afterwards", + desc.RangeID, replWithStats.qps, candidate.StoreID, maxQPS, newCandidateQPS) + continue + } + } else if newCandidateQPS > storelist.candidateQueriesPerSecond.mean { + log.VEventf(ctx, 3, + "r%d's %.2f qps would push s%d over the mean (%.2f) with %.2f qps afterwards", + desc.RangeID, replWithStats.qps, candidate.StoreID, + storelist.candidateQueriesPerSecond.mean, newCandidateQPS) + continue + } + + zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey) + if err != nil { + log.Error(ctx, err) + return replicaWithStats{}, roachpb.ReplicaDescriptor{} + } + preferred := sr.rq.allocator.preferredLeaseholders(zone, desc.Replicas) + if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, preferred) { + log.VEventf(ctx, 3, "s%d not a preferred leaseholder; preferred: %v", candidate.StoreID, preferred) + continue + } + + filteredStorelist := storelist.filter(zone.Constraints) + if sr.rq.allocator.followTheWorkloadPrefersLocal( + ctx, + filteredStorelist, + localDesc, + candidate.StoreID, + desc.Replicas, + replWithStats.repl.leaseholderStats, + ) { + log.VEventf(ctx, 3, "r%d is on s%d due to follow-the-workload; skipping", + desc.RangeID, localDesc.StoreID) + continue + } + + return replWithStats, candidate + } + } +} + +func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor { + storeMap := make(map[roachpb.StoreID]*roachpb.StoreDescriptor) + for i := range sl.stores { + storeMap[sl.stores[i].StoreID] = &sl.stores[i] + } + return storeMap +} diff --git a/pkg/storage/store_rebalancer_test.go b/pkg/storage/store_rebalancer_test.go new file mode 100644 index 000000000000..0dcff0a49d52 --- /dev/null +++ b/pkg/storage/store_rebalancer_test.go @@ -0,0 +1,168 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package storage + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" +) + +var ( + // noLocalityStores specifies a set of stores where one store is + // under-utilized in terms of QPS, three are in the middle, and one is + // over-utilized. + noLocalityStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1500, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1100, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 900, + }, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 500, + }, + }, + } +) + +type testRange struct { + // The first storeID in the list will be the leaseholder. + storeIDs []roachpb.StoreID + qps float64 +} + +func loadRanges(rr *replicaRankings, s *Store, ranges []testRange) { + acc := rr.newAccumulator() + for _, r := range ranges { + repl := &Replica{store: s} + repl.mu.state.Desc = &roachpb.RangeDescriptor{} + for _, storeID := range r.storeIDs { + repl.mu.state.Desc.Replicas = append(repl.mu.state.Desc.Replicas, roachpb.ReplicaDescriptor{ + NodeID: roachpb.NodeID(storeID), + StoreID: storeID, + ReplicaID: roachpb.ReplicaID(storeID), + }) + } + repl.mu.state.Lease = &roachpb.Lease{ + Expiration: &hlc.MaxTimestamp, + Replica: repl.mu.state.Desc.Replicas[0], + } + acc.addReplica(replicaWithStats{ + repl: repl, + qps: r.qps, + }) + } + rr.update(acc) +} + +func TestChooseLeaseToTransfer(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) + defer stopper.Stop(context.Background()) + gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t) + storelist, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled) + storeMap := storeListToMap(storelist) + + const minQPS = 800 + const maxQPS = 1200 + + if err := a.storePool.gossip.AddInfoProto( + gossip.KeySystemConfig, &config.SystemConfig{}, 0, + ); err != nil { + t.Fatal(err) + } + + localDesc := *noLocalityStores[0] + cfg := TestStoreConfig(nil) + s := createTestStoreWithoutStart(t, stopper, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} + rq := newReplicateQueue(s, g, a) + rr := newReplicaRankings() + + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + + testCases := []struct { + storeIDs []roachpb.StoreID + qps float64 + expectTarget roachpb.StoreID + }{ + {[]roachpb.StoreID{1}, 100, 0}, + {[]roachpb.StoreID{1, 2}, 100, 0}, + {[]roachpb.StoreID{1, 3}, 100, 0}, + {[]roachpb.StoreID{1, 4}, 100, 4}, + {[]roachpb.StoreID{1, 5}, 100, 5}, + {[]roachpb.StoreID{5, 1}, 100, 0}, + {[]roachpb.StoreID{1, 2}, 200, 0}, + {[]roachpb.StoreID{1, 3}, 200, 0}, + {[]roachpb.StoreID{1, 4}, 200, 0}, + {[]roachpb.StoreID{1, 5}, 200, 5}, + {[]roachpb.StoreID{1, 2}, 500, 0}, + {[]roachpb.StoreID{1, 3}, 500, 0}, + {[]roachpb.StoreID{1, 4}, 500, 0}, + {[]roachpb.StoreID{1, 5}, 500, 5}, + {[]roachpb.StoreID{1, 5}, 600, 5}, + {[]roachpb.StoreID{1, 5}, 700, 5}, + {[]roachpb.StoreID{1, 5}, 800, 0}, + } + + for _, tc := range testCases { + loadRanges(rr, s, []testRange{{storeIDs: tc.storeIDs, qps: tc.qps}}) + + _, target := sr.chooseLeaseToTransfer( + ctx, localDesc, storelist, storeMap, minQPS, maxQPS) + if target.StoreID != tc.expectTarget { + t.Errorf("got target store %d for range with replicas %v and %f qps; want %d", + target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) + } + } +}