Skip to content

Commit

Permalink
kvserver: add load dimensions
Browse files Browse the repository at this point in the history
Previously, disparate types were used when comparing `StoreCapacity`,
`XThreshold` and `RangeUsageInfo`. This patch introduces a uniform
intermediate type `Load`, which is used to perform arithmetic and
comparison between types representing load.

The purpose of this change is to decouple changes to inputs, enable
modifying existing dimensions and adding new ones with less code
modification.

The only load dimension added in this patch is `Queries`, which is then
used in place of `QueriesPerSecond` within the rebalancing logic.

Additionally, `RangeUsageInfo` is uniformly passed around in place of
any specific calls to the underlying tracking datastructure
`ReplicaStats`.

Part of cockroachdb#91152

Release note: None
  • Loading branch information
kvoli committed Jan 3, 2023
1 parent 15591ab commit aa55af9
Show file tree
Hide file tree
Showing 39 changed files with 1,057 additions and 583 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<tr><td><div id="setting-feature-stats-enabled" class="anchored"><code>feature.stats.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to enable CREATE STATISTICS/ANALYZE, false to disable; default is true</td></tr>
<tr><td><div id="setting-jobs-retention-time" class="anchored"><code>jobs.retention_time</code></div></td><td>duration</td><td><code>336h0m0s</code></td><td>the amount of time to retain records for completed jobs before</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-lease-rebalancing-enabled" class="anchored"><code>kv.allocator.load_based_lease_rebalancing.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to enable rebalancing of range leases based on load and latency</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing" class="anchored"><code>kv.allocator.load_based_rebalancing</code></div></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing" class="anchored"><code>kv.allocator.load_based_rebalancing</code></div></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of load across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing-interval" class="anchored"><code>kv.allocator.load_based_rebalancing_interval</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the rough interval at which each store will check for load-based lease / replica rebalancing opportunities</td></tr>
<tr><td><div id="setting-kv-allocator-qps-rebalance-threshold" class="anchored"><code>kv.allocator.qps_rebalance_threshold</code></div></td><td>float</td><td><code>0.1</code></td><td>minimum fraction away from the mean a store&#39;s QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><div id="setting-kv-allocator-range-rebalance-threshold" class="anchored"><code>kv.allocator.range_rebalance_threshold</code></div></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store&#39;s range count can be before it is considered overfull or underfull</td></tr>
Expand Down
5 changes: 2 additions & 3 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ ALL_TESTS = [
"//pkg/kv/kvserver/allocator/storepool:storepool_test",
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/asim/op:op_test",
"//pkg/kv/kvserver/asim/op:operator_test",
"//pkg/kv/kvserver/asim/queue:queue_test",
"//pkg/kv/kvserver/asim/state:state_test",
"//pkg/kv/kvserver/asim/storerebalancer:storerebalancer_test",
Expand Down Expand Up @@ -1147,6 +1146,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/abortspan:abortspan_test",
"//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl",
"//pkg/kv/kvserver/allocator/allocatorimpl:allocatorimpl_test",
"//pkg/kv/kvserver/allocator/load:load",
"//pkg/kv/kvserver/allocator/storepool:storepool",
"//pkg/kv/kvserver/allocator/storepool:storepool_test",
"//pkg/kv/kvserver/allocator:allocator",
Expand All @@ -1155,8 +1155,6 @@ GO_TARGETS = [
"//pkg/kv/kvserver/asim/config:config",
"//pkg/kv/kvserver/asim/op:op",
"//pkg/kv/kvserver/asim/op:op_test",
"//pkg/kv/kvserver/asim/op:operator",
"//pkg/kv/kvserver/asim/op:operator_test",
"//pkg/kv/kvserver/asim/queue:queue",
"//pkg/kv/kvserver/asim/queue:queue_test",
"//pkg/kv/kvserver/asim/state:state",
Expand Down Expand Up @@ -2526,6 +2524,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/abortspan:get_x_data",
"//pkg/kv/kvserver/allocator:get_x_data",
"//pkg/kv/kvserver/allocator/allocatorimpl:get_x_data",
"//pkg/kv/kvserver/allocator/load:get_x_data",
"//pkg/kv/kvserver/allocator/storepool:get_x_data",
"//pkg/kv/kvserver/apply:get_x_data",
"//pkg/kv/kvserver/asim:get_x_data",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ go_library(
"//pkg/kv/kvserver/abortspan",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/load",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/apply",
"//pkg/kv/kvserver/batcheval",
Expand Down Expand Up @@ -342,6 +343,7 @@ go_test(
"//pkg/kv/kvserver/abortspan",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/allocatorimpl",
"//pkg/kv/kvserver/allocator/load",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/apply",
"//pkg/kv/kvserver/batcheval",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/allocator/load",
"//pkg/kv/kvserver/constraint",
"//pkg/roachpb",
"//pkg/settings",
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ go_library(
"allocator.go",
"allocator_scorer.go",
"test_helpers.go",
"threshold.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl",
visibility = ["//visibility:public"],
deps = [
"//pkg/gossip",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/load",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/replicastats",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down Expand Up @@ -46,6 +47,7 @@ go_test(
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv/kvserver/allocator",
"//pkg/kv/kvserver/allocator/load",
"//pkg/kv/kvserver/allocator/storepool",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/liveness/livenesspb",
Expand Down
94 changes: 52 additions & 42 deletions pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/constraint"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -1777,7 +1777,7 @@ func (a *Allocator) TransferLeaseTarget(
GetFirstIndex() uint64
Desc() *roachpb.RangeDescriptor
},
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
forceDecisionWithoutStats bool,
opts allocator.TransferLeaseOptions,
) roachpb.ReplicaDescriptor {
Expand Down Expand Up @@ -1819,7 +1819,7 @@ func (a *Allocator) TransferLeaseTarget(
// falls back to `leaseCountConvergence`. Rationalize this or refactor this
// logic to be more clear.
transferDec, repl := a.shouldTransferLeaseForAccessLocality(
ctx, storePool, source, existing, statSummary, nil, candidateLeasesMean,
ctx, storePool, source, existing, usageInfo, nil, candidateLeasesMean,
)
if !excludeLeaseRepl {
switch transferDec {
Expand Down Expand Up @@ -1890,8 +1890,8 @@ func (a *Allocator) TransferLeaseTarget(
defer a.randGen.Unlock()
return candidates[a.randGen.Intn(len(candidates))]

case allocator.QPSConvergence:
leaseReplQPS := statSummary.QPS
case allocator.LoadConvergence:
leaseReplLoad := usageInfo.Load()
candidates := make([]roachpb.StoreID, 0, len(existing)-1)
for _, repl := range existing {
if repl.StoreID != leaseRepl.StoreID() {
Expand All @@ -1909,15 +1909,19 @@ func (a *Allocator) TransferLeaseTarget(
// be true in all cases (some percentage of the leaseholder's traffic could
// be follower read traffic). See
// https://github.com/cockroachdb/cockroach/issues/75630.
bestStore, noRebalanceReason := bestStoreToMinimizeQPSDelta(
leaseReplQPS,
bestStore, noRebalanceReason := bestStoreToMinimizeLoadDelta(
leaseReplLoad,
leaseRepl.StoreID(),
candidates,
storeDescMap,
&QPSScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
QPSRebalanceThreshold: allocator.QPSRebalanceThreshold.Get(&a.st.SV),
MinRequiredQPSDiff: allocator.MinQPSDifferenceForTransfers.Get(&a.st.SV),
&LoadScorerOptions{
StoreHealthOptions: a.StoreHealthOptions(ctx),
Deterministic: a.deterministic,
LoadDims: opts.LoadDimensions,
LoadThreshold: LoadThresholds(&a.st.SV, opts.LoadDimensions...),
MinLoadThreshold: LoadMinThresholds(opts.LoadDimensions...),
MinRequiredRebalanceLoadDiff: LoadRebalanceRequiredMinDiff(&a.st.SV, opts.LoadDimensions...),
RebalanceImpact: leaseReplLoad,
},
)

Expand Down Expand Up @@ -1953,13 +1957,13 @@ func (a *Allocator) TransferLeaseTarget(
log.KvDistribution.VEventf(
ctx,
5,
"r%d: should transfer lease (qps=%0.2f) from s%d (qps=%0.2f) to s%d (qps=%0.2f)",
"r%d: should transfer lease load=%s from s%d load=%s to s%d load=%s",
leaseRepl.GetRangeID(),
leaseReplQPS,
leaseReplLoad,
leaseRepl.StoreID(),
storeDescMap[leaseRepl.StoreID()].Capacity.QueriesPerSecond,
storeDescMap[leaseRepl.StoreID()].Capacity.Load(),
bestStore,
storeDescMap[bestStore].Capacity.QueriesPerSecond,
storeDescMap[bestStore].Capacity.Load(),
)
default:
log.KvDistribution.Fatalf(ctx, "unknown declineReason: %v", noRebalanceReason)
Expand All @@ -1977,44 +1981,51 @@ func (a *Allocator) TransferLeaseTarget(
panic("unreachable")
}

// getCandidateWithMinQPS returns the StoreID that belongs to the store serving
// the lowest QPS among all the `candidates` stores.
func getCandidateWithMinQPS(
storeQPSMap map[roachpb.StoreID]float64, candidates []roachpb.StoreID,
// getCandidateWithMinLoad returns the StoreID that belongs to the store
// serving the lowest load among all the `candidates` stores, given a single
// dimension of load e.g. QPS.
func getCandidateWithMinLoad(
storeLoadMap map[roachpb.StoreID]load.Load,
candidates []roachpb.StoreID,
dimension load.Dimension,
) (bestCandidate roachpb.StoreID) {
minCandidateQPS := math.MaxFloat64
minCandidateLoad := math.MaxFloat64
for _, store := range candidates {
candidateQPS, ok := storeQPSMap[store]
candidateLoad, ok := storeLoadMap[store]
if !ok {
continue
}
if minCandidateQPS > candidateQPS {
minCandidateQPS = candidateQPS
candidateLoadDim := candidateLoad.Dim(dimension)
if minCandidateLoad > candidateLoadDim {
minCandidateLoad = candidateLoadDim
bestCandidate = store
}
}
return bestCandidate
}

// getQPSDelta returns the difference between the store serving the highest QPS
// getLoadDelta returns the difference between the store serving the highest QPS
// and the store serving the lowest QPS, among the set of stores in the
// `domain`.
func getQPSDelta(storeQPSMap map[roachpb.StoreID]float64, domain []roachpb.StoreID) float64 {
maxCandidateQPS := float64(0)
minCandidateQPS := math.MaxFloat64
func getLoadDelta(
storeLoadMap map[roachpb.StoreID]load.Load, domain []roachpb.StoreID, dimension load.Dimension,
) float64 {
maxCandidateLoad := float64(0)
minCandidateLoad := math.MaxFloat64
for _, cand := range domain {
candidateQPS, ok := storeQPSMap[cand]
candidateLoad, ok := storeLoadMap[cand]
if !ok {
continue
}
if maxCandidateQPS < candidateQPS {
maxCandidateQPS = candidateQPS
candidateLoadDim := candidateLoad.Dim(dimension)
if maxCandidateLoad < candidateLoadDim {
maxCandidateLoad = candidateLoadDim
}
if minCandidateQPS > candidateQPS {
minCandidateQPS = candidateQPS
if minCandidateLoad > candidateLoadDim {
minCandidateLoad = candidateLoadDim
}
}
return maxCandidateQPS - minCandidateQPS
return maxCandidateLoad - minCandidateLoad
}

// ShouldTransferLease returns true if the specified store is overfull in terms
Expand All @@ -2031,7 +2042,7 @@ func (a *Allocator) ShouldTransferLease(
GetFirstIndex() uint64
Desc() *roachpb.RangeDescriptor
},
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
) bool {
if a.leaseholderShouldMoveDueToPreferences(ctx, storePool, conf, leaseRepl, existing) {
return true
Expand Down Expand Up @@ -2064,7 +2075,7 @@ func (a *Allocator) ShouldTransferLease(
storePool,
source,
existing,
statSummary,
usageInfo,
nil,
sl.CandidateLeases.Mean,
)
Expand Down Expand Up @@ -2095,10 +2106,10 @@ func (a Allocator) FollowTheWorkloadPrefersLocal(
source roachpb.StoreDescriptor,
candidate roachpb.StoreID,
existing []roachpb.ReplicaDescriptor,
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
) bool {
adjustments := make(map[roachpb.StoreID]float64)
decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, storePool, source, existing, statSummary, adjustments, sl.CandidateLeases.Mean)
decision, _ := a.shouldTransferLeaseForAccessLocality(ctx, storePool, source, existing, usageInfo, adjustments, sl.CandidateLeases.Mean)
if decision == decideWithoutStats {
return false
}
Expand All @@ -2117,14 +2128,13 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
storePool storepool.AllocatorStorePool,
source roachpb.StoreDescriptor,
existing []roachpb.ReplicaDescriptor,
statSummary *replicastats.RatedSummary,
usageInfo allocator.RangeUsageInfo,
rebalanceAdjustments map[roachpb.StoreID]float64,
candidateLeasesMean float64,
) (transferDecision, roachpb.ReplicaDescriptor) {
// Only use load-based rebalancing if it's enabled and we have both
// stats and locality information to base our decision on.
if statSummary == nil ||
statSummary.LocalityCounts == nil ||
if usageInfo.RequestLocality == nil ||
!EnableLoadBasedLeaseRebalancing.Get(&a.st.SV) {
return decideWithoutStats, roachpb.ReplicaDescriptor{}
}
Expand All @@ -2135,8 +2145,8 @@ func (a Allocator) shouldTransferLeaseForAccessLocality(
}
}

qpsStats := statSummary.LocalityCounts
qpsStatsDur := statSummary.Duration
qpsStats := usageInfo.RequestLocality.Counts
qpsStatsDur := usageInfo.RequestLocality.Duration

// If we haven't yet accumulated enough data, avoid transferring for now,
// unless we've been explicitly asked otherwise. Do not fall back to the
Expand Down
Loading

0 comments on commit aa55af9

Please sign in to comment.