Skip to content

Commit

Permalink
kvserver: add load dimensions
Browse files Browse the repository at this point in the history
Previously, disparate types were used when comparing `StoreCapacity`,
`XThreshold` and `RangeUsageInfo`. This patch introduces a uniform
intermediate type `Load`, which is used to perform arithmetic and
comparison between types representing load.

The purpose of this change is to decouple changes to the underlying
inputs in these components to enable modifying existing load dimensions
and adding new ones with less code modification.

Additionally, `RangeUsageInfo` is uniformly passed around in place of
any specific calls to the underlying accounting `ReplicaStats`, to a
tight dependency.

Part of cockroachdb#91152

Release note: None
  • Loading branch information
kvoli committed Nov 9, 2022
1 parent 15369db commit aaacd6f
Show file tree
Hide file tree
Showing 35 changed files with 992 additions and 565 deletions.
3 changes: 2 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
<tr><td><code>feature.stats.enabled</code></td><td>boolean</td><td><code>true</code></td><td>set to true to enable CREATE STATISTICS/ANALYZE, false to disable; default is true</td></tr>
<tr><td><code>jobs.retention_time</code></td><td>duration</td><td><code>336h0m0s</code></td><td>the amount of time to retain records for completed jobs before</td></tr>
<tr><td><code>kv.allocator.load_based_lease_rebalancing.enabled</code></td><td>boolean</td><td><code>true</code></td><td>set to enable rebalancing of range leases based on load and latency</td></tr>
<tr><td><code>kv.allocator.load_based_rebalancing</code></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><code>kv.allocator.load_based_rebalancing</code></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of load across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><code>kv.allocator.load_based_rebalancing_dimension</code></td><td>enumeration</td><td><code>qps</code></td><td>what dimension of load does rebalancing consider [qps = 0]</td></tr>
<tr><td><code>kv.allocator.load_based_rebalancing_interval</code></td><td>duration</td><td><code>1m0s</code></td><td>the rough interval at which each store will check for load-based lease / replica rebalancing opportunities</td></tr>
<tr><td><code>kv.allocator.qps_rebalance_threshold</code></td><td>float</td><td><code>0.1</code></td><td>minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.allocator.range_rebalance_threshold</code></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull</td></tr>
Expand Down
5 changes: 2 additions & 3 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ ALL_TESTS = [
"//pkg/kv/kvserver/allocator/storepool:storepool_test",
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/asim/op:op_test",
"//pkg/kv/kvserver/asim/op:operator_test",
"//pkg/kv/kvserver/asim/queue:queue_test",
"//pkg/kv/kvserver/asim/state:state_test",
"//pkg/kv/kvserver/asim/storerebalancer:storerebalancer_test",
Expand Down Expand Up @@ -1122,6 +1121,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/abortspan:abortspan_test",
"//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl",
"//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl_test",
"//pkg/kv/kvserver/allocator/state:state",
"//pkg/kv/kvserver/allocator/storepool:storepool",
"//pkg/kv/kvserver/allocator/storepool:storepool_test",
"//pkg/kv/kvserver/allocator:allocator",
Expand All @@ -1130,8 +1130,6 @@ GO_TARGETS = [
"//pkg/kv/kvserver/asim/config:config",
"//pkg/kv/kvserver/asim/op:op",
"//pkg/kv/kvserver/asim/op:op_test",
"//pkg/kv/kvserver/asim/op:operator",
"//pkg/kv/kvserver/asim/op:operator_test",
"//pkg/kv/kvserver/asim/queue:queue",
"//pkg/kv/kvserver/asim/queue:queue_test",
"//pkg/kv/kvserver/asim/state:state",
Expand Down Expand Up @@ -2467,6 +2465,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/abortspan:get_x_data",
"//pkg/kv/kvserver/allocator:get_x_data",
"//pkg/kv/kvserver/allocator/allocatorimpl:get_x_data",
"//pkg/kv/kvserver/allocator/state:get_x_data",
"//pkg/kv/kvserver/allocator/storepool:get_x_data",
"//pkg/kv/kvserver/apply:get_x_data",
"//pkg/kv/kvserver/asim:get_x_data",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ go_library(
"//pkg/kv/kvserver/abortspan",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/state",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/apply",
"//pkg/kv/kvserver/batcheval",
Expand Down Expand Up @@ -336,6 +337,7 @@ go_test(
"//pkg/kv/kvserver/abortspan",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/state",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/apply",
"//pkg/kv/kvserver/batcheval",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/allocator/state",
"//pkg/kv/kvserver/constraint",
"//pkg/roachpb",
"//pkg/settings",
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ go_library(
"allocator.go",
"allocator_scorer.go",
"test_helpers.go",
"threshold.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl",
visibility = ["//visibility:public"],
deps = [
"//pkg/gossip",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/state",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/replicastats",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down Expand Up @@ -46,6 +47,7 @@ go_test(
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/state",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/liveness/livenesspb",
Expand Down
145 changes: 79 additions & 66 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/state"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -42,29 +42,31 @@ const (
minReplicaWeight = 0.001
)

// LeaseRebalanceThreshold is the minimum ratio of a store's lease surplus
// to the mean range/lease count that permits lease-transfers away from that
// store.
// Made configurable for the sake of testing.
var LeaseRebalanceThreshold = 0.05

// LeaseRebalanceThresholdMin is the absolute number of leases above/below the
// mean lease count that a store can have before considered overfull/underfull.
// Made configurable for the sake of testing.
var LeaseRebalanceThresholdMin = 5.0

// baseLoadBasedLeaseRebalanceThreshold is the equivalent of
// LeaseRebalanceThreshold for load-based lease rebalance decisions (i.e.
// "follow-the-workload"). It's the base threshold for decisions that get
// adjusted based on the load and latency of the involved ranges/nodes.
var baseLoadBasedLeaseRebalanceThreshold = 2 * LeaseRebalanceThreshold

// MinLeaseTransferStatsDuration configures the minimum amount of time a
// replica must wait for stats about request counts to accumulate before
// making decisions based on them. The higher this is, the less likely
// thrashing is (up to a point).
// Made configurable for the sake of testing.
var MinLeaseTransferStatsDuration = 30 * time.Second
var (
// LeaseRebalanceThreshold is the minimum ratio of a store's lease surplus
// to the mean range/lease count that permits lease-transfers away from that
// store.
// Made configurable for the sake of testing.
LeaseRebalanceThreshold = 0.05

// LeaseRebalanceThresholdMin is the absolute number of leases above/below the
// mean lease count that a store can have before considered overfull/underfull.
// Made configurable for the sake of testing.
LeaseRebalanceThresholdMin = 5.0

// baseLoadBasedLeaseRebalanceThreshold is the equivalent of
// LeaseRebalanceThreshold for load-based lease rebalance decisions (i.e.
// "follow-the-workload"). It's the base threshold for decisions that get
// adjusted based on the load and latency of the involved ranges/nodes.
baseLoadBasedLeaseRebalanceThreshold = 2 * LeaseRebalanceThreshold

// MinLeaseTransferStatsDuration configures the minimum amount of time a
// replica must wait for stats about request counts to accumulate before
// making decisions based on them. The higher this is, the less likely
// thrashing is (up to a point).
// Made configurable for the sake of testing.
MinLeaseTransferStatsDuration = 30 * time.Second
)

// EnableLoadBasedLeaseRebalancing controls whether lease rebalancing is done
// via the new heuristic based on request load and latency or via the simpler
Expand Down Expand Up @@ -1714,7 +1716,7 @@ func (a *Allocator) TransferLeaseTarget(
GetFirstIndex() uint64
Desc() *roachpb.RangeDescriptor
},
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
forceDecisionWithoutStats bool,
opts allocator.TransferLeaseOptions,
) roachpb.ReplicaDescriptor {
Expand Down Expand Up @@ -1756,7 +1758,7 @@ func (a *Allocator) TransferLeaseTarget(
// falls back to `leaseCountConvergence`. Rationalize this or refactor this
// logic to be more clear.
transferDec, repl := a.shouldTransferLeaseForAccessLocality(
ctx, source, existing, statSummary, nil, candidateLeasesMean,
ctx, source, existing, usageInfo, nil, candidateLeasesMean,
)
if !excludeLeaseRepl {
switch transferDec {
Expand Down Expand Up @@ -1822,8 +1824,8 @@ func (a *Allocator) TransferLeaseTarget(
defer a.randGen.Unlock()
return candidates[a.randGen.Intn(len(candidates))]

case allocator.QPSConvergence:
leaseReplQPS := statSummary.QPS
case allocator.LoadConvergence:
leaseReplLoad := usageInfo.Load()
candidates := make([]roachpb.StoreID, 0, len(existing)-1)
for _, repl := range existing {
if repl.StoreID != leaseRepl.StoreID() {
Expand All @@ -1841,16 +1843,19 @@ func (a *Allocator) TransferLeaseTarget(
// be true in all cases (some percentage of the leaseholder's traffic could
// be follower read traffic). See
// https://github.com/cockroachdb/cockroach/issues/75630.
bestStore, noRebalanceReason := bestStoreToMinimizeQPSDelta(
leaseReplQPS,
bestStore, noRebalanceReason := bestStoreToMinimizeLoadDelta(
leaseReplLoad,
leaseRepl.StoreID(),
candidates,
storeDescMap,
&QPSScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
DeprecatedRangeRebalanceThreshold: RangeRebalanceThreshold.Get(&a.StorePool.St.SV),
QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.StorePool.St.SV),
MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.StorePool.St.SV),
&LoadScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
Deterministic: a.StorePool.Deterministic,
LoadDims: opts.LoadDimensions,
LoadThreshold: LoadThresholds(&a.StorePool.St.SV, opts.LoadDimensions...),
MinLoadThreshold: LoadMinThresholds(opts.LoadDimensions...),
MinRequiredRebalanceLoadDiff: LoadRebalanceRequiredMinDiff(&a.StorePool.St.SV, opts.LoadDimensions...),
RebalanceImpact: leaseReplLoad,
},
)

Expand Down Expand Up @@ -1886,13 +1891,13 @@ func (a *Allocator) TransferLeaseTarget(
log.KvDistribution.VEventf(
ctx,
5,
"r%d: should transfer lease (qps=%0.2f) from s%d (qps=%0.2f) to s%d (qps=%0.2f)",
"r%d: should transfer lease (%s) from s%d (%s) to s%d (%s)",
leaseRepl.GetRangeID(),
leaseReplQPS,
leaseReplLoad,
leaseRepl.StoreID(),
storeDescMap[leaseRepl.StoreID()].Capacity.QueriesPerSecond,
storeDescMap[leaseRepl.StoreID()].Capacity.Load(),
bestStore,
storeDescMap[bestStore].Capacity.QueriesPerSecond,
storeDescMap[bestStore].Capacity.Load(),
)
default:
log.KvDistribution.Fatalf(ctx, "unknown declineReason: %v", noRebalanceReason)
Expand All @@ -1910,44 +1915,53 @@ func (a *Allocator) TransferLeaseTarget(
panic("unreachable")
}

// getCandidateWithMinQPS returns the StoreID that belongs to the store serving
// the lowest QPS among all the `candidates` stores.
func getCandidateWithMinQPS(
storeQPSMap map[roachpb.StoreID]float64, candidates []roachpb.StoreID,
// getCandidateWithMinLoad returns the StoreID that belongs to the store
// serving the lowest load among all the `candidates` stores, given a single
// dimension of load e.g. QPS.
func getCandidateWithMinLoad(
storeLoadMap map[roachpb.StoreID]state.Load,
candidates []roachpb.StoreID,
dimension state.LoadDimension,
) (bestCandidate roachpb.StoreID) {
minCandidateQPS := math.MaxFloat64
minCandidateLoad := math.MaxFloat64
for _, store := range candidates {
candidateQPS, ok := storeQPSMap[store]
candidateLoad, ok := storeLoadMap[store]
if !ok {
continue
}
if minCandidateQPS > candidateQPS {
minCandidateQPS = candidateQPS
candidateLoadDim := candidateLoad.Dim(dimension)
if minCandidateLoad > candidateLoadDim {
minCandidateLoad = candidateLoadDim
bestCandidate = store
}
}
return bestCandidate
}

// getQPSDelta returns the difference between the store serving the highest QPS
// getLoadDelta returns the difference between the store serving the highest QPS
// and the store serving the lowest QPS, among the set of stores in the
// `domain`.
func getQPSDelta(storeQPSMap map[roachpb.StoreID]float64, domain []roachpb.StoreID) float64 {
maxCandidateQPS := float64(0)
minCandidateQPS := math.MaxFloat64
func getLoadDelta(
storeLoadMap map[roachpb.StoreID]state.Load,
domain []roachpb.StoreID,
dimension state.LoadDimension,
) float64 {
maxCandidateLoad := float64(0)
minCandidateLoad := math.MaxFloat64
for _, cand := range domain {
candidateQPS, ok := storeQPSMap[cand]
candidateLoad, ok := storeLoadMap[cand]
if !ok {
continue
}
if maxCandidateQPS < candidateQPS {
maxCandidateQPS = candidateQPS
candidateLoadDim := candidateLoad.Dim(dimension)
if maxCandidateLoad < candidateLoadDim {
maxCandidateLoad = candidateLoadDim
}
if minCandidateQPS > candidateQPS {
minCandidateQPS = candidateQPS
if minCandidateLoad > candidateLoadDim {
minCandidateLoad = candidateLoadDim
}
}
return maxCandidateQPS - minCandidateQPS
return maxCandidateLoad - minCandidateLoad
}

// ShouldTransferLease returns true if the specified store is overfull in terms
Expand All @@ -1963,7 +1977,7 @@ func (a *Allocator) ShouldTransferLease(
GetFirstIndex() uint64
Desc() *roachpb.RangeDescriptor
},
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
) bool {
if a.leaseholderShouldMoveDueToPreferences(ctx, conf, leaseRepl, existing) {
return true
Expand Down Expand Up @@ -1994,7 +2008,7 @@ func (a *Allocator) ShouldTransferLease(
ctx,
source,
existing,
statSummary,
usageInfo,
nil,
sl.CandidateLeases.Mean,
)
Expand Down Expand Up @@ -2024,10 +2038,10 @@ func (a Allocator) FollowTheWorkloadPrefersLocal(
source roachpb.StoreDescriptor,
candidate roachpb.StoreID,
existing []roachpb.ReplicaDescriptor,
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
) bool {
adjustments := make(map[roachpb.StoreID]float64)
decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, source, existing, statSummary, adjustments, sl.CandidateLeases.Mean)
decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, source, existing, usageInfo, adjustments, sl.CandidateLeases.Mean)
if decision == decideWithoutStats {
return false
}
Expand All @@ -2045,14 +2059,13 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
ctx context.Context,
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
rebalanceAdjustments map[roachpb.StoreID]float64,
candidateLeasesMean float64,
) (transferDecision, roachpb.ReplicaDescriptor) {
// Only use load-based rebalancing if it's enabled and we have both
// stats and locality information to base our decision on.
if statSummary == nil ||
statSummary.LocalityCounts == nil ||
if usageInfo.RequestLocality == nil ||
!EnableLoadBasedLeaseRebalancing.Get(&a.StorePool.St.SV) {
return decideWithoutStats, roachpb.ReplicaDescriptor{}
}
Expand All @@ -2063,8 +2076,8 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
}
}

qpsStats := statSummary.LocalityCounts
qpsStatsDur := statSummary.Duration
qpsStats := usageInfo.RequestLocality.Counts
qpsStatsDur := usageInfo.RequestLocality.Duration

// If we haven't yet accumulated enough data, avoid transferring for now,
// unless we've been explicitly asked otherwise. Do not fall back to the
Expand Down
Loading

0 comments on commit aaacd6f

Please sign in to comment.