kvserver: add cpu time rebalancing #91674

Closed · wants to merge 3 commits
4 changes: 3 additions & 1 deletion docs/generated/settings/settings.html
@@ -42,8 +42,10 @@
<tr><td><code>feature.schema_change.enabled</code></td><td>boolean</td><td><code>true</code></td><td>set to true to enable schema changes, false to disable; default is true</td></tr>
<tr><td><code>feature.stats.enabled</code></td><td>boolean</td><td><code>true</code></td><td>set to true to enable CREATE STATISTICS/ANALYZE, false to disable; default is true</td></tr>
<tr><td><code>jobs.retention_time</code></td><td>duration</td><td><code>336h0m0s</code></td><td>the amount of time to retain records for completed jobs before</td></tr>
<tr><td><code>kv.allocator.cpu_rebalance_threshold</code></td><td>float</td><td><code>0.1</code></td><td>minimum fraction away from the mean a store's cpu time can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.allocator.load_based_lease_rebalancing.enabled</code></td><td>boolean</td><td><code>true</code></td><td>set to enable rebalancing of range leases based on load and latency</td></tr>
<tr><td><code>kv.allocator.load_based_rebalancing</code></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><code>kv.allocator.load_based_rebalancing</code></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of load across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><code>kv.allocator.load_based_rebalancing_dimension</code></td><td>enumeration</td><td><code>qps</code></td><td>what dimension of load does rebalancing consider [qps = 0, cpu = 1]</td></tr>
<tr><td><code>kv.allocator.load_based_rebalancing_interval</code></td><td>duration</td><td><code>1m0s</code></td><td>the rough interval at which each store will check for load-based lease / replica rebalancing opportunities</td></tr>
<tr><td><code>kv.allocator.qps_rebalance_threshold</code></td><td>float</td><td><code>0.1</code></td><td>minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.allocator.range_rebalance_threshold</code></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull</td></tr>
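
The two settings introduced above are ordinary cluster settings, so the new dimension can be exercised at runtime without a restart. As a rough illustration of the threshold: with a mean store CPU usage of 2.0 cores and kv.allocator.cpu_rebalance_threshold at its 0.1 default, stores above roughly 2.2 cores of CPU time count as overfull and stores below roughly 1.8 cores as underfull. A minimal sketch of flipping the dimension over a SQL session (setting names and enum values are taken from the table above; the threshold value shown simply restates the default):

-- Rebalance on CPU time instead of QPS (enum values: qps = 0, cpu = 1).
SET CLUSTER SETTING kv.allocator.load_based_rebalancing_dimension = 'cpu';

-- Stores more than 10% away from the mean CPU time are considered overfull or underfull.
SET CLUSTER SETTING kv.allocator.cpu_rebalance_threshold = 0.1;
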
5 changes: 2 additions & 3 deletions pkg/BUILD.bazel
@@ -184,7 +184,6 @@ ALL_TESTS = [
"//pkg/kv/kvserver/allocator/storepool:storepool_test",
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/asim/op:op_test",
"//pkg/kv/kvserver/asim/op:operator_test",
"//pkg/kv/kvserver/asim/queue:queue_test",
"//pkg/kv/kvserver/asim/state:state_test",
"//pkg/kv/kvserver/asim/storerebalancer:storerebalancer_test",
@@ -1122,6 +1121,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/abortspan:abortspan_test",
"//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl",
"//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl_test",
"//pkg/kv/kvserver/allocator/state:state",
"//pkg/kv/kvserver/allocator/storepool:storepool",
"//pkg/kv/kvserver/allocator/storepool:storepool_test",
"//pkg/kv/kvserver/allocator:allocator",
@@ -1130,8 +1130,6 @@
"//pkg/kv/kvserver/asim/config:config",
"//pkg/kv/kvserver/asim/op:op",
"//pkg/kv/kvserver/asim/op:op_test",
"//pkg/kv/kvserver/asim/op:operator",
"//pkg/kv/kvserver/asim/op:operator_test",
"//pkg/kv/kvserver/asim/queue:queue",
"//pkg/kv/kvserver/asim/queue:queue_test",
"//pkg/kv/kvserver/asim/state:state",
@@ -2467,6 +2465,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/abortspan:get_x_data",
"//pkg/kv/kvserver/allocator:get_x_data",
"//pkg/kv/kvserver/allocator/allocatorimpl:get_x_data",
"//pkg/kv/kvserver/allocator/state:get_x_data",
"//pkg/kv/kvserver/allocator/storepool:get_x_data",
"//pkg/kv/kvserver/apply:get_x_data",
"//pkg/kv/kvserver/asim:get_x_data",
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -116,6 +116,7 @@ go_library(
"//pkg/kv/kvserver/abortspan",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/state",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/apply",
"//pkg/kv/kvserver/batcheval",
@@ -176,6 +177,7 @@ go_library(
"//pkg/util/envutil",
"//pkg/util/errorutil",
"//pkg/util/grpcutil",
"//pkg/util/grunning",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/iterutil",
@@ -336,6 +338,7 @@ go_test(
"//pkg/kv/kvserver/abortspan",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/state",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/apply",
"//pkg/kv/kvserver/batcheval",
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/allocator/state",
"//pkg/kv/kvserver/constraint",
"//pkg/roachpb",
"//pkg/settings",
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
@@ -7,17 +7,18 @@ go_library(
"allocator.go",
"allocator_scorer.go",
"test_helpers.go",
"threshold.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl",
visibility = ["//visibility:public"],
deps = [
"//pkg/gossip",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/state",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/replicastats",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
@@ -46,6 +47,7 @@ go_test(
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/state",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/liveness/livenesspb",
145 changes: 79 additions & 66 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -20,10 +20,10 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/state"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -42,29 +42,31 @@ const (
minReplicaWeight = 0.001
)

// LeaseRebalanceThreshold is the minimum ratio of a store's lease surplus
// to the mean range/lease count that permits lease-transfers away from that
// store.
// Made configurable for the sake of testing.
var LeaseRebalanceThreshold = 0.05

// LeaseRebalanceThresholdMin is the absolute number of leases above/below the
// mean lease count that a store can have before considered overfull/underfull.
// Made configurable for the sake of testing.
var LeaseRebalanceThresholdMin = 5.0

// baseLoadBasedLeaseRebalanceThreshold is the equivalent of
// LeaseRebalanceThreshold for load-based lease rebalance decisions (i.e.
// "follow-the-workload"). It's the base threshold for decisions that get
// adjusted based on the load and latency of the involved ranges/nodes.
var baseLoadBasedLeaseRebalanceThreshold = 2 * LeaseRebalanceThreshold

// MinLeaseTransferStatsDuration configures the minimum amount of time a
// replica must wait for stats about request counts to accumulate before
// making decisions based on them. The higher this is, the less likely
// thrashing is (up to a point).
// Made configurable for the sake of testing.
var MinLeaseTransferStatsDuration = 30 * time.Second
var (
// LeaseRebalanceThreshold is the minimum ratio of a store's lease surplus
// to the mean range/lease count that permits lease-transfers away from that
// store.
// Made configurable for the sake of testing.
LeaseRebalanceThreshold = 0.05

// LeaseRebalanceThresholdMin is the absolute number of leases above/below the
// mean lease count that a store can have before considered overfull/underfull.
// Made configurable for the sake of testing.
LeaseRebalanceThresholdMin = 5.0

// baseLoadBasedLeaseRebalanceThreshold is the equivalent of
// LeaseRebalanceThreshold for load-based lease rebalance decisions (i.e.
// "follow-the-workload"). It's the base threshold for decisions that get
// adjusted based on the load and latency of the involved ranges/nodes.
baseLoadBasedLeaseRebalanceThreshold = 2 * LeaseRebalanceThreshold

// MinLeaseTransferStatsDuration configures the minimum amount of time a
// replica must wait for stats about request counts to accumulate before
// making decisions based on them. The higher this is, the less likely
// thrashing is (up to a point).
// Made configurable for the sake of testing.
MinLeaseTransferStatsDuration = 30 * time.Second
)

// EnableLoadBasedLeaseRebalancing controls whether lease rebalancing is done
// via the new heuristic based on request load and latency or via the simpler
@@ -1714,7 +1716,7 @@ func (a *Allocator) TransferLeaseTarget(
GetFirstIndex() uint64
Desc() *roachpb.RangeDescriptor
},
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
forceDecisionWithoutStats bool,
opts allocator.TransferLeaseOptions,
) roachpb.ReplicaDescriptor {
@@ -1756,7 +1758,7 @@ func (a *Allocator) TransferLeaseTarget(
// falls back to `leaseCountConvergence`. Rationalize this or refactor this
// logic to be more clear.
transferDec, repl := a.shouldTransferLeaseForAccessLocality(
ctx, source, existing, statSummary, nil, candidateLeasesMean,
ctx, source, existing, usageInfo, nil, candidateLeasesMean,
)
if !excludeLeaseRepl {
switch transferDec {
@@ -1822,8 +1824,8 @@ func (a *Allocator) TransferLeaseTarget(
defer a.randGen.Unlock()
return candidates[a.randGen.Intn(len(candidates))]

case allocator.QPSConvergence:
leaseReplQPS := statSummary.QPS
case allocator.LoadConvergence:
leaseReplLoad := usageInfo.Load()
candidates := make([]roachpb.StoreID, 0, len(existing)-1)
for _, repl := range existing {
if repl.StoreID != leaseRepl.StoreID() {
@@ -1841,16 +1843,19 @@
// be true in all cases (some percentage of the leaseholder's traffic could
// be follower read traffic). See
// https://github.com/cockroachdb/cockroach/issues/75630.
bestStore, noRebalanceReason := bestStoreToMinimizeQPSDelta(
leaseReplQPS,
bestStore, noRebalanceReason := bestStoreToMinimizeLoadDelta(
leaseReplLoad,
leaseRepl.StoreID(),
candidates,
storeDescMap,
&QPSScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
DeprecatedRangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.StorePool.St.SV),
QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.StorePool.St.SV),
MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.StorePool.St.SV),
&LoadScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
Deterministic: a.StorePool.Deterministic,
LoadDims: opts.LoadDimensions,
LoadThreshold: LoadThresholds(&a.StorePool.St.SV, opts.LoadDimensions...),
MinLoadThreshold: LoadMinThresholds(opts.LoadDimensions...),
MinRequiredRebalanceLoadDiff: LoadRebalanceRequiredMinDiff(&a.StorePool.St.SV, opts.LoadDimensions...),
RebalanceImpact: leaseReplLoad,
},
)

@@ -1886,13 +1891,13 @@ func (a *Allocator) TransferLeaseTarget(
log.KvDistribution.VEventf(
ctx,
5,
"r%d: should transfer lease (qps=%0.2f) from s%d (qps=%0.2f) to s%d (qps=%0.2f)",
"r%d: should transfer lease (%s) from s%d (%s) to s%d (%s)",
leaseRepl.GetRangeID(),
leaseReplQPS,
leaseReplLoad,
leaseRepl.StoreID(),
storeDescMap[leaseRepl.StoreID()].Capacity.QueriesPerSecond,
storeDescMap[leaseRepl.StoreID()].Capacity.Load(),
bestStore,
storeDescMap[bestStore].Capacity.QueriesPerSecond,
storeDescMap[bestStore].Capacity.Load(),
)
default:
log.KvDistribution.Fatalf(ctx, "unknown declineReason: %v", noRebalanceReason)
@@ -1910,44 +1915,53 @@ func (a *Allocator) TransferLeaseTarget(
panic("unreachable")
}

// getCandidateWithMinQPS returns the StoreID that belongs to the store serving
// the lowest QPS among all the `candidates` stores.
func getCandidateWithMinQPS(
storeQPSMap map[roachpb.StoreID]float64, candidates []roachpb.StoreID,
// getCandidateWithMinLoad returns the StoreID that belongs to the store
// serving the lowest load among all the `candidates` stores, given a single
// dimension of load e.g. QPS.
func getCandidateWithMinLoad(
storeLoadMap map[roachpb.StoreID]state.Load,
candidates []roachpb.StoreID,
dimension state.LoadDimension,
) (bestCandidate roachpb.StoreID) {
minCandidateQPS := math.MaxFloat64
minCandidateLoad := math.MaxFloat64
for _, store := range candidates {
candidateQPS, ok := storeQPSMap[store]
candidateLoad, ok := storeLoadMap[store]
if !ok {
continue
}
if minCandidateQPS > candidateQPS {
minCandidateQPS = candidateQPS
candidateLoadDim := candidateLoad.Dim(dimension)
if minCandidateLoad > candidateLoadDim {
minCandidateLoad = candidateLoadDim
bestCandidate = store
}
}
return bestCandidate
}

// getQPSDelta returns the difference between the store serving the highest QPS
// getLoadDelta returns the difference between the store serving the highest QPS
// and the store serving the lowest QPS, among the set of stores in the
// `domain`.
func getQPSDelta(storeQPSMap map[roachpb.StoreID]float64, domain []roachpb.StoreID) float64 {
maxCandidateQPS := float64(0)
minCandidateQPS := math.MaxFloat64
func getLoadDelta(
storeLoadMap map[roachpb.StoreID]state.Load,
domain []roachpb.StoreID,
dimension state.LoadDimension,
) float64 {
maxCandidateLoad := float64(0)
minCandidateLoad := math.MaxFloat64
for _, cand := range domain {
candidateQPS, ok := storeQPSMap[cand]
candidateLoad, ok := storeLoadMap[cand]
if !ok {
continue
}
if maxCandidateQPS < candidateQPS {
maxCandidateQPS = candidateQPS
candidateLoadDim := candidateLoad.Dim(dimension)
if maxCandidateLoad < candidateLoadDim {
maxCandidateLoad = candidateLoadDim
}
if minCandidateQPS > candidateQPS {
minCandidateQPS = candidateQPS
if minCandidateLoad > candidateLoadDim {
minCandidateLoad = candidateLoadDim
}
}
return maxCandidateQPS - minCandidateQPS
return maxCandidateLoad - minCandidateLoad
}

// ShouldTransferLease returns true if the specified store is overfull in terms
Expand All @@ -1963,7 +1977,7 @@ func (a *Allocator) ShouldTransferLease(
GetFirstIndex() uint64
Desc() *roachpb.RangeDescriptor
},
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
) bool {
if a.leaseholderShouldMoveDueToPreferences(ctx, conf, leaseRepl, existing) {
return true
@@ -1994,7 +2008,7 @@ func (a *Allocator) ShouldTransferLease(
ctx,
source,
existing,
statSummary,
usageInfo,
nil,
sl.CandidateLeases.Mean,
)
@@ -2024,10 +2038,10 @@ func (a Allocator) FollowTheWorkloadPrefersLocal(
source roachpb.StoreDescriptor,
candidate roachpb.StoreID,
existing []roachpb.ReplicaDescriptor,
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
) bool {
adjustments := make(map[roachpb.StoreID]float64)
decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, source, existing, statSummary, adjustments, sl.CandidateLeases.Mean)
decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, source, existing, usageInfo, adjustments, sl.CandidateLeases.Mean)
if decision == decideWithoutStats {
return false
}
@@ -2045,14 +2059,13 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
ctx context.Context,
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
rebalanceAdjustments map[roachpb.StoreID]float64,
candidateLeasesMean float64,
) (transferDecision, roachpb.ReplicaDescriptor) {
// Only use load-based rebalancing if it's enabled and we have both
// stats and locality information to base our decision on.
if statSummary == nil ||
statSummary.LocalityCounts == nil ||
if usageInfo.RequestLocality == nil ||
!EnableLoadBasedLeaseRebalancing.Get(&a.StorePool.St.SV) {
return decideWithoutStats, roachpb.ReplicaDescriptor{}
}
Expand All @@ -2063,8 +2076,8 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
}
}

qpsStats := statSummary.LocalityCounts
qpsStatsDur := statSummary.Duration
qpsStats := usageInfo.RequestLocality.Counts
qpsStatsDur := usageInfo.RequestLocality.Duration

// If we haven't yet accumulated enough data, avoid transferring for now,
// unless we've been explicitly asked otherwise. Do not fall back to the
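
The core of the refactor shown above is that the lease-transfer helpers now operate on a load value with a selectable dimension (QPS or CPU time) rather than a raw QPS float64. A standalone, simplified sketch of that selection pattern, using a toy Load type that stands in for the internal state.Load; every name below is local to the sketch, not a CockroachDB API:

package main

import (
	"fmt"
	"math"
)

// LoadDimension selects which measurement to balance on.
type LoadDimension int

const (
	QPS LoadDimension = iota
	CPU
)

// Load is a toy stand-in for a multi-dimensional load value.
type Load map[LoadDimension]float64

// Dim returns the value recorded for a single dimension (zero if absent).
func (l Load) Dim(d LoadDimension) float64 { return l[d] }

// candidateWithMinLoad picks the store with the lowest load along one
// dimension, mirroring the shape of the generalized helper in the diff.
func candidateWithMinLoad(loads map[int]Load, candidates []int, d LoadDimension) (best int) {
	minLoad := math.MaxFloat64
	for _, store := range candidates {
		l, ok := loads[store]
		if !ok {
			continue
		}
		if v := l.Dim(d); v < minLoad {
			minLoad = v
			best = store
		}
	}
	return best
}

func main() {
	loads := map[int]Load{
		1: {QPS: 900, CPU: 2.4}, // busiest on both dimensions
		2: {QPS: 300, CPU: 1.9}, // lowest QPS
		3: {QPS: 450, CPU: 0.8}, // lowest CPU time
	}
	stores := []int{1, 2, 3}
	fmt.Println("min by QPS:", candidateWithMinLoad(loads, stores, QPS)) // prints 2
	fmt.Println("min by CPU:", candidateWithMinLoad(loads, stores, CPU)) // prints 3
}

The real helpers additionally thread through store-health gating and a minimum required load difference (see LoadScorerOptions in the hunk above); this sketch only shows the dimension-selection core.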