Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: introduce cpu rebalancing #95152

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to
kv.log_range_and_node_events.enabled boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
kv.range_split.load_cpu_threshold duration 250ms the CPU use per second over which, the range becomes a candidate for load based splitting
kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence)
Expand Down Expand Up @@ -297,4 +298,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-34 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-36 set the active cluster version in the format '<major>.<minor>'
5 changes: 4 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,19 @@
<tr><td><div id="setting-jobs-retention-time" class="anchored"><code>jobs.retention_time</code></div></td><td>duration</td><td><code>336h0m0s</code></td><td>the amount of time to retain records for completed jobs before</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-lease-rebalancing-enabled" class="anchored"><code>kv.allocator.load_based_lease_rebalancing.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to enable rebalancing of range leases based on load and latency</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing" class="anchored"><code>kv.allocator.load_based_rebalancing</code></div></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of load across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing-objective" class="anchored"><code>kv.allocator.load_based_rebalancing.objective</code></div></td><td>enumeration</td><td><code>qps</code></td><td>what objective does the cluster use to rebalance; if set to `qps` the cluster will attempt to balance qps among stores, if set to `cpu` the cluster will attempt to balance cpu usage among stores [qps = 0, cpu = 1]</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing-interval" class="anchored"><code>kv.allocator.load_based_rebalancing_interval</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the rough interval at which each store will check for load-based lease / replica rebalancing opportunities</td></tr>
<tr><td><div id="setting-kv-allocator-qps-rebalance-threshold" class="anchored"><code>kv.allocator.qps_rebalance_threshold</code></div></td><td>float</td><td><code>0.1</code></td><td>minimum fraction away from the mean a store&#39;s QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><div id="setting-kv-allocator-range-rebalance-threshold" class="anchored"><code>kv.allocator.range_rebalance_threshold</code></div></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store&#39;s range count can be before it is considered overfull or underfull</td></tr>
<tr><td><div id="setting-kv-allocator-store-cpu-rebalance-threshold" class="anchored"><code>kv.allocator.store_cpu_rebalance_threshold</code></div></td><td>float</td><td><code>0.1</code></td><td>minimum fraction away from the mean a store&#39;s cpu usage can be before it is considered overfull or underfull</td></tr>
<tr><td><div id="setting-kv-bulk-io-write-max-rate" class="anchored"><code>kv.bulk_io_write.max_rate</code></div></td><td>byte size</td><td><code>1.0 TiB</code></td><td>the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops</td></tr>
<tr><td><div id="setting-kv-bulk-sst-max-allowed-overage" class="anchored"><code>kv.bulk_sst.max_allowed_overage</code></div></td><td>byte size</td><td><code>64 MiB</code></td><td>if positive, allowed size in excess of target size for SSTs from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td></tr>
<tr><td><div id="setting-kv-bulk-sst-target-size" class="anchored"><code>kv.bulk_sst.target_size</code></div></td><td>byte size</td><td><code>16 MiB</code></td><td>target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-follower-reads-enabled" class="anchored"><code>kv.closed_timestamp.follower_reads_enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow (all) replicas to serve consistent historical reads based on closed timestamp information</td></tr>
<tr><td><div id="setting-kv-log-range-and-node-events-enabled" class="anchored"><code>kv.log_range_and_node_events.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog</td></tr>
<tr><td><div id="setting-kv-protectedts-reconciliation-interval" class="anchored"><code>kv.protectedts.reconciliation.interval</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the frequency for reconciling jobs with protected timestamp records</td></tr>
<tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load_enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td></tr>
<tr><td><div id="setting-kv-range-split-load-cpu-threshold" class="anchored"><code>kv.range_split.load_cpu_threshold</code></div></td><td>duration</td><td><code>250ms</code></td><td>the CPU use per second over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><div id="setting-kv-range-split-load-qps-threshold" class="anchored"><code>kv.range_split.load_qps_threshold</code></div></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><div id="setting-kv-rangefeed-range-stuck-threshold" class="anchored"><code>kv.rangefeed.range_stuck_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>restart rangefeeds if they don&#39;t emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence)</td></tr>
Expand Down Expand Up @@ -235,6 +238,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-34</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-36</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
9 changes: 8 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ const (
// columnar scans in the KV layer.
V23_1_KVDirectColumnarScans

// V23_1AllocatorCPUBalancing adds balancing CPU usage among stores using
// the allocator and store rebalancer. It assumes that at this version,
// stores now include their CPU in the StoreCapacity proto when gossiping.
V23_1AllocatorCPUBalancing
// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -702,7 +706,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_KVDirectColumnarScans,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 34},
},

{
Key: V23_1AllocatorCPUBalancing,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 36},
},
// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"raft_transport_metrics.go",
"raft_truncator_replica.go",
"range_log.go",
"rebalance_objective.go",
"replica.go",
"replica_app_batch.go",
"replica_application_cmd.go",
Expand Down Expand Up @@ -273,6 +274,7 @@ go_test(
"raft_transport_test.go",
"raft_transport_unit_test.go",
"range_log_test.go",
"rebalance_objective_test.go",
"replica_application_cmd_buf_test.go",
"replica_application_state_machine_test.go",
"replica_batch_updates_test.go",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2186,7 +2186,7 @@ func (a *Allocator) TransferLeaseTarget(
return candidates[a.randGen.Intn(len(candidates))]

case allocator.LoadConvergence:
leaseReplLoad := usageInfo.Load()
leaseReplLoad := usageInfo.TransferImpact()
candidates := make([]roachpb.StoreID, 0, len(existing)-1)
for _, repl := range existing {
if repl.StoreID != leaseRepl.StoreID() {
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func getLoadThreshold(dim load.Dimension, sv *settings.Values) float64 {
switch dim {
case load.Queries:
return allocator.QPSRebalanceThreshold.Get(sv)
case load.CPU:
return allocator.CPURebalanceThreshold.Get(sv)
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand All @@ -51,6 +53,8 @@ func getLoadMinThreshold(dim load.Dimension) float64 {
switch dim {
case load.Queries:
return allocator.MinQPSThresholdDifference
case load.CPU:
return allocator.MinCPUThresholdDifference
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand All @@ -76,6 +80,8 @@ func getLoadRebalanceMinRequiredDiff(dim load.Dimension, sv *settings.Values) fl
switch dim {
case load.Queries:
return allocator.MinQPSDifferenceForTransfers.Get(sv)
case load.CPU:
return allocator.MinCPUDifferenceForTransfers
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand Down Expand Up @@ -117,3 +123,13 @@ func MakeQPSOnlyDim(v float64) load.Load {
dims[load.Queries] = v
return dims
}

// SetAllDims returns a load vector with all dimensions filled in with the
// value given.
func SetAllDims(v float64) load.Load {
dims := load.Vector{}
for i := range dims {
dims[i] = v
}
return dims
}
45 changes: 45 additions & 0 deletions pkg/kv/kvserver/allocator/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@ const (
// lightly loaded clusters.
MinQPSThresholdDifference = 100

// MinCPUThresholdDifference is the minimum CPU difference from the cluster
// mean that this system should care about. The system won't attempt to
// take action if a store's CPU differs from the mean by less than this
// amount even if it is greater than the percentage threshold. This
// prevents too many lease transfers or range rebalances in lightly loaded
// clusters.
//
// NB: This represents 5% (1/20) utilization of 1 cpu on average. This
// number was arrived at from testing to minimize thrashing. This number is
// set independent of processor speed and assumes identical value of cpu
// time across all stores. i.e. all cpu's are identical.
MinCPUThresholdDifference = float64(50 * time.Millisecond)

// MinCPUDifferenceForTransfers is the minimum CPU difference that a
// store rebalncer would care about to reconcile (via lease or replica
// rebalancing) between any two stores.
//
// NB: This is set to be two times the minimum threshold that a store needs
// to be above or below the mean to be considered overfull or underfull
// respectively. This is to make lease transfers and replica rebalances
// less sensistive to jitters in any given workload by introducing
// additional friction before taking these actions.
MinCPUDifferenceForTransfers = 2 * MinCPUThresholdDifference

// defaultLoadBasedRebalancingInterval is how frequently to check the store-level
// balance of the cluster.
defaultLoadBasedRebalancingInterval = time.Minute
Expand Down Expand Up @@ -107,6 +131,27 @@ var QPSRebalanceThreshold = func() *settings.FloatSetting {
return s
}()

// CPURebalanceThreshold is the minimum ratio of a store's cpu time to the mean
// cpu time at which that store is considered overfull or underfull of cpu
// usage.
var CPURebalanceThreshold = func() *settings.FloatSetting {
s := settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.allocator.store_cpu_rebalance_threshold",
"minimum fraction away from the mean a store's cpu usage can be before it is considered overfull or underfull",
0.10,
settings.NonNegativeFloat,
func(f float64) error {
if f < 0.01 {
return errors.Errorf("cannot set kv.allocator.store_cpu_rebalance_threshold to less than 0.01")
}
return nil
},
)
s.SetVisibility(settings.Public)
return s
}()

// LoadBasedRebalanceInterval controls how frequently each store checks for
// load-base lease/replica rebalancing opportunties.
var LoadBasedRebalanceInterval = settings.RegisterPublicDurationSettingWithExplicitUnit(
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/load/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load",
visibility = ["//visibility:public"],
deps = ["//pkg/util/humanizeutil"],
)

get_x_data(name = "get_x_data")
13 changes: 12 additions & 1 deletion pkg/kv/kvserver/allocator/load/dimension.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@

package load

import "fmt"
import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
)

// Dimension is a singe dimension of load that a component may track.
type Dimension int

const (
// Queries refers to the number of queries.
Queries Dimension = iota
// CPU refers to the cpu time (ns) used in processing.
CPU

nDimensionsTyped
nDimensions = int(nDimensionsTyped)
Expand All @@ -28,6 +35,8 @@ func (d Dimension) String() string {
switch d {
case Queries:
return "queries-per-second"
case CPU:
return "cpu-per-second"
default:
panic(fmt.Sprintf("cannot name: unknown dimension with ordinal %d", d))
}
Expand All @@ -38,6 +47,8 @@ func (d Dimension) Format(value float64) string {
switch d {
case Queries:
return fmt.Sprintf("%.1f", value)
case CPU:
return string(humanizeutil.Duration(time.Duration(int64(value))))
default:
panic(fmt.Sprintf("cannot format value: unknown dimension with ordinal %d", d))
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/allocator/load/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,29 @@ func ElementWiseProduct(a, b Load) Load {
return bimap(a, b, func(ai, bi float64) float64 { return ai * bi })
}

// Scale applies the factor given against every dimension.
func Scale(l Load, factor float64) Load {
return nmap(l, func(_ Dimension, li float64) float64 { return li * factor })
}

// Set returns a new Load with every dimension equal to the value given.
func Set(val float64) Load {
l := Vector{}
return nmap(l, func(_ Dimension, li float64) float64 { return val })
}

func bimap(a, b Load, op func(ai, bi float64) float64) Load {
mapped := Vector{}
for dim := Dimension(0); dim < Dimension(nDimensions); dim++ {
mapped[dim] = op(a.Dim(dim), b.Dim(dim))
}
return mapped
}

func nmap(l Load, op func(d Dimension, li float64) float64) Load {
mapped := Vector{}
for dim := Dimension(0); dim < Dimension(nDimensions); dim++ {
mapped[dim] = op(dim, l.Dim(dim))
}
return mapped
}
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/allocator/range_usage_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,20 @@ type RangeRequestLocalityInfo struct {
func (r RangeUsageInfo) Load() load.Load {
dims := load.Vector{}
dims[load.Queries] = r.QueriesPerSecond
dims[load.CPU] = r.RequestCPUNanosPerSecond + r.RaftCPUNanosPerSecond
return dims
}

// TransferImpact returns the impact of transferring the lease for the range,
// given the usage information.
func (r RangeUsageInfo) TransferImpact() load.Load {
dims := load.Vector{}
dims[load.Queries] = r.QueriesPerSecond
// Only use the request recorded cpu. This assumes that all replicas will
// use the same amount of raft cpu - which may be dubious.
//
// TODO(kvoli): Look to separate out leaseholder vs replica cpu usage in
// accounting to account for follower reads if able.
dims[load.CPU] = r.RequestCPUNanosPerSecond
return dims
}
Loading