From a11ff0f3e6d1ffd95be3eb0bd22460bb101ac2c1 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Thu, 19 Jul 2018 16:14:11 -0500 Subject: [PATCH 1/4] storage: Track the top-k replicas by QPS in each store This makes it easier to pick replicas whose leases should be transferred when a store is overloaded. Release note: None --- pkg/storage/replica_rankings.go | 132 +++++++++++++++++++++++++++ pkg/storage/replica_rankings_test.go | 77 ++++++++++++++++ pkg/storage/store.go | 22 +++-- 3 files changed, 225 insertions(+), 6 deletions(-) create mode 100644 pkg/storage/replica_rankings.go create mode 100644 pkg/storage/replica_rankings_test.go diff --git a/pkg/storage/replica_rankings.go b/pkg/storage/replica_rankings.go new file mode 100644 index 000000000000..f6b560085d4e --- /dev/null +++ b/pkg/storage/replica_rankings.go @@ -0,0 +1,132 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package storage + +import ( + "container/heap" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +const ( + numTopReplicasToTrack = 128 +) + +type replicaWithStats struct { + repl *Replica + qps float64 +} + +// replicaRankings maintains top-k orderings of the replicas in a store along +// different dimensions of concern, such as QPS, keys written per second, and +// disk used. +type replicaRankings struct { + mu struct { + syncutil.Mutex + qpsAccumulator *rrAccumulator + byQPS []replicaWithStats + } +} + +func newReplicaRankings() *replicaRankings { + return &replicaRankings{} +} + +func (rr *replicaRankings) newAccumulator() *rrAccumulator { + res := &rrAccumulator{} + res.qps.val = func(r replicaWithStats) float64 { return r.qps } + return res +} + +func (rr *replicaRankings) update(acc *rrAccumulator) { + rr.mu.Lock() + rr.mu.qpsAccumulator = acc + rr.mu.Unlock() +} + +func (rr *replicaRankings) topQPS() []replicaWithStats { + rr.mu.Lock() + defer rr.mu.Unlock() + // If we have a new set of data, consume it. Otherwise, just return the most + // recently consumed data. + if rr.mu.qpsAccumulator.qps.Len() > 0 { + rr.mu.byQPS = consumeAccumulator(&rr.mu.qpsAccumulator.qps) + } + return rr.mu.byQPS +} + +// rrAccumulator is used to update the replicas tracked by replicaRankings. +// The typical pattern should be to call replicaRankings.newAccumulator, add +// all the replicas you care about to the accumulator using addReplica, then +// pass the accumulator back to the replicaRankings using the update method. +// This method of loading the new rankings all at once avoids interfering with +// any consumers that are concurrently reading from the rankings, and also +// prevents concurrent loaders of data from messing with each other -- the last +// `update`d accumulator will win. +type rrAccumulator struct { + qps rrPriorityQueue +} + +func (a *rrAccumulator) addReplica(repl replicaWithStats) { + // If the heap isn't full, just push the new replica and return. 
+ if a.qps.Len() < numTopReplicasToTrack { + heap.Push(&a.qps, repl) + return + } + + // Otherwise, conditionally push if the new replica is more deserving than + // the current tip of the heap. + if repl.qps > a.qps.entries[0].qps { + heap.Pop(&a.qps) + heap.Push(&a.qps, repl) + } +} + +func consumeAccumulator(pq *rrPriorityQueue) []replicaWithStats { + length := pq.Len() + sorted := make([]replicaWithStats, length) + for i := 1; i <= length; i++ { + sorted[length-i] = heap.Pop(pq).(replicaWithStats) + } + return sorted +} + +type rrPriorityQueue struct { + entries []replicaWithStats + val func(replicaWithStats) float64 +} + +func (pq rrPriorityQueue) Len() int { return len(pq.entries) } + +func (pq rrPriorityQueue) Less(i, j int) bool { + return pq.val(pq.entries[i]) < pq.val(pq.entries[j]) +} + +func (pq rrPriorityQueue) Swap(i, j int) { + pq.entries[i], pq.entries[j] = pq.entries[j], pq.entries[i] +} + +func (pq *rrPriorityQueue) Push(x interface{}) { + item := x.(replicaWithStats) + pq.entries = append(pq.entries, item) +} + +func (pq *rrPriorityQueue) Pop() interface{} { + old := pq.entries + n := len(old) + item := old[n-1] + pq.entries = old[0 : n-1] + return item +} diff --git a/pkg/storage/replica_rankings_test.go b/pkg/storage/replica_rankings_test.go new file mode 100644 index 000000000000..80c007790337 --- /dev/null +++ b/pkg/storage/replica_rankings_test.go @@ -0,0 +1,77 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package storage + +import ( + "math/rand" + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" +) + +func TestReplicaRankings(t *testing.T) { + defer leaktest.AfterTest(t)() + + rr := newReplicaRankings() + + testCases := []struct { + replicasByQPS []float64 + }{ + {replicasByQPS: []float64{}}, + {replicasByQPS: []float64{0}}, + {replicasByQPS: []float64{1, 0}}, + {replicasByQPS: []float64{3, 2, 1, 0}}, + {replicasByQPS: []float64{3, 3, 2, 2, 1, 1, 0, 0}}, + {replicasByQPS: []float64{1.1, 1.0, 0.9, -0.9, -1.0, -1.1}}, + } + + for _, tc := range testCases { + acc := rr.newAccumulator() + + // Randomize the order of the inputs each time the test is run. + want := make([]float64, len(tc.replicasByQPS)) + copy(want, tc.replicasByQPS) + rand.Shuffle(len(tc.replicasByQPS), func(i, j int) { + tc.replicasByQPS[i], tc.replicasByQPS[j] = tc.replicasByQPS[j], tc.replicasByQPS[i] + }) + + for i, replQPS := range tc.replicasByQPS { + acc.addReplica(replicaWithStats{ + repl: &Replica{RangeID: roachpb.RangeID(i)}, + qps: replQPS, + }) + } + rr.update(acc) + + // Make sure we can read off all expected replicas in the correct order. 
+		repls := rr.topQPS()
+		if len(repls) != len(want) {
+			t.Errorf("wrong number of replicas in output; got: %v; want: %v", repls, tc.replicasByQPS)
+			continue
+		}
+		for i := range want {
+			if repls[i].qps != want[i] {
+				t.Errorf("got %f for %d'th element; want %f (input: %v)", repls[i].qps, i, want[i], tc.replicasByQPS)
+				break
+			}
+		}
+		replsCopy := rr.topQPS()
+		if !reflect.DeepEqual(repls, replsCopy) {
+			t.Errorf("got different replicas on second call to topQPS; first call: %v, second call: %v", repls, replsCopy)
+		}
+	}
+}
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index afd22e83145f..bba4bac225ee 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -357,10 +357,11 @@ type Store struct {
 	Ident *roachpb.StoreIdent // pointer to catch access before Start() is called
 	cfg   StoreConfig
 	db    *client.DB
-	engine    engine.Engine        // The underlying key-value store
-	compactor *compactor.Compactor // Schedules compaction of the engine
-	tsCache   tscache.Cache        // Most recent timestamps for keys / key ranges
-	allocator Allocator            // Makes allocation decisions
+	engine       engine.Engine        // The underlying key-value store
+	compactor    *compactor.Compactor // Schedules compaction of the engine
+	tsCache      tscache.Cache        // Most recent timestamps for keys / key ranges
+	allocator    Allocator            // Makes allocation decisions
+	replRankings *replicaRankings
 	rangeIDAlloc *idalloc.Allocator // Range ID allocator
 	gcQueue      *gcQueue           // Garbage collection queue
 	mergeQueue   *mergeQueue        // Range merging queue
@@ -904,6 +905,7 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
 			return 0, false
 		})
 	}
+	s.replRankings = newReplicaRankings()
 	s.intentResolver = newIntentResolver(s, cfg.IntentResolverTaskLimit)
 	s.raftEntryCache = newRaftEntryCache(cfg.RaftEntryCacheSize)
 	s.draining.Store(false)
@@ -2716,6 +2718,7 @@ func (s *Store) Capacity(useCached bool) (roachpb.StoreCapacity, error) {
 	replicaCount := s.metrics.ReplicaCount.Value()
 	bytesPerReplica := make([]float64, 0, replicaCount)
 	writesPerReplica := make([]float64, 0, replicaCount)
+	rankingsAccumulator := s.replRankings.newAccumulator()
 	newStoreReplicaVisitor(s).Visit(func(r *Replica) bool {
 		rangeCount++
 		if r.OwnsValidLease(now) {
@@ -2728,14 +2731,20 @@ func (s *Store) Capacity(useCached bool) (roachpb.StoreCapacity, error) {
 		// incorrectly low the first time or two it gets gossiped when a store
 		// starts? We can't easily have a countdown as its value changes like for
 		// leases/replicas.
-		if qps, dur := r.leaseholderStats.avgQPS(); dur >= MinStatsDuration {
-			totalQueriesPerSecond += qps
+		var qps float64
+		if avgQPS, dur := r.leaseholderStats.avgQPS(); dur >= MinStatsDuration {
+			qps = avgQPS
+			totalQueriesPerSecond += avgQPS
 			// TODO(a-robinson): Calculate percentiles for qps? Get rid of other percentiles?
 		}
 		if wps, dur := r.writeStats.avgQPS(); dur >= MinStatsDuration {
 			totalWritesPerSecond += wps
 			writesPerReplica = append(writesPerReplica, wps)
 		}
+		rankingsAccumulator.addReplica(replicaWithStats{
+			repl: r,
+			qps:  qps,
+		})
 		return true
 	})
 	capacity.RangeCount = rangeCount
@@ -2746,6 +2755,7 @@ func (s *Store) Capacity(useCached bool) (roachpb.StoreCapacity, error) {
 	capacity.BytesPerReplica = roachpb.PercentilesFromData(bytesPerReplica)
 	capacity.WritesPerReplica = roachpb.PercentilesFromData(writesPerReplica)
 	s.recordNewPerSecondStats(totalQueriesPerSecond, totalWritesPerSecond)
+	s.replRankings.update(rankingsAccumulator)
 	s.cachedCapacity.Lock()
 	s.cachedCapacity.StoreCapacity = capacity

From c7772acb337535f19ef304d06c5e88c48d428a07 Mon Sep 17 00:00:00 2001
From: Andrew Kimball
Date: Mon, 20 Aug 2018 11:09:57 -0700
Subject: [PATCH 2/4] opt: Fix rule cycle bug

The PushFilterIntoJoinLeftAndRight and TryDecorrelateSelect rules can cycle
with one another in a rare case:

1. Right side of join has outer column due to being un-decorrelatable.
2. Filter conjunct is pushed down to both left and right side by mapping
   equivalencies in PushFilterIntoJoinLeftAndRight.
3. Left conjunct is pulled back up to join condition by TryDecorrelateSelect.

Steps #2 and #3 will cycle with one another. Cycle detection is not possible
in this case, because the left side keeps changing (because a new conjunct is
pushed down to it each time).

The fix is simple: don't let PushFilterIntoJoinLeftAndRight push down filters
if either the left or right side has outer column(s).

This fixes #28818.

Release note: None
---
 pkg/sql/opt/norm/rules/join.opt      |  13 +--
 pkg/sql/opt/norm/testdata/rules/join | 125 +++++++++++++++++++++++++++
 2 files changed, 132 insertions(+), 6 deletions(-)

diff --git a/pkg/sql/opt/norm/rules/join.opt b/pkg/sql/opt/norm/rules/join.opt
index 6c182e23d664..0240f7aa1f55 100644
--- a/pkg/sql/opt/norm/rules/join.opt
+++ b/pkg/sql/opt/norm/rules/join.opt
@@ -49,17 +49,18 @@
 # Given this mapping, we can safely push the filter down to both sides and
 # remove it from the ON filters list.
 #
-# Note that this rule is only applied to InnerJoin and SemiJoin, not
-# InnerJoinApply or SemiJoinApply. The apply variants would cause a
-# non-detectable cycle with TryDecorrelateSelect, causing the filters to get
-# remapped to both sides and pushed down over and over again.
+# Note that this rule is only applied when the left and right inputs do not have
+# outer columns. If they do, then this rule can cause undetectable cycles with
+# TryDecorrelateSelect, since the filter is pushed down to both sides, but then
+# only pulled up from the right side by TryDecorrelateSelect. For this reason,
+# the rule also does not apply to InnerJoinApply or SemiJoinApply.
 #
 # NOTE: It is important that this rule is first among the join filter push-down
 # rules.
 [PushFilterIntoJoinLeftAndRight, Normalize]
 (InnerJoin | SemiJoin
-    $left:*
-    $right:*
+    $left:* & ^(HasOuterCols $left)
+    $right:* & ^(HasOuterCols $right)
     $filters:(Filters $list:[ ...
diff --git a/pkg/sql/opt/norm/testdata/rules/join b/pkg/sql/opt/norm/testdata/rules/join index 072fe0629144..a21883ef155e 100644 --- a/pkg/sql/opt/norm/testdata/rules/join +++ b/pkg/sql/opt/norm/testdata/rules/join @@ -19,6 +19,24 @@ TABLE b └── INDEX primary └── x int not null +exec-ddl +CREATE TABLE xy (x INT PRIMARY KEY, y INT) +---- +TABLE xy + ├── x int not null + ├── y int + └── INDEX primary + └── x int not null + +exec-ddl +CREATE TABLE uv (u INT PRIMARY KEY, v INT) +---- +TABLE uv + ├── u int not null + ├── v int + └── INDEX primary + └── u int not null + # -------------------------------------------------- # EnsureJoinFiltersAnd # -------------------------------------------------- @@ -939,6 +957,113 @@ inner-join └── filters [type=bool, outer=(1,3), constraints=(/1: (/NULL - ]; /3: (/NULL - ]), fd=(1)==(3), (3)==(1)] └── a = b [type=bool, outer=(1,3), constraints=(/1: (/NULL - ]; /3: (/NULL - ])] +# Regression for issue 28818. Try to trigger undetectable cycle between the +# PushFilterIntoJoinLeftAndRight and TryDecorrelateSelect rules. +opt +SELECT 1 +FROM a +WHERE EXISTS ( + SELECT 1 + FROM xy + INNER JOIN uv + ON EXISTS ( + SELECT 1 + FROM b + WHERE a.s >= 'foo' + LIMIT 10 + ) + WHERE + (SELECT s FROM a) = 'foo' +) +---- +project + ├── columns: "?column?":22(int!null) + ├── fd: ()-->(22) + ├── distinct-on + │ ├── columns: a.k:1(int!null) + │ ├── grouping columns: a.k:1(int!null) + │ ├── key: (1) + │ └── select + │ ├── columns: a.k:1(int!null) xy.x:6(int!null) u:8(int!null) true_agg:14(bool!null) + │ ├── key: (1,6,8) + │ ├── fd: (1,6,8)-->(14) + │ ├── group-by + │ │ ├── columns: a.k:1(int!null) xy.x:6(int!null) u:8(int!null) true_agg:14(bool) + │ │ ├── grouping columns: a.k:1(int!null) xy.x:6(int!null) u:8(int!null) + │ │ ├── key: (1,6,8) + │ │ ├── fd: (1,6,8)-->(14) + │ │ ├── project + │ │ │ ├── columns: true:13(bool!null) a.k:1(int!null) xy.x:6(int!null) u:8(int!null) + │ │ │ ├── fd: ()-->(13) + │ │ │ ├── inner-join-apply + │ │ │ │ ├── columns: a.k:1(int!null) a.s:4(string) xy.x:6(int!null) u:8(int!null) + │ │ │ │ ├── fd: (1)-->(4) + │ │ │ │ ├── scan a + │ │ │ │ │ ├── columns: a.k:1(int!null) a.s:4(string) + │ │ │ │ │ ├── key: (1) + │ │ │ │ │ └── fd: (1)-->(4) + │ │ │ │ ├── inner-join + │ │ │ │ │ ├── columns: xy.x:6(int!null) u:8(int!null) + │ │ │ │ │ ├── outer: (4) + │ │ │ │ │ ├── inner-join + │ │ │ │ │ │ ├── columns: xy.x:6(int!null) u:8(int!null) + │ │ │ │ │ │ ├── key: (6,8) + │ │ │ │ │ │ ├── select + │ │ │ │ │ │ │ ├── columns: xy.x:6(int!null) + │ │ │ │ │ │ │ ├── key: (6) + │ │ │ │ │ │ │ ├── scan xy + │ │ │ │ │ │ │ │ ├── columns: xy.x:6(int!null) + │ │ │ │ │ │ │ │ └── key: (6) + │ │ │ │ │ │ │ └── filters [type=bool] + │ │ │ │ │ │ │ └── eq [type=bool] + │ │ │ │ │ │ │ ├── subquery [type=string] + │ │ │ │ │ │ │ │ └── max1-row + │ │ │ │ │ │ │ │ ├── columns: a.s:19(string) + │ │ │ │ │ │ │ │ ├── cardinality: [0 - 1] + │ │ │ │ │ │ │ │ ├── key: () + │ │ │ │ │ │ │ │ ├── fd: ()-->(19) + │ │ │ │ │ │ │ │ └── scan a + │ │ │ │ │ │ │ │ └── columns: a.s:19(string) + │ │ │ │ │ │ │ └── const: 'foo' [type=string] + │ │ │ │ │ │ ├── select + │ │ │ │ │ │ │ ├── columns: u:8(int!null) + │ │ │ │ │ │ │ ├── key: (8) + │ │ │ │ │ │ │ ├── scan uv + │ │ │ │ │ │ │ │ ├── columns: u:8(int!null) + │ │ │ │ │ │ │ │ └── key: (8) + │ │ │ │ │ │ │ └── filters [type=bool] + │ │ │ │ │ │ │ └── eq [type=bool] + │ │ │ │ │ │ │ ├── subquery [type=string] + │ │ │ │ │ │ │ │ └── max1-row + │ │ │ │ │ │ │ │ ├── columns: a.s:19(string) + │ │ │ │ │ │ │ │ ├── cardinality: [0 - 1] + │ │ │ │ │ │ │ │ ├── key: () + │ │ │ │ │ │ │ │ 
├── fd: ()-->(19) + │ │ │ │ │ │ │ │ └── scan a + │ │ │ │ │ │ │ │ └── columns: a.s:19(string) + │ │ │ │ │ │ │ └── const: 'foo' [type=string] + │ │ │ │ │ │ └── true [type=bool] + │ │ │ │ │ ├── limit + │ │ │ │ │ │ ├── outer: (4) + │ │ │ │ │ │ ├── cardinality: [0 - 10] + │ │ │ │ │ │ ├── select + │ │ │ │ │ │ │ ├── outer: (4) + │ │ │ │ │ │ │ ├── scan b + │ │ │ │ │ │ │ └── filters [type=bool, outer=(4), constraints=(/4: [/'foo' - ]; tight)] + │ │ │ │ │ │ │ └── a.s >= 'foo' [type=bool, outer=(4), constraints=(/4: [/'foo' - ]; tight)] + │ │ │ │ │ │ └── const: 10 [type=int] + │ │ │ │ │ └── true [type=bool] + │ │ │ │ └── true [type=bool] + │ │ │ └── projections [outer=(1,6,8)] + │ │ │ └── true [type=bool] + │ │ └── aggregations [outer=(13)] + │ │ └── const-not-null-agg [type=bool, outer=(13)] + │ │ └── variable: true [type=bool, outer=(13)] + │ └── filters [type=bool, outer=(14), constraints=(/14: (/NULL - ]; tight)] + │ └── true_agg IS NOT NULL [type=bool, outer=(14), constraints=(/14: (/NULL - ]; tight)] + └── projections + └── const: 1 [type=int] + # -------------------------------------------------- # PushFilterIntoJoinLeft + PushFilterIntoJoinRight # -------------------------------------------------- From 256df1464281345d7835029b8b80ffa6bd5a7e83 Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Sat, 7 Jul 2018 13:32:46 -0500 Subject: [PATCH 3/4] storage: make lease rebalancing decisions at the store level In order to better balance the QPS being served by each store to avoid overloaded nodes. Fixes #21419 Release note (performance improvement): Range leases will be automatically rebalanced throughout the cluster to even out the amount of QPS being handled by each node. --- docs/generated/settings/settings.html | 2 +- pkg/storage/allocator.go | 87 ++++++-- pkg/storage/allocator_scorer.go | 4 +- pkg/storage/allocator_test.go | 4 +- pkg/storage/replica_rankings.go | 1 + pkg/storage/replicate_queue.go | 53 +++-- pkg/storage/store.go | 8 +- pkg/storage/store_rebalancer.go | 309 ++++++++++++++++++++++++++ pkg/storage/store_rebalancer_test.go | 165 ++++++++++++++ 9 files changed, 589 insertions(+), 44 deletions(-) create mode 100644 pkg/storage/store_rebalancer.go create mode 100644 pkg/storage/store_rebalancer_test.go diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 3e45f4d21c69..90db15bceb04 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -18,7 +18,7 @@ kv.allocator.lease_rebalancing_aggressivenessfloat1set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases kv.allocator.load_based_lease_rebalancing.enabledbooleantrueset to enable rebalancing of range leases based on load and latency kv.allocator.range_rebalance_thresholdfloat0.05minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull -kv.allocator.stat_based_rebalancing.enabledbooleanfalseset to enable rebalancing of range replicas based on write load and disk usage +kv.allocator.stat_based_rebalancing.enabledbooleantrueset to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster kv.allocator.stat_rebalance_thresholdfloat0.2minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull kv.bulk_io_write.concurrent_export_requestsinteger5number of export requests a 
store will handle concurrently before queuing kv.bulk_io_write.concurrent_import_requestsinteger1number of import requests a store will handle concurrently before queuing diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index 55e3c37ec69f..b5d4b6216349 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -34,10 +34,16 @@ import ( ) const ( - // baseLeaseRebalanceThreshold is the minimum ratio of a store's lease surplus + // leaseRebalanceThreshold is the minimum ratio of a store's lease surplus // to the mean range/lease count that permits lease-transfers away from that // store. - baseLeaseRebalanceThreshold = 0.05 + leaseRebalanceThreshold = 0.05 + + // baseLoadBasedLeaseRebalanceThreshold is the equivalent of + // leaseRebalanceThreshold for load-based lease rebalance decisions (i.e. + // "follow-the-workload"). It's the base threshold for decisions that get + // adjusted based on the load and latency of the involved ranges/nodes. + baseLoadBasedLeaseRebalanceThreshold = 2 * leaseRebalanceThreshold // minReplicaWeight sets a floor for how low a replica weight can be. This is // needed because a weight of zero doesn't work in the current lease scoring @@ -714,7 +720,7 @@ func (a *Allocator) TransferLeaseTarget( // whether we actually should be transferring the lease. The transfer // decision is only needed if we've been asked to check the source. transferDec, repl := a.shouldTransferLeaseUsingStats( - ctx, sl, source, existing, stats, + ctx, sl, source, existing, stats, nil, ) if checkTransferLeaseSource { switch transferDec { @@ -814,7 +820,7 @@ func (a *Allocator) ShouldTransferLease( return false } - transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats) + transferDec, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats, nil) var result bool switch transferDec { case shouldNotTransfer: @@ -831,12 +837,36 @@ func (a *Allocator) ShouldTransferLease( return result } +func (a Allocator) followTheWorkloadPrefersLocal( + ctx context.Context, + sl StoreList, + source roachpb.StoreDescriptor, + candidate roachpb.StoreID, + existing []roachpb.ReplicaDescriptor, + stats *replicaStats, +) bool { + adjustments := make(map[roachpb.StoreID]float64) + decision, _ := a.shouldTransferLeaseUsingStats(ctx, sl, source, existing, stats, adjustments) + if decision == decideWithoutStats { + return false + } + adjustment := adjustments[candidate] + if adjustment > baseLoadBasedLeaseRebalanceThreshold { + log.VEventf(ctx, 3, + "s%d is a better fit than s%d due to follow-the-workload (score: %.2f; threshold: %.2f)", + source.StoreID, candidate, adjustment, baseLoadBasedLeaseRebalanceThreshold) + return true + } + return false +} + func (a Allocator) shouldTransferLeaseUsingStats( ctx context.Context, sl StoreList, source roachpb.StoreDescriptor, existing []roachpb.ReplicaDescriptor, stats *replicaStats, + rebalanceAdjustments map[roachpb.StoreID]float64, ) (transferDecision, roachpb.ReplicaDescriptor) { // Only use load-based rebalancing if it's enabled and we have both // stats and locality information to base our decision on. 
@@ -903,7 +933,7 @@ func (a Allocator) shouldTransferLeaseUsingStats( } addr, err := a.storePool.gossip.GetNodeIDAddress(repl.NodeID) if err != nil { - log.Errorf(ctx, "missing address for node %d: %s", repl.NodeID, err) + log.Errorf(ctx, "missing address for n%d: %s", repl.NodeID, err) continue } remoteLatency, ok := a.nodeLatencyFn(addr.String()) @@ -912,20 +942,24 @@ func (a Allocator) shouldTransferLeaseUsingStats( } remoteWeight := math.Max(minReplicaWeight, replicaWeights[repl.NodeID]) - score := loadBasedLeaseRebalanceScore( + replScore, rebalanceAdjustment := loadBasedLeaseRebalanceScore( ctx, a.storePool.st, remoteWeight, remoteLatency, storeDesc, sourceWeight, source, sl.candidateLeases.mean) - if score > bestReplScore { - bestReplScore = score + if replScore > bestReplScore { + bestReplScore = replScore bestRepl = repl } + if rebalanceAdjustments != nil { + rebalanceAdjustments[repl.StoreID] = rebalanceAdjustment + } } - // Return the best replica even in cases where transferring is not advised in - // order to support forced lease transfers, such as when removing a replica or - // draining all leases before shutdown. if bestReplScore > 0 { return shouldTransfer, bestRepl } + + // Return the best replica even in cases where transferring is not advised in + // order to support forced lease transfers, such as when removing a replica or + // draining all leases before shutdown. return shouldNotTransfer, bestRepl } @@ -948,7 +982,7 @@ func (a Allocator) shouldTransferLeaseUsingStats( // logic behind each part of the formula is as follows: // // * LeaseRebalancingAggressiveness: Allow the aggressiveness to be tuned via -// an environment variable. +// a cluster setting. // * 0.1: Constant factor to reduce aggressiveness by default // * math.Log10(remoteWeight/sourceWeight): Comparison of the remote replica's // weight to the local replica's weight. Taking the log of the ratio instead @@ -963,6 +997,18 @@ func (a Allocator) shouldTransferLeaseUsingStats( // of the ideal number of leases on each store. We then calculate these to // compare how close each node is to its ideal state and use the differences // from the ideal state on each node to compute a final score. +// +// Returns a total score for the replica that takes into account the number of +// leases already on each store. Also returns the raw "adjustment" value that's +// purely based on replica weights and latency in order for the caller to +// determine how large a role the user's workload played in the decision. The +// adjustment value is positive if the remote store is preferred for load-based +// reasons or negative if the local store is preferred. The magnitude depends +// on the difference in load and the latency between the nodes. +// +// TODO(a-robinson): Should this be changed to avoid even thinking about lease +// counts now that we try to spread leases and replicas based on QPS? As is it +// may fight back a little bit against store-level QPS-based rebalancing. 
func loadBasedLeaseRebalanceScore( ctx context.Context, st *cluster.Settings, @@ -972,14 +1018,14 @@ func loadBasedLeaseRebalanceScore( sourceWeight float64, source roachpb.StoreDescriptor, meanLeases float64, -) int32 { +) (int32, float64) { remoteLatencyMillis := float64(remoteLatency) / float64(time.Millisecond) rebalanceAdjustment := leaseRebalancingAggressiveness.Get(&st.SV) * 0.1 * math.Log10(remoteWeight/sourceWeight) * math.Log1p(remoteLatencyMillis) // Start with twice the base rebalance threshold in order to fight more // strongly against thrashing caused by small variances in the distribution // of request weights. - rebalanceThreshold := (2 * baseLeaseRebalanceThreshold) - rebalanceAdjustment + rebalanceThreshold := baseLoadBasedLeaseRebalanceThreshold - rebalanceAdjustment overfullLeaseThreshold := int32(math.Ceil(meanLeases * (1 + rebalanceThreshold))) overfullScore := source.Capacity.LeaseCount - overfullLeaseThreshold @@ -995,7 +1041,7 @@ func loadBasedLeaseRebalanceScore( rebalanceThreshold, meanLeases, source.Capacity.LeaseCount, overfullLeaseThreshold, remoteStore.Capacity.LeaseCount, underfullLeaseThreshold, totalScore, ) - return totalScore + return totalScore, rebalanceAdjustment } func (a Allocator) shouldTransferLeaseWithoutStats( @@ -1004,9 +1050,14 @@ func (a Allocator) shouldTransferLeaseWithoutStats( source roachpb.StoreDescriptor, existing []roachpb.ReplicaDescriptor, ) bool { + // TODO(a-robinson): Should we disable this behavior when load-based lease + // rebalancing is enabled? In happy cases it's nice to keep this working + // to even out the number of leases in addition to the number of replicas, + // but it's certainly a blunt instrument that could undo what we want. + // Allow lease transfer if we're above the overfull threshold, which is - // mean*(1+baseLeaseRebalanceThreshold). - overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + baseLeaseRebalanceThreshold))) + // mean*(1+leaseRebalanceThreshold). + overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + leaseRebalanceThreshold))) minOverfullThreshold := int32(math.Ceil(sl.candidateLeases.mean + 5)) if overfullLeaseThreshold < minOverfullThreshold { overfullLeaseThreshold = minOverfullThreshold @@ -1016,7 +1067,7 @@ func (a Allocator) shouldTransferLeaseWithoutStats( } if float64(source.Capacity.LeaseCount) > sl.candidateLeases.mean { - underfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 - baseLeaseRebalanceThreshold))) + underfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 - leaseRebalanceThreshold))) minUnderfullThreshold := int32(math.Ceil(sl.candidateLeases.mean - 5)) if underfullLeaseThreshold > minUnderfullThreshold { underfullLeaseThreshold = minUnderfullThreshold diff --git a/pkg/storage/allocator_scorer.go b/pkg/storage/allocator_scorer.go index 9d24fadef684..3a9155692bf4 100644 --- a/pkg/storage/allocator_scorer.go +++ b/pkg/storage/allocator_scorer.go @@ -59,8 +59,8 @@ const ( // If disabled, rebalancing is done purely based on replica count. 
var EnableStatsBasedRebalancing = settings.RegisterBoolSetting( "kv.allocator.stat_based_rebalancing.enabled", - "set to enable rebalancing of range replicas based on write load and disk usage", - false, + "set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster", + false, // TODO(a-robinson): switch to true for v2.1 once the store-rebalancer is done ) // rangeRebalanceThreshold is the minimum ratio of a store's range count to diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index d93f7b4d2fc4..1a3ae4e70d23 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -1248,6 +1248,7 @@ func TestAllocatorRebalanceDifferentLocalitySizes(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) + EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false) ctx := context.Background() defer stopper.Stop(ctx) @@ -2040,6 +2041,7 @@ func TestAllocatorRebalanceTargetLocality(t *testing.T) { defer leaktest.AfterTest(t)() stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) + EnableStatsBasedRebalancing.Override(&a.storePool.st.SV, false) defer stopper.Stop(context.Background()) stores := []*roachpb.StoreDescriptor{ @@ -3817,7 +3819,7 @@ func TestLoadBasedLeaseRebalanceScore(t *testing.T) { for _, c := range testCases { remoteStore.Capacity.LeaseCount = c.remoteLeases sourceStore.Capacity.LeaseCount = c.sourceLeases - score := loadBasedLeaseRebalanceScore( + score, _ := loadBasedLeaseRebalanceScore( context.Background(), st, c.remoteWeight, diff --git a/pkg/storage/replica_rankings.go b/pkg/storage/replica_rankings.go index f6b560085d4e..13b4a99f582d 100644 --- a/pkg/storage/replica_rankings.go +++ b/pkg/storage/replica_rankings.go @@ -21,6 +21,7 @@ import ( ) const ( + // TODO(a-robinson): Scale this up based on the number of replicas on a store? numTopReplicasToTrack = 128 ) diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index 6ae97ae6bc5a..5578ed1bb528 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -374,7 +374,7 @@ func (rq *replicateQueue) processOneChange( // out of situations where this store is overfull and yet holds all the // leases. The fullness checks need to be ignored for cases where // a replica needs to be removed for constraint violations. - transferred, err := rq.transferLease( + transferred, err := rq.findTargetAndTransferLease( ctx, repl, desc, @@ -419,7 +419,7 @@ func (rq *replicateQueue) processOneChange( if dryRun { return false, nil } - transferred, err := rq.transferLease( + transferred, err := rq.findTargetAndTransferLease( ctx, repl, desc, @@ -503,7 +503,7 @@ func (rq *replicateQueue) processOneChange( if canTransferLease() { // We require the lease in order to process replicas, so // repl.store.StoreID() corresponds to the lease-holder's store ID. 
- transferred, err := rq.transferLease( + transferred, err := rq.findTargetAndTransferLease( ctx, repl, desc, @@ -537,7 +537,7 @@ type transferLeaseOptions struct { dryRun bool } -func (rq *replicateQueue) transferLease( +func (rq *replicateQueue) findTargetAndTransferLease( ctx context.Context, repl *Replica, desc *roachpb.RangeDescriptor, @@ -545,7 +545,7 @@ func (rq *replicateQueue) transferLease( opts transferLeaseOptions, ) (bool, error) { candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas, 0 /* brandNewReplicaID */) - if target := rq.allocator.TransferLeaseTarget( + target := rq.allocator.TransferLeaseTarget( ctx, zone, candidates, @@ -555,24 +555,35 @@ func (rq *replicateQueue) transferLease( opts.checkTransferLeaseSource, opts.checkCandidateFullness, false, /* alwaysAllowDecisionWithoutStats */ - ); target != (roachpb.ReplicaDescriptor{}) { - rq.metrics.TransferLeaseCount.Inc(1) + ) + if target == (roachpb.ReplicaDescriptor{}) { + return false, nil + } + + if opts.dryRun { log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID) - if opts.dryRun { - return false, nil - } - avgQPS, qpsMeasurementDur := repl.leaseholderStats.avgQPS() - if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil { - return false, errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID) - } - rq.lastLeaseTransfer.Store(timeutil.Now()) - if qpsMeasurementDur >= MinStatsDuration { - rq.allocator.storePool.updateLocalStoresAfterLeaseTransfer( - repl.store.StoreID(), target.StoreID, avgQPS) - } - return true, nil + return false, nil } - return false, nil + + err := rq.transferLease(ctx, repl, target) + return err == nil, err +} + +func (rq *replicateQueue) transferLease( + ctx context.Context, repl *Replica, target roachpb.ReplicaDescriptor, +) error { + rq.metrics.TransferLeaseCount.Inc(1) + log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID) + avgQPS, qpsMeasurementDur := repl.leaseholderStats.avgQPS() + if err := repl.AdminTransferLease(ctx, target.StoreID); err != nil { + return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID) + } + rq.lastLeaseTransfer.Store(timeutil.Now()) + if qpsMeasurementDur >= MinStatsDuration { + rq.allocator.storePool.updateLocalStoresAfterLeaseTransfer( + repl.store.StoreID(), target.StoreID, avgQPS) + } + return nil } func (rq *replicateQueue) addReplica( diff --git a/pkg/storage/store.go b/pkg/storage/store.go index bba4bac225ee..5f130e31475f 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -362,6 +362,7 @@ type Store struct { tsCache tscache.Cache // Most recent timestamps for keys / key ranges allocator Allocator // Makes allocation decisions replRankings *replicaRankings + storeRebalancer *StoreRebalancer rangeIDAlloc *idalloc.Allocator // Range ID allocator gcQueue *gcQueue // Garbage collection queue mergeQueue *mergeQueue // Range merging queue @@ -992,6 +993,9 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript } } + s.storeRebalancer = NewStoreRebalancer( + s.cfg.AmbientCtx, cfg.Settings, s.replicateQueue, s.replRankings) + if cfg.TestingKnobs.DisableGCQueue { s.setGCQueueActive(false) } @@ -1124,7 +1128,7 @@ func (s *Store) SetDraining(drain bool) { log.Errorf(ctx, "could not get zone config for key %s when draining: %s", desc.StartKey, err) } } - leaseTransferred, err := s.replicateQueue.transferLease( + leaseTransferred, err := s.replicateQueue.findTargetAndTransferLease( ctx, r, desc, @@ -1514,6 +1518,8 @@ func (s 
*Store) Start(ctx context.Context, stopper *stop.Stopper) error { s.startLeaseRenewer(ctx) } + s.storeRebalancer.Start(ctx, s.stopper, s.StoreID()) + // Start the storage engine compactor. if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) { s.compactor.Start(s.AnnotateCtx(context.Background()), s.stopper) diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go new file mode 100644 index 000000000000..5be226f33073 --- /dev/null +++ b/pkg/storage/store_rebalancer.go @@ -0,0 +1,309 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package storage + +import ( + "context" + "math" + "sort" + "time" + + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" +) + +const ( + // storeRebalancerTimerDuration is how frequently to check the store-level + // balance of the cluster. + storeRebalancerTimerDuration = time.Minute + + // minQPSThresholdDifference is the minimum QPS difference from the cluster + // mean that this system should care about. In other words, we won't worry + // about rebalancing for QPS reasons if a store's QPS differs from the mean + // by less than this amount even if the amount is greater than the percentage + // threshold. This avoids too many lease transfers in lightly loaded clusters. + minQPSThresholdDifference = 100 +) + +// StoreRebalancer is responsible for examining how the associated store's load +// compares to the load on other stores in the cluster and transferring leases +// or replicas away if the local store is overloaded. +// +// This isn't implemented as a Queue because the Queues all operate on one +// replica at a time, making a local decision about each replica. Queues don't +// really know how the replica they're looking at compares to other replicas on +// the store. Our goal is balancing stores, though, so it's preferable to make +// decisions about each store and then carefully pick replicas to move that +// will best accomplish the store-level goals. +type StoreRebalancer struct { + log.AmbientContext + st *cluster.Settings + rq *replicateQueue + replRankings *replicaRankings +} + +// NewStoreRebalancer creates a StoreRebalancer to work in tandem with the +// provided replicateQueue. +func NewStoreRebalancer( + ambientCtx log.AmbientContext, + st *cluster.Settings, + rq *replicateQueue, + replRankings *replicaRankings, +) *StoreRebalancer { + ambientCtx.AddLogTag("store-rebalancer", nil) + return &StoreRebalancer{ + AmbientContext: ambientCtx, + st: st, + rq: rq, + replRankings: replRankings, + } +} + +// Start runs an infinite loop in a goroutine which regularly checks whether +// the store is overloaded along any important dimension (e.g. 
range count,
+// QPS, disk usage), and if so attempts to correct that by moving leases or
+// replicas elsewhere.
+//
+// This worker acts on store-level imbalances, whereas the replicate queue
+// makes decisions based on the zone config constraints and diversity of
+// individual ranges. This means that there are two different workers that
+// could potentially be making decisions about a given range, so they have to
+// be careful to avoid stepping on each others' toes.
+//
+// TODO(a-robinson): Expose metrics to make this understandable without having
+// to dive into logspy.
+func (sr *StoreRebalancer) Start(
+	ctx context.Context, stopper *stop.Stopper, storeID roachpb.StoreID,
+) {
+	ctx = sr.AnnotateCtx(ctx)
+
+	// Start a goroutine that periodically compares this store's load against
+	// the rest of the cluster and transfers leases away if the local store is
+	// overloaded.
+	stopper.RunWorker(ctx, func(ctx context.Context) {
+		ticker := time.NewTicker(storeRebalancerTimerDuration)
+		defer ticker.Stop()
+		for {
+			// Wait out the first tick before doing anything since the store is still
+			// starting up and we might as well wait for some qps/wps stats to
+			// accumulate.
+			select {
+			case <-stopper.ShouldQuiesce():
+				return
+			case <-ticker.C:
+			}
+
+			if !EnableStatsBasedRebalancing.Get(&sr.st.SV) {
+				continue
+			}
+
+			localDesc, found := sr.rq.allocator.storePool.getStoreDescriptor(storeID)
+			if !found {
+				log.Warningf(ctx, "StorePool missing descriptor for local store")
+				continue
+			}
+			storeList, _, _ := sr.rq.allocator.storePool.getStoreList(roachpb.RangeID(0), storeFilterNone)
+			sr.rebalanceStore(ctx, localDesc, storeList)
+		}
+	})
+}
+
+func (sr *StoreRebalancer) rebalanceStore(
+	ctx context.Context, localDesc roachpb.StoreDescriptor, storeList StoreList,
+) {
+
+	statThreshold := statRebalanceThreshold.Get(&sr.st.SV)
+
+	// First check if we should transfer leases away to better balance QPS.
+ qpsMinThreshold := math.Min(storeList.candidateQueriesPerSecond.mean*(1-statThreshold), + storeList.candidateQueriesPerSecond.mean-minQPSThresholdDifference) + qpsMaxThreshold := math.Max(storeList.candidateQueriesPerSecond.mean*(1+statThreshold), + storeList.candidateQueriesPerSecond.mean+minQPSThresholdDifference) + + if !(localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold) { + log.VEventf(ctx, 1, "local QPS %.2f is below max threshold %.2f (mean=%.2f); no rebalancing needed", + localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold, storeList.candidateQueriesPerSecond.mean) + return + } + + storeMap := storeListToMap(storeList) + sysCfg, cfgOk := sr.rq.allocator.storePool.gossip.GetSystemConfig() + if !cfgOk { + log.VEventf(ctx, 1, "no system config available, unable to choose lease transfer targets") + return + } + + log.Infof(ctx, "considering load-based lease transfers for s%d with %.2f qps (mean=%.2f, upperThreshold=%.2f)", + localDesc.StoreID, localDesc.Capacity.QueriesPerSecond, storeList.candidateQueriesPerSecond.mean, qpsMaxThreshold) + + hottestRanges := sr.replRankings.topQPS() + for localDesc.Capacity.QueriesPerSecond > qpsMaxThreshold { + replWithStats, target := sr.chooseLeaseToTransfer( + ctx, sysCfg, &hottestRanges, localDesc, storeList, storeMap, qpsMinThreshold, qpsMaxThreshold) + if replWithStats.repl == nil { + log.Infof(ctx, + "ran out of leases worth transferring and qps (%.2f) is still above desired threshold (%.2f)", + localDesc.Capacity.QueriesPerSecond, qpsMaxThreshold) + break + } + log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load", + replWithStats.repl.RangeID, replWithStats.qps, target.StoreID) + replCtx := replWithStats.repl.AnnotateCtx(ctx) + if err := sr.rq.transferLease(replCtx, replWithStats.repl, target); err != nil { + log.Errorf(replCtx, "unable to transfer lease to s%d: %v", target.StoreID, err) + continue + } + // Finally, update our local copies of the descriptors so that if + // additional transfers are needed we'll be making the decisions with more + // up-to-date info. + localDesc.Capacity.LeaseCount-- + localDesc.Capacity.QueriesPerSecond -= replWithStats.qps + if otherDesc := storeMap[target.StoreID]; otherDesc != nil { + otherDesc.Capacity.LeaseCount++ + otherDesc.Capacity.QueriesPerSecond += replWithStats.qps + } + } +} + +// TODO(a-robinson): Should we take the number of leases on each store into +// account here or just continue to let that happen in allocator.go? +func (sr *StoreRebalancer) chooseLeaseToTransfer( + ctx context.Context, + sysCfg config.SystemConfig, + hottestRanges *[]replicaWithStats, + localDesc roachpb.StoreDescriptor, + storeList StoreList, + storeMap map[roachpb.StoreID]*roachpb.StoreDescriptor, + minQPS float64, + maxQPS float64, +) (replicaWithStats, roachpb.ReplicaDescriptor) { + now := sr.rq.store.Clock().Now() + for { + if len(*hottestRanges) == 0 { + return replicaWithStats{}, roachpb.ReplicaDescriptor{} + } + replWithStats := (*hottestRanges)[0] + *hottestRanges = (*hottestRanges)[1:] + + // We're all out of replicas. 
+ if replWithStats.repl == nil { + return replicaWithStats{}, roachpb.ReplicaDescriptor{} + } + + if !replWithStats.repl.OwnsValidLease(now) { + log.VEventf(ctx, 3, "store doesn't own the lease for r%d", replWithStats.repl.RangeID) + continue + } + + if localDesc.Capacity.QueriesPerSecond-replWithStats.qps < minQPS { + log.VEventf(ctx, 3, "moving r%d's %.2f qps would bring s%d below the min threshold (%.2f)", + replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, minQPS) + continue + } + + // Don't bother moving leases whose QPS is below some small fraction of the + // store's QPS (unless the store has extra leases to spare anyway). It's + // just unnecessary churn with no benefit to move leases responsible for, + // for example, 1 qps on a store with 5000 qps. + const minQPSFraction = .001 + if replWithStats.qps < localDesc.Capacity.QueriesPerSecond*minQPSFraction && + float64(localDesc.Capacity.LeaseCount) <= storeList.candidateLeases.mean { + log.VEventf(ctx, 5, "r%d's %.2f qps is too little to matter relative to s%d's %.2f total qps", + replWithStats.repl.RangeID, replWithStats.qps, localDesc.StoreID, localDesc.Capacity.QueriesPerSecond) + continue + } + + desc := replWithStats.repl.Desc() + log.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps", desc.RangeID, replWithStats.qps) + + // Check all the other replicas in order of increasing qps. + replicas := make([]roachpb.ReplicaDescriptor, len(desc.Replicas)) + copy(replicas, desc.Replicas) + sort.Slice(replicas, func(i, j int) bool { + var iQPS, jQPS float64 + if desc := storeMap[replicas[i].StoreID]; desc != nil { + iQPS = desc.Capacity.QueriesPerSecond + } + if desc := storeMap[replicas[j].StoreID]; desc != nil { + jQPS = desc.Capacity.QueriesPerSecond + } + return iQPS < jQPS + }) + + for _, candidate := range replicas { + if candidate.StoreID == localDesc.StoreID { + continue + } + storeDesc, ok := storeMap[candidate.StoreID] + if !ok { + log.VEventf(ctx, 3, "missing store descriptor for s%d", candidate.StoreID) + continue + } + + newCandidateQPS := storeDesc.Capacity.QueriesPerSecond + replWithStats.qps + if storeDesc.Capacity.QueriesPerSecond < minQPS { + if newCandidateQPS > maxQPS { + log.VEventf(ctx, 3, + "r%d's %.2f qps would push s%d over the max threshold (%.2f) with %.2f qps afterwards", + desc.RangeID, replWithStats.qps, candidate.StoreID, maxQPS, newCandidateQPS) + continue + } + } else if newCandidateQPS > storeList.candidateQueriesPerSecond.mean { + log.VEventf(ctx, 3, + "r%d's %.2f qps would push s%d over the mean (%.2f) with %.2f qps afterwards", + desc.RangeID, replWithStats.qps, candidate.StoreID, + storeList.candidateQueriesPerSecond.mean, newCandidateQPS) + continue + } + + zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey) + if err != nil { + log.Error(ctx, err) + return replicaWithStats{}, roachpb.ReplicaDescriptor{} + } + preferred := sr.rq.allocator.preferredLeaseholders(zone, desc.Replicas) + if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, preferred) { + log.VEventf(ctx, 3, "s%d not a preferred leaseholder; preferred: %v", candidate.StoreID, preferred) + continue + } + + filteredStoreList := storeList.filter(zone.Constraints) + if sr.rq.allocator.followTheWorkloadPrefersLocal( + ctx, + filteredStoreList, + localDesc, + candidate.StoreID, + desc.Replicas, + replWithStats.repl.leaseholderStats, + ) { + log.VEventf(ctx, 3, "r%d is on s%d due to follow-the-workload; skipping", + desc.RangeID, localDesc.StoreID) + continue + } + + return replWithStats, candidate + } + 
} +} + +func storeListToMap(sl StoreList) map[roachpb.StoreID]*roachpb.StoreDescriptor { + storeMap := make(map[roachpb.StoreID]*roachpb.StoreDescriptor) + for i := range sl.stores { + storeMap[sl.stores[i].StoreID] = &sl.stores[i] + } + return storeMap +} diff --git a/pkg/storage/store_rebalancer_test.go b/pkg/storage/store_rebalancer_test.go new file mode 100644 index 000000000000..bec59923cf9d --- /dev/null +++ b/pkg/storage/store_rebalancer_test.go @@ -0,0 +1,165 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package storage + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" +) + +var ( + // noLocalityStores specifies a set of stores where one store is + // under-utilized in terms of QPS, three are in the middle, and one is + // over-utilized. + noLocalityStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1500, + }, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1100, + }, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 1000, + }, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 900, + }, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Capacity: roachpb.StoreCapacity{ + QueriesPerSecond: 500, + }, + }, + } +) + +type testRange struct { + // The first storeID in the list will be the leaseholder. 
+ storeIDs []roachpb.StoreID + qps float64 +} + +func loadRanges(rr *replicaRankings, s *Store, ranges []testRange) { + acc := rr.newAccumulator() + for _, r := range ranges { + repl := &Replica{store: s} + repl.mu.state.Desc = &roachpb.RangeDescriptor{} + for _, storeID := range r.storeIDs { + repl.mu.state.Desc.Replicas = append(repl.mu.state.Desc.Replicas, roachpb.ReplicaDescriptor{ + NodeID: roachpb.NodeID(storeID), + StoreID: storeID, + ReplicaID: roachpb.ReplicaID(storeID), + }) + } + repl.mu.state.Lease = &roachpb.Lease{ + Expiration: &hlc.MaxTimestamp, + Replica: repl.mu.state.Desc.Replicas[0], + } + acc.addReplica(replicaWithStats{ + repl: repl, + qps: r.qps, + }) + } + rr.update(acc) +} + +func TestChooseLeaseToTransfer(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false) + defer stopper.Stop(context.Background()) + gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t) + storeList, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled) + storeMap := storeListToMap(storeList) + + const minQPS = 800 + const maxQPS = 1200 + + localDesc := *noLocalityStores[0] + cfg := TestStoreConfig(nil) + s := createTestStoreWithoutStart(t, stopper, &cfg) + s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} + rq := newReplicateQueue(s, g, a) + rr := newReplicaRankings() + + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + + testCases := []struct { + storeIDs []roachpb.StoreID + qps float64 + expectTarget roachpb.StoreID + }{ + {[]roachpb.StoreID{1}, 100, 0}, + {[]roachpb.StoreID{1, 2}, 100, 0}, + {[]roachpb.StoreID{1, 3}, 100, 0}, + {[]roachpb.StoreID{1, 4}, 100, 4}, + {[]roachpb.StoreID{1, 5}, 100, 5}, + {[]roachpb.StoreID{5, 1}, 100, 0}, + {[]roachpb.StoreID{1, 2}, 200, 0}, + {[]roachpb.StoreID{1, 3}, 200, 0}, + {[]roachpb.StoreID{1, 4}, 200, 0}, + {[]roachpb.StoreID{1, 5}, 200, 5}, + {[]roachpb.StoreID{1, 2}, 500, 0}, + {[]roachpb.StoreID{1, 3}, 500, 0}, + {[]roachpb.StoreID{1, 4}, 500, 0}, + {[]roachpb.StoreID{1, 5}, 500, 5}, + {[]roachpb.StoreID{1, 5}, 600, 5}, + {[]roachpb.StoreID{1, 5}, 700, 5}, + {[]roachpb.StoreID{1, 5}, 800, 0}, + {[]roachpb.StoreID{1, 4}, 1.5, 4}, + {[]roachpb.StoreID{1, 5}, 1.5, 5}, + {[]roachpb.StoreID{1, 4}, 1.49, 0}, + {[]roachpb.StoreID{1, 5}, 1.49, 0}, + } + + for _, tc := range testCases { + loadRanges(rr, s, []testRange{{storeIDs: tc.storeIDs, qps: tc.qps}}) + hottestRanges := rr.topQPS() + _, target := sr.chooseLeaseToTransfer( + ctx, config.SystemConfig{}, &hottestRanges, localDesc, storeList, storeMap, minQPS, maxQPS) + if target.StoreID != tc.expectTarget { + t.Errorf("got target store %d for range with replicas %v and %f qps; want %d", + target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) + } + } +} From 094bf14008ea48bdb60bce024c1d939c0dddee6e Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Tue, 7 Aug 2018 03:15:10 -0500 Subject: [PATCH 4/4] roachtest: Add test for load-based lease rebalancing It consistently passes with store-level load-based lease rebalancing, but fails more often than not without it. 
Release note: None
---
 pkg/cmd/roachtest/allocator.go      |   2 +-
 pkg/cmd/roachtest/rebalance_load.go | 180 ++++++++++++++++++++++++++++
 pkg/cmd/roachtest/registry.go       |   1 +
 3 files changed, 182 insertions(+), 1 deletion(-)
 create mode 100644 pkg/cmd/roachtest/rebalance_load.go

diff --git a/pkg/cmd/roachtest/allocator.go b/pkg/cmd/roachtest/allocator.go
index 7fac79ade1cb..f30b4b8a55f3 100644
--- a/pkg/cmd/roachtest/allocator.go
+++ b/pkg/cmd/roachtest/allocator.go
@@ -33,7 +33,7 @@ func registerAllocator(r *registry) {
 	c.Put(ctx, workload, "./workload")

 	// Start the first `start` nodes and restore the fixture
-	args := startArgs("--args=--vmodule=allocator=5,allocator_scorer=5,replicate_queue=5")
+	args := startArgs("--args=--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
 	c.Start(ctx, c.Range(1, start), args)
 	db := c.Conn(ctx, 1)
 	defer db.Close()
diff --git a/pkg/cmd/roachtest/rebalance_load.go b/pkg/cmd/roachtest/rebalance_load.go
new file mode 100644
index 000000000000..a2611e1e0e49
--- /dev/null
+++ b/pkg/cmd/roachtest/rebalance_load.go
@@ -0,0 +1,180 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License. See the AUTHORS file
+// for names of contributors.
+
+package main
+
+import (
+	"context"
+	gosql "database/sql"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"sort"
+	"strconv"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"golang.org/x/sync/errgroup"
+)
+
+func registerRebalanceLoad(r *registry) {
+	// This test creates a single table for kv to use and splits the table to
+	// have one range for every node in the cluster. Because even brand new
+	// clusters start with 20+ ranges in them, the number of new ranges in kv's
+	// table is small enough that it typically won't trigger rebalancing of
+	// leases in the cluster based on lease count alone. We let kv generate a lot
+	// of load against the ranges such that when
+	// kv.allocator.stat_based_rebalancing.enabled is set to true, we'd expect
+	// load-based rebalancing to distribute the load evenly across the nodes in
+	// the cluster. Without that setting, the fact that the kv table has so few
+	// ranges means that they probabilistically won't have their leases evenly
+	// spread across all the nodes (they'll often just end up staying on n1).
+	//
+	// In other words, this test should always pass with
+	// kv.allocator.stat_based_rebalancing.enabled set to true, while it should
+	// usually (but not always) fail with it set to false.
+ rebalanceLoadRun := func(ctx context.Context, t *test, c *cluster, duration time.Duration, concurrency int) { + roachNodes := c.Range(1, c.nodes-1) + appNode := c.Node(c.nodes) + + c.Put(ctx, cockroach, "./cockroach", roachNodes) + args := startArgs( + "--args=--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5") + c.Start(ctx, roachNodes, args) + + c.Put(ctx, workload, "./workload", appNode) + c.Run(ctx, appNode, `./workload init kv --drop {pgurl:1}`) + + var m *errgroup.Group // see comment in version.go + m, ctx = errgroup.WithContext(ctx) + + m.Go(func() error { + c.l.printf("starting load generator\n") + + quietL, err := newLogger("run kv", strconv.Itoa(0), "workload"+strconv.Itoa(0), ioutil.Discard, os.Stderr) + if err != nil { + return err + } + splits := len(roachNodes) - 1 // n-1 splits => n ranges => 1 lease per node + return c.RunL(ctx, quietL, appNode, fmt.Sprintf( + "./workload run kv --read-percent=95 --splits=%d --tolerate-errors --concurrency=%d "+ + "--duration=%s {pgurl:1-3}", + splits, concurrency, duration.String())) + }) + + m.Go(func() error { + t.Status(fmt.Sprintf("starting checks for lease balance")) + + db := c.Conn(ctx, 1) + defer db.Close() + + if _, err := db.ExecContext( + ctx, `SET CLUSTER SETTING kv.allocator.stat_based_rebalancing.enabled=true`, + ); err != nil { + return err + } + + for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= duration; { + if done, err := isLoadEvenlyDistributed(c.l, db, len(roachNodes)); err != nil { + return err + } else if done { + c.l.printf("successfully achieved lease balance\n") + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + } + } + + return fmt.Errorf("timed out before leases were evenly spread") + }) + if err := m.Wait(); err != nil { + t.Fatal(err) + } + } + + minutes := 2 * time.Minute + numNodes := 4 // the last node is just used to generate load + concurrency := 128 + + r.Add(testSpec{ + Name: `rebalance-leases-by-load`, + Nodes: nodes(numNodes), + Stable: false, // TODO(a-robinson): Promote to stable + Run: func(ctx context.Context, t *test, c *cluster) { + if local { + concurrency = 32 + fmt.Printf("lowering concurrency to %d in local testing\n", concurrency) + } + rebalanceLoadRun(ctx, t, c, minutes, concurrency) + }, + }) +} + +func isLoadEvenlyDistributed(l *logger, db *gosql.DB, numNodes int) (bool, error) { + rows, err := db.Query( + `select lease_holder, count(*) ` + + `from [show experimental_ranges from table kv.kv] ` + + `group by lease_holder;`) + if err != nil { + return false, err + } + defer rows.Close() + leaseCounts := make(map[int]int) + var rangeCount int + for rows.Next() { + var storeID, leaseCount int + if err := rows.Scan(&storeID, &leaseCount); err != nil { + return false, err + } + leaseCounts[storeID] = leaseCount + rangeCount += leaseCount + } + l.printf("numbers of test.kv leases on each store: %v\n", leaseCounts) + + if len(leaseCounts) < numNodes { + l.printf("not all nodes have a lease yet: %v\n", leaseCounts) + return false, nil + } + + // The simple case is when ranges haven't split. We can require that every + // store has one lease. + if rangeCount == numNodes { + for _, leaseCount := range leaseCounts { + if leaseCount != 1 { + l.printf("uneven lease distribution: %v\n", leaseCounts) + return false, nil + } + } + return true, nil + } + + // For completeness, if leases have split, verify the leases per store don't + // differ by any more than 1. 
+ leases := make([]int, 0, numNodes) + for _, leaseCount := range leaseCounts { + leases = append(leases, leaseCount) + } + sort.Ints(leases) + if leases[0]+1 < leases[len(leases)-1] { + l.printf("leases per store differ by more than one: %v\n", leaseCounts) + return false, nil + } + + return true, nil +} diff --git a/pkg/cmd/roachtest/registry.go b/pkg/cmd/roachtest/registry.go index 5d9319b83cc2..e425b7bd0998 100644 --- a/pkg/cmd/roachtest/registry.go +++ b/pkg/cmd/roachtest/registry.go @@ -45,6 +45,7 @@ func registerTests(r *registry) { registerKVSplits(r) registerLargeRange(r) registerQueue(r) + registerRebalanceLoad(r) registerRestore(r) registerRoachmart(r) registerScaleData(r)