From cb4d32f671db8b37f440298ca89ccf36290f1bbd Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Tue, 24 Jan 2023 23:39:46 +0000 Subject: [PATCH] kvserver: introduce cpu rebalancing This patch allows the store rebalancer to use CPU in place of QPS when balancing load on a cluster. This patch adds `cpu` as an option with the cluster setting: `kv.allocator.load_based_rebalancing.objective` When set to `cpu`, rather than `qps`. The store rebalancer will perform a mostly identical function, however, it will target balancing the sum of all replica's cpu time on each store, rather than qps. The default remains as `qps` here. Similar to QPS, the rebalance threshold can be set to allow controlling the range above and below the mean store CPU is considered imbalanced, either overfull or underfull respectively: `kv.allocator.cpu_rebalance_threshold`: 0.1 In order to manage with mixed versions during upgrade and some architectures not supporting the cpu sampling method, a rebalance objective manager is introduced in `rebalance_objective.go`. The manager mediates access to the rebalance objective and overwrites it in cases where the objective set in the cluster setting cannot be supported. resolves: cockroachdb#95380 Release note (ops change): Add option to balance cpu time (cpu) instead of queries per second (qps) among stores in a cluster. This is done by setting `kv.allocator.load_based_rebalancing.objective='cpu'`. `kv.allocator.cpu_rebalance_threshold` is also added, similar to `kv.allocator.qps_rebalance_threshold` to control the target range for store cpu above and below the cluster mean. --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 4 +- pkg/clusterversion/cockroach_versions.go | 9 +- pkg/kv/kvserver/BUILD.bazel | 2 + .../allocator/allocatorimpl/threshold.go | 16 + pkg/kv/kvserver/allocator/base.go | 45 ++ pkg/kv/kvserver/allocator/load/BUILD.bazel | 1 + pkg/kv/kvserver/allocator/load/dimension.go | 13 +- pkg/kv/kvserver/allocator/load/load.go | 19 + pkg/kv/kvserver/allocator/range_usage_info.go | 7 + .../storepool/override_store_pool.go | 7 + .../allocator/storepool/store_pool.go | 117 ++++- pkg/kv/kvserver/asim/config/settings.go | 8 +- .../asim/storerebalancer/store_rebalancer.go | 13 +- pkg/kv/kvserver/rebalance_objective.go | 279 ++++++++++ pkg/kv/kvserver/rebalance_objective_test.go | 267 ++++++++++ pkg/kv/kvserver/store.go | 70 ++- pkg/kv/kvserver/store_pool_test.go | 14 + pkg/kv/kvserver/store_rebalancer.go | 104 ++-- pkg/kv/kvserver/store_rebalancer_test.go | 493 +++++++++++------- pkg/roachpb/metadata.go | 1 + pkg/roachpb/metadata.proto | 4 + 22 files changed, 1225 insertions(+), 270 deletions(-) create mode 100644 pkg/kv/kvserver/rebalance_objective.go create mode 100644 pkg/kv/kvserver/rebalance_objective_test.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 9ef86cc407e2..1f05e57e5828 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -295,4 +295,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 1000022.2-38 set the active cluster version in the format '.' +version version 1000022.2-40 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 3d4de9bdf728..4dee1dc4ece0 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -45,9 +45,11 @@
jobs.retention_time
duration336h0m0sthe amount of time to retain records for completed jobs before
kv.allocator.load_based_lease_rebalancing.enabled
booleantrueset to enable rebalancing of range leases based on load and latency
kv.allocator.load_based_rebalancing
enumerationleases and replicaswhether to rebalance based on the distribution of load across stores [off = 0, leases = 1, leases and replicas = 2] +
kv.allocator.load_based_rebalancing.objective
enumerationqpswhat objective does the cluster use to rebalance; if set to `qps` the cluster will attempt to balance qps among stores, if set to `cpu` the cluster will attempt to balance cpu usage among stores [qps = 0, cpu = 1]
kv.allocator.load_based_rebalancing_interval
duration1m0sthe rough interval at which each store will check for load-based lease / replica rebalancing opportunities
kv.allocator.qps_rebalance_threshold
float0.1minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull
kv.allocator.range_rebalance_threshold
float0.05minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull +
kv.allocator.store_cpu_rebalance_threshold
float0.1minimum fraction away from the mean a store's cpu usage can be before it is considered overfull or underfull
kv.bulk_io_write.max_rate
byte size1.0 TiBthe rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops
kv.bulk_sst.max_allowed_overage
byte size64 MiBif positive, allowed size in excess of target size for SSTs from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory
kv.bulk_sst.target_size
byte size16 MiBtarget size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory @@ -236,6 +238,6 @@
trace.opentelemetry.collector
stringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled
booleantrueif set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector
stringthe address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. -
version
version1000022.2-38set the active cluster version in the format '<major>.<minor>' +
version
version1000022.2-40set the active cluster version in the format '<major>.<minor>' diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 69804ad68259..64a0e1f3d664 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -411,6 +411,10 @@ const ( // responsible for polling the jobs table for metrics. V23_1_CreateJobsMetricsPollingJob + // V23_1AllocatorCPUBalancing adds balancing CPU usage among stores using + // the allocator and store rebalancer. It assumes that at this version, + // stores now include their CPU in the StoreCapacity proto when gossiping. + V23_1AllocatorCPUBalancing // ************************************************* // Step (1): Add new versions here. // Do not add new versions to a patch release. @@ -708,7 +712,10 @@ var rawVersionsSingleton = keyedVersions{ Key: V23_1_CreateJobsMetricsPollingJob, Version: roachpb.Version{Major: 22, Minor: 2, Internal: 38}, }, - + { + Key: V23_1AllocatorCPUBalancing, + Version: roachpb.Version{Major: 22, Minor: 2, Internal: 40}, + }, // ************************************************* // Step (2): Add new versions here. // Do not add new versions to a patch release. diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 90f59c72125a..52badce4e5ce 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "raft_transport_metrics.go", "raft_truncator_replica.go", "range_log.go", + "rebalance_objective.go", "replica.go", "replica_app_batch.go", "replica_application_cmd.go", @@ -273,6 +274,7 @@ go_test( "raft_transport_test.go", "raft_transport_unit_test.go", "range_log_test.go", + "rebalance_objective_test.go", "replica_application_cmd_buf_test.go", "replica_application_state_machine_test.go", "replica_batch_updates_test.go", diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go b/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go index e570ed471a77..e93b1dc5b060 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/threshold.go @@ -27,6 +27,8 @@ func getLoadThreshold(dim load.Dimension, sv *settings.Values) float64 { switch dim { case load.Queries: return allocator.QPSRebalanceThreshold.Get(sv) + case load.CPU: + return allocator.CPURebalanceThreshold.Get(sv) default: panic(errors.AssertionFailedf("Unkown load dimension %d", dim)) } @@ -51,6 +53,8 @@ func getLoadMinThreshold(dim load.Dimension) float64 { switch dim { case load.Queries: return allocator.MinQPSThresholdDifference + case load.CPU: + return allocator.MinCPUThresholdDifference default: panic(errors.AssertionFailedf("Unkown load dimension %d", dim)) } @@ -76,6 +80,8 @@ func getLoadRebalanceMinRequiredDiff(dim load.Dimension, sv *settings.Values) fl switch dim { case load.Queries: return allocator.MinQPSDifferenceForTransfers.Get(sv) + case load.CPU: + return allocator.MinCPUDifferenceForTransfers default: panic(errors.AssertionFailedf("Unkown load dimension %d", dim)) } @@ -117,3 +123,13 @@ func MakeQPSOnlyDim(v float64) load.Load { dims[load.Queries] = v return dims } + +// WithAllDims returns a load vector with all dimensions filled in with the +// value given. +func WithAllDims(v float64) load.Load { + dims := load.Vector{} + for i := range dims { + dims[i] = v + } + return dims +} diff --git a/pkg/kv/kvserver/allocator/base.go b/pkg/kv/kvserver/allocator/base.go index 0f70f5b9a3bc..d9ddd122868b 100644 --- a/pkg/kv/kvserver/allocator/base.go +++ b/pkg/kv/kvserver/allocator/base.go @@ -37,6 +37,30 @@ const ( // lightly loaded clusters. MinQPSThresholdDifference = 100 + // MinCPUThresholdDifference is the minimum CPU difference from the cluster + // mean that this system should care about. The system won't attempt to + // take action if a store's CPU differs from the mean by less than this + // amount even if it is greater than the percentage threshold. This + // prevents too many lease transfers or range rebalances in lightly loaded + // clusters. + // + // NB: This represents 5% (1/20) utilization of 1 cpu on average. This + // number was arrived at from testing to minimize thrashing. This number is + // set independent of processor speed and assumes identical value of cpu + // time across all stores. i.e. all cpu's are identical. + MinCPUThresholdDifference = float64(50 * time.Millisecond) + + // MinCPUDifferenceForTransfers is the minimum CPU difference that a + // store rebalncer would care about to reconcile (via lease or replica + // rebalancing) between any two stores. + // + // NB: This is set to be two times the minimum threshold that a store needs + // to be above or below the mean to be considered overfull or underfull + // respectively. This is to make lease transfers and replica rebalances + // less sensistive to jitters in any given workload by introducing + // additional friction before taking these actions. + MinCPUDifferenceForTransfers = 2 * MinCPUThresholdDifference + // defaultLoadBasedRebalancingInterval is how frequently to check the store-level // balance of the cluster. defaultLoadBasedRebalancingInterval = time.Minute @@ -107,6 +131,27 @@ var QPSRebalanceThreshold = func() *settings.FloatSetting { return s }() +// CPURebalanceThreshold is the minimum ratio of a store's cpu time to the mean +// cpu time at which that store is considered overfull or underfull of cpu +// usage. +var CPURebalanceThreshold = func() *settings.FloatSetting { + s := settings.RegisterFloatSetting( + settings.SystemOnly, + "kv.allocator.store_cpu_rebalance_threshold", + "minimum fraction away from the mean a store's cpu usage can be before it is considered overfull or underfull", + 0.10, + settings.NonNegativeFloat, + func(f float64) error { + if f < 0.01 { + return errors.Errorf("cannot set kv.allocator.store_cpu_rebalance_threshold to less than 0.01") + } + return nil + }, + ) + s.SetVisibility(settings.Public) + return s +}() + // LoadBasedRebalanceInterval controls how frequently each store checks for // load-base lease/replica rebalancing opportunties. var LoadBasedRebalanceInterval = settings.RegisterPublicDurationSettingWithExplicitUnit( diff --git a/pkg/kv/kvserver/allocator/load/BUILD.bazel b/pkg/kv/kvserver/allocator/load/BUILD.bazel index 05873c3c89bf..a9cf0b67deaa 100644 --- a/pkg/kv/kvserver/allocator/load/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/load/BUILD.bazel @@ -10,6 +10,7 @@ go_library( ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load", visibility = ["//visibility:public"], + deps = ["//pkg/util/humanizeutil"], ) get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/allocator/load/dimension.go b/pkg/kv/kvserver/allocator/load/dimension.go index 3241f2ee1dae..b6d53c1d6bb1 100644 --- a/pkg/kv/kvserver/allocator/load/dimension.go +++ b/pkg/kv/kvserver/allocator/load/dimension.go @@ -10,7 +10,12 @@ package load -import "fmt" +import ( + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" +) // Dimension is a singe dimension of load that a component may track. type Dimension int @@ -18,6 +23,8 @@ type Dimension int const ( // Queries refers to the number of queries. Queries Dimension = iota + // CPU refers to the cpu time (ns) used in processing. + CPU nDimensionsTyped nDimensions = int(nDimensionsTyped) @@ -28,6 +35,8 @@ func (d Dimension) String() string { switch d { case Queries: return "queries-per-second" + case CPU: + return "cpu-per-second" default: panic(fmt.Sprintf("cannot name: unknown dimension with ordinal %d", d)) } @@ -38,6 +47,8 @@ func (d Dimension) Format(value float64) string { switch d { case Queries: return fmt.Sprintf("%.1f", value) + case CPU: + return string(humanizeutil.Duration(time.Duration(int64(value)))) default: panic(fmt.Sprintf("cannot format value: unknown dimension with ordinal %d", d)) } diff --git a/pkg/kv/kvserver/allocator/load/load.go b/pkg/kv/kvserver/allocator/load/load.go index 1fbc9f6f3064..3280e628e0ac 100644 --- a/pkg/kv/kvserver/allocator/load/load.go +++ b/pkg/kv/kvserver/allocator/load/load.go @@ -73,6 +73,17 @@ func ElementWiseProduct(a, b Load) Load { return bimap(a, b, func(ai, bi float64) float64 { return ai * bi }) } +// Scale applies the factor given against every dimension. +func Scale(l Load, factor float64) Load { + return nmap(l, func(_ Dimension, li float64) float64 { return li * factor }) +} + +// Set returns a new Load with every dimension equal to the value given. +func Set(val float64) Load { + l := Vector{} + return nmap(l, func(_ Dimension, li float64) float64 { return val }) +} + func bimap(a, b Load, op func(ai, bi float64) float64) Load { mapped := Vector{} for dim := Dimension(0); dim < Dimension(nDimensions); dim++ { @@ -80,3 +91,11 @@ func bimap(a, b Load, op func(ai, bi float64) float64) Load { } return mapped } + +func nmap(l Load, op func(d Dimension, li float64) float64) Load { + mapped := Vector{} + for dim := Dimension(0); dim < Dimension(nDimensions); dim++ { + mapped[dim] = op(dim, l.Dim(dim)) + } + return mapped +} diff --git a/pkg/kv/kvserver/allocator/range_usage_info.go b/pkg/kv/kvserver/allocator/range_usage_info.go index 7dbc75af6181..0ff3e28c4f1d 100644 --- a/pkg/kv/kvserver/allocator/range_usage_info.go +++ b/pkg/kv/kvserver/allocator/range_usage_info.go @@ -39,6 +39,7 @@ type RangeRequestLocalityInfo struct { func (r RangeUsageInfo) Load() load.Load { dims := load.Vector{} dims[load.Queries] = r.QueriesPerSecond + dims[load.CPU] = r.RequestCPUNanosPerSecond + r.RaftCPUNanosPerSecond return dims } @@ -50,5 +51,11 @@ func (r RangeUsageInfo) Load() load.Load { func (r RangeUsageInfo) TransferImpact() load.Load { dims := load.Vector{} dims[load.Queries] = r.QueriesPerSecond + // Only use the request recorded cpu. This assumes that all replicas will + // use the same amount of raft cpu - which may be dubious. + // + // TODO(kvoli): Look to separate out leaseholder vs replica cpu usage in + // accounting to account for follower reads if able. + dims[load.CPU] = r.RequestCPUNanosPerSecond return dims } diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go index 3be89b60eed5..cc6338fc5700 100644 --- a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go @@ -216,3 +216,10 @@ func (o *OverrideStorePool) UpdateLocalStoreAfterRelocate( _ allocator.RangeUsageInfo, ) { } + +// SetOnCapacityChange installs a callback to be called when any store +// capacity changes in the storepool. This currently doesn't consider local +// updates (UpdateLocalStoreAfterRelocate, UpdateLocalStoreAfterRebalance, +// UpdateLocalStoresAfterLeaseTransfer) as capacity changes. +func (o *OverrideStorePool) SetOnCapacityChange(fn CapacityChangeFn) { +} diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 1de1e0280e3b..ba7ba32131df 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -331,6 +331,13 @@ type localityWithString struct { str string } +// CapacityChangeFn is a function which may be called on capacity changes, by +// the storepool. +type CapacityChangeFn func( + storeID roachpb.StoreID, + old, cur roachpb.StoreCapacity, +) + // AllocatorStorePool provides an interface for use by the allocator to a list // of all known stores in the cluster and information on their health. type AllocatorStorePool interface { @@ -426,6 +433,12 @@ type AllocatorStorePool interface { localStore roachpb.StoreID, rangeUsageInfo allocator.RangeUsageInfo, ) + + // SetOnCapacityChange installs a callback to be called when any store + // capacity changes in the storepool. This currently doesn't consider local + // updates (UpdateLocalStoreAfterRelocate, UpdateLocalStoreAfterRebalance, + // UpdateLocalStoresAfterLeaseTransfer) as capacity changes. + SetOnCapacityChange(fn CapacityChangeFn) } // StorePool maintains a list of all known stores in the cluster and @@ -455,6 +468,11 @@ type StorePool struct { nodeLocalities map[roachpb.NodeID]localityWithString } + changeMu struct { + syncutil.Mutex + onChange []CapacityChangeFn + } + // OverrideIsStoreReadyForRoutineReplicaTransferFn, if set, is used in // IsStoreReadyForRoutineReplicaTransfer. This is defined as a closure reference here instead // of a regular method so it can be overridden in tests. @@ -487,6 +505,7 @@ func NewStorePool( } sp.DetailsMu.StoreDetails = make(map[roachpb.StoreID]*StoreDetail) sp.localitiesMu.nodeLocalities = make(map[roachpb.NodeID]localityWithString) + sp.changeMu.onChange = []CapacityChangeFn{} // Enable redundant callbacks for the store keys because we use these // callbacks as a clock to determine when a store was last updated even if it @@ -539,14 +558,24 @@ func (sp *StorePool) statusString(nl NodeLivenessFunc) string { // storeGossipUpdate is the Gossip callback used to keep the StorePool up to date. func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value) { var storeDesc roachpb.StoreDescriptor + // We keep copies of the capacity and storeID to pass into the + // capacityChanged callback. + var oldCapacity, curCapacity roachpb.StoreCapacity + var storeID roachpb.StoreID + if err := content.GetProto(&storeDesc); err != nil { ctx := sp.AnnotateCtx(context.TODO()) log.Errorf(ctx, "%v", err) return } + storeID = storeDesc.StoreID + curCapacity = storeDesc.Capacity sp.DetailsMu.Lock() - detail := sp.GetStoreDetailLocked(storeDesc.StoreID) + detail := sp.GetStoreDetailLocked(storeID) + if detail.Desc != nil { + oldCapacity = detail.Desc.Capacity + } detail.Desc = &storeDesc detail.LastUpdatedTime = sp.clock.PhysicalTime() sp.DetailsMu.Unlock() @@ -555,6 +584,10 @@ func (sp *StorePool) storeGossipUpdate(_ string, content roachpb.Value) { sp.localitiesMu.nodeLocalities[storeDesc.Node.NodeID] = localityWithString{storeDesc.Node.Locality, storeDesc.Node.Locality.String()} sp.localitiesMu.Unlock() + + if oldCapacity != curCapacity { + sp.capacityChanged(storeID, curCapacity, oldCapacity) + } } // UpdateLocalStoreAfterRebalance is used to update the local copy of the @@ -573,11 +606,16 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance( // network). We can't update the local store at this time. return } + // Only apply the raft cpu delta on rebalance. This estimate assumes that + // the raft cpu usage is approximately equal across replicas for a range. switch changeType { case roachpb.ADD_VOTER, roachpb.ADD_NON_VOTER: detail.Desc.Capacity.RangeCount++ detail.Desc.Capacity.LogicalBytes += rangeUsageInfo.LogicalBytes detail.Desc.Capacity.WritesPerSecond += rangeUsageInfo.WritesPerSecond + if detail.Desc.Capacity.CPUPerSecond >= 0 { + detail.Desc.Capacity.CPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond + } case roachpb.REMOVE_VOTER, roachpb.REMOVE_NON_VOTER: detail.Desc.Capacity.RangeCount-- if detail.Desc.Capacity.LogicalBytes <= rangeUsageInfo.LogicalBytes { @@ -590,6 +628,15 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance( } else { detail.Desc.Capacity.WritesPerSecond -= rangeUsageInfo.WritesPerSecond } + // When CPU attribution is unsupported, the store will set the + // CPUPerSecond of its store capacity to be -1. + if detail.Desc.Capacity.CPUPerSecond >= 0 { + if detail.Desc.Capacity.CPUPerSecond <= rangeUsageInfo.RaftCPUNanosPerSecond { + detail.Desc.Capacity.CPUPerSecond = 0 + } else { + detail.Desc.Capacity.CPUPerSecond -= rangeUsageInfo.RaftCPUNanosPerSecond + } + } default: return } @@ -618,10 +665,17 @@ func (sp *StorePool) UpdateLocalStoreAfterRelocate( sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() + // Only apply the raft cpu delta on rebalance. This estimate assumes that + // the raft cpu usage is approximately equal across replicas for a range. + // TODO(kvoli): Separate into LH vs Replica, similar to the comment on + // range_usage_info. updateTargets := func(targets []roachpb.ReplicationTarget) { for _, target := range targets { if toDetail := sp.GetStoreDetailLocked(target.StoreID); toDetail.Desc != nil { toDetail.Desc.Capacity.RangeCount++ + if toDetail.Desc.Capacity.CPUPerSecond >= 0 { + toDetail.Desc.Capacity.CPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond + } } } } @@ -629,6 +683,16 @@ func (sp *StorePool) UpdateLocalStoreAfterRelocate( for _, old := range previous { if toDetail := sp.GetStoreDetailLocked(old.StoreID); toDetail.Desc != nil { toDetail.Desc.Capacity.RangeCount-- + // When CPU attribution is unsupported, the store will set the + // CPUPerSecond of its store capacity to be -1. + if toDetail.Desc.Capacity.CPUPerSecond < 0 { + continue + } + if toDetail.Desc.Capacity.CPUPerSecond <= rangeUsageInfo.RaftCPUNanosPerSecond { + toDetail.Desc.Capacity.CPUPerSecond = 0 + } else { + toDetail.Desc.Capacity.CPUPerSecond -= rangeUsageInfo.RaftCPUNanosPerSecond + } } } } @@ -655,6 +719,20 @@ func (sp *StorePool) UpdateLocalStoresAfterLeaseTransfer( } else { fromDetail.Desc.Capacity.QueriesPerSecond -= rangeUsageInfo.QueriesPerSecond } + // When CPU attribution is unsupported, the store will set the + // CPUPerSecond of its store capacity to be -1. + if fromDetail.Desc.Capacity.CPUPerSecond >= 0 { + // Only apply the request cpu (leaseholder + follower-reads) delta on + // transfers. Note this does not correctly account for follower reads + // remaining on the prior leaseholder after lease transfer. Instead, + // only a cpu delta specific to the lease should be applied. + if fromDetail.Desc.Capacity.CPUPerSecond <= rangeUsageInfo.RequestCPUNanosPerSecond { + fromDetail.Desc.Capacity.CPUPerSecond = 0 + } else { + fromDetail.Desc.Capacity.CPUPerSecond -= rangeUsageInfo.RequestCPUNanosPerSecond + } + } + sp.DetailsMu.StoreDetails[from] = &fromDetail } @@ -662,6 +740,11 @@ func (sp *StorePool) UpdateLocalStoresAfterLeaseTransfer( if toDetail.Desc != nil { toDetail.Desc.Capacity.LeaseCount++ toDetail.Desc.Capacity.QueriesPerSecond += rangeUsageInfo.QueriesPerSecond + // When CPU attribution is unsupported, the store will set the + // CPUPerSecond of its store capacity to be -1. + if toDetail.Desc.Capacity.CPUPerSecond >= 0 { + toDetail.Desc.Capacity.CPUPerSecond += rangeUsageInfo.RequestCPUNanosPerSecond + } sp.DetailsMu.StoreDetails[to] = &toDetail } } @@ -919,6 +1002,26 @@ func (sp *StorePool) liveAndDeadReplicasWithLiveness( return } +// SetOnCapacityChange installs a callback to be called when any store +// capacity changes in the storepool. This currently doesn't consider local +// updates (UpdateLocalStoreAfterRelocate, UpdateLocalStoreAfterRebalance, +// UpdateLocalStoresAfterLeaseTransfer) as capacity changes. +func (sp *StorePool) SetOnCapacityChange(fn CapacityChangeFn) { + sp.changeMu.Lock() + defer sp.changeMu.Unlock() + + sp.changeMu.onChange = append(sp.changeMu.onChange, fn) +} + +func (sp *StorePool) capacityChanged(storeID roachpb.StoreID, prev, cur roachpb.StoreCapacity) { + sp.changeMu.Lock() + defer sp.changeMu.Unlock() + + for _, fn := range sp.changeMu.onChange { + fn(storeID, prev, cur) + } +} + // Stat provides a running sample size and running stats. type Stat struct { n, Mean float64 @@ -948,6 +1051,10 @@ type StoreList struct { // to be rebalance targets. candidateLogicalBytes Stat + // CandidateCPU tracks store-cpu-per-second stats for Stores that are + // eligible to be rebalance targets. + CandidateCPU Stat + // CandidateQueriesPerSecond tracks queries-per-second stats for Stores that // are eligible to be rebalance targets. CandidateQueriesPerSecond Stat @@ -974,6 +1081,7 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { sl.CandidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond) sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond) sl.CandidateL0Sublevels.update(float64(desc.Capacity.L0Sublevels)) + sl.CandidateCPU.update(desc.Capacity.CPUPerSecond) } return sl } @@ -981,11 +1089,12 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList { func (sl StoreList) String() string { var buf bytes.Buffer fmt.Fprintf(&buf, - " candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v", + " candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v avg-store-cpu-per-second=%v", sl.CandidateRanges.Mean, sl.CandidateLeases.Mean, humanizeutil.IBytes(int64(sl.candidateLogicalBytes.Mean)), sl.CandidateQueriesPerSecond.Mean, + humanizeutil.Duration(time.Duration(int64(sl.CandidateCPU.Mean))), ) if len(sl.Stores) > 0 { fmt.Fprintf(&buf, "\n") @@ -993,10 +1102,11 @@ func (sl StoreList) String() string { fmt.Fprintf(&buf, " ") } for _, desc := range sl.Stores { - fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f l0-sublevels=%d\n", + fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s l0-sublevels=%d\n", desc.StoreID, desc.Capacity.RangeCount, desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes), desc.Capacity.QueriesPerSecond, + humanizeutil.Duration(time.Duration(int64(desc.Capacity.CPUPerSecond))), desc.Capacity.L0Sublevels, ) } @@ -1023,6 +1133,7 @@ func (sl StoreList) ExcludeInvalid(constraints []roachpb.ConstraintsConjunction) func (sl StoreList) LoadMeans() load.Load { dims := load.Vector{} dims[load.Queries] = sl.CandidateQueriesPerSecond.Mean + dims[load.CPU] = sl.CandidateCPU.Mean return dims } diff --git a/pkg/kv/kvserver/asim/config/settings.go b/pkg/kv/kvserver/asim/config/settings.go index 59a7d637af85..1206c4ed6d2a 100644 --- a/pkg/kv/kvserver/asim/config/settings.go +++ b/pkg/kv/kvserver/asim/config/settings.go @@ -30,7 +30,7 @@ const ( defaultLBRebalancingInterval = time.Minute defaultLBRebalanceQPSThreshold = 0.1 defaultLBMinRequiredQPSDiff = 200 - defaultLBRebalancingDimension = 0 // QPS + defaultLBRebalancingObjective = 0 // QPS ) var ( @@ -95,8 +95,8 @@ type SimulationSettings struct { // LBRebalancingMode controls if and when we do store-level rebalancing // based on load. It maps to kvserver.LBRebalancingMode. LBRebalancingMode int64 - // LBRebalancingDimension is the load dimension to balance. - LBRebalancingDimension int64 + // LBRebalancingObjective is the load objective to balance. + LBRebalancingObjective int64 // LBRebalancingInterval controls how often the store rebalancer will // consider opportunities for rebalancing. LBRebalancingInterval time.Duration @@ -127,7 +127,7 @@ func DefaultSimulationSettings() *SimulationSettings { SplitQPSThreshold: defaultSplitQPSThreshold, SplitQPSRetention: defaultSplitQPSRetention, LBRebalancingMode: defaultLBRebalancingMode, - LBRebalancingDimension: defaultLBRebalancingDimension, + LBRebalancingObjective: defaultLBRebalancingObjective, LBRebalancingInterval: defaultLBRebalancingInterval, LBRebalanceQPSThreshold: defaultLBRebalanceQPSThreshold, LBMinRequiredQPSDiff: defaultLBMinRequiredQPSDiff, diff --git a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go index fb997e93baad..3c72760c5e24 100644 --- a/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go +++ b/pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go @@ -103,6 +103,7 @@ func newStoreRebalancerControl( allocator, storePool, getRaftStatusFn, + simRebalanceObjectiveProvider{settings}, ) sr.AddLogTag("s", storeID) @@ -118,7 +119,17 @@ func newStoreRebalancerControl( storepool: storePool, controller: controller, } +} + +// simRebalanceObjectiveProvider implements the +// kvserver.RebalanceObjectiveProvider interface. +type simRebalanceObjectiveProvider struct { + settings *config.SimulationSettings +} +// Objective returns the current rebalance objective. +func (s simRebalanceObjectiveProvider) Objective() kvserver.LBRebalancingObjective { + return kvserver.LBRebalancingObjective(s.settings.LBRebalancingObjective) } func (src *storeRebalancerControl) scorerOptions() *allocatorimpl.LoadScorerOptions { @@ -179,7 +190,7 @@ func (src *storeRebalancerControl) phasePrologue( ctx, src.scorerOptions(), hottestRanges( s, src.storeID, - kvserver.LBRebalancingDimension(src.settings.LBRebalancingDimension).ToDimension(), + kvserver.LBRebalancingObjective(src.settings.LBRebalancingObjective).ToDimension(), ), kvserver.LBRebalancingMode(src.settings.LBRebalancingMode), ) diff --git a/pkg/kv/kvserver/rebalance_objective.go b/pkg/kv/kvserver/rebalance_objective.go new file mode 100644 index 000000000000..4e159f17a3c7 --- /dev/null +++ b/pkg/kv/kvserver/rebalance_objective.go @@ -0,0 +1,279 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/grunning" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// LBRebalancingObjective controls the objective of load based rebalancing. +// This is used to both (1) define the types of load considered when +// determining how balanced the cluster is, and (2) select actions that improve +// balancing the given objective. Currently there are only two possible +// objectives: +// - qps which is the original default setting and looks at the number of batch +// requests on a range and store. +// - cpu which is added in 23.1 and looks at the cpu usage of a range and +// store. +type LBRebalancingObjective int64 + +const ( + // LBRebalancingQueries is a rebalancing objective that aims to balances + // queries (QPS) among stores in the cluster. The QPS per-store is + // calculated as the sum of every replica's QPS on the store. The QPS value + // per-replica is calculated as the average number of batch requests per + // second, the replica received over the last 30 minutes, or replica + // lifetime, whichever is shorter. A special case for the QPS calculation + // of a batch request exists for requests that contain AddSST requests, + // which are weighted by the size of the SST to be added (see #76252). When + // there are multiple stores per-node, the behavior doesn't change in + // comparison to single store per-node. + // + // When searching for rebalance actions, this objective estimates the + // impact of an action by using the QPS of the leaseholder replica invovled + // e.g. the impact of lease transfers on the stores invovled is + // +leaseholder replica QPS on the store that receives the lease and + // -leaseholder replica QPS on the store that removes the lease. + // + // This rebalancing objective tends to works well when the load of + // different batch requests in the cluster is uniform. e.g. there are only + // few types of requests which all exert approx the same load on the + // system. This rebalancing objective tends to perform poorly when the load + // of different batch requests in the cluster is non-uniform as balancing + // QPS does not correlate well with balancing load. + LBRebalancingQueries LBRebalancingObjective = iota + + // LBRebalancingCPU is a rebalance objective that aims balances the store + // CPU usage. The store CPU usage is calculated as the sum of replicas' cpu + // usage on the store. The CPU value per-replica is calculated as the + // average cpu usage per second, the replica used in processing over the + // last 30 minutes, or replica lifetime, whichever is shorter. When there + // are multiple stores per-node, the behavior doesn't change in comparison + // to single store per-node. That is, despite multiple stores sharing the + // same underling CPU, the objective attempts to balance CPU usage of each + // store on a node e.g. In a cluster where there is 1 node and 8 stores on + // the 1 node, the rebalance objective will rebalance leases and replicas + // so that the CPU usage is balanced between the 8 stores. + // + // When searching for rebalance actions, this objective estimates the + // impact of an action by either using all of the leaseholder replicas' CPU + // usage for transfer+rebalance and the foreground request cpu usage for + // just lease transfers. See allocator/range_usage_info.go. + // + // One alternative approach that was considered for the LBRebalancingCPU + // objective was to use the process CPU usage and balance each stores' + // process usage. The measured replica cpu usage is used only to determine + // which replica to rebalance, but not when to rebalance or who to + // rebalance to. This approach benefits from observing the "true" cpu + // usage, rather than just the sum of replica's usage. However, unlike the + // implemented approach, the estimated impact of actions was less reliable + // and had to be scaled to account for multi-store and missing cpu + // attribution. The implemented approach composes well in comparison to the + // process cpu approach. The sum of impact over available actions is equal + // to the store value being balanced, similar to LBRebalancingQueries. + LBRebalancingCPU +) + +// LoadBasedRebalancingObjective is a cluster setting that defines the load +// balancing objective of the cluster. +var LoadBasedRebalancingObjective = settings.RegisterEnumSetting( + settings.SystemOnly, + "kv.allocator.load_based_rebalancing.objective", + "what objective does the cluster use to rebalance; if set to `qps` "+ + "the cluster will attempt to balance qps among stores, if set to "+ + "`cpu` the cluster will attempt to balance cpu usage among stores", + "qps", + map[int64]string{ + int64(LBRebalancingQueries): "qps", + int64(LBRebalancingCPU): "cpu", + }, +).WithPublic() + +// ToDimension returns the equivalent allocator load dimension of a rebalancing +// objective. +// +// TODO(kvoli): It is currently the case that every LBRebalancingObjective maps +// uniquely to a load.Dimension. However, in the future it is forseeable that +// LBRebalancingObjective could be a value that encompassese many different +// dimensions within a single objective e.g. bytes written, cpu usage and +// storage availability. If this occurs, this ToDimension fn will no longer be +// appropriate for multi-dimension objectives. +func (d LBRebalancingObjective) ToDimension() load.Dimension { + switch d { + case LBRebalancingQueries: + return load.Queries + case LBRebalancingCPU: + return load.CPU + default: + panic("unknown dimension") + } +} + +// RebalanceObjectiveManager provides a method to get the rebalance objective +// of the cluster. It is possible that the cluster setting objective may not be +// the objective returned, when the cluster environment is unsupported or mixed +// versions exist. +type RebalanceObjectiveProvider interface { + // Objective returns the current rebalance objective. + Objective() LBRebalancingObjective +} + +// gossipStoreDescriptorProvider provides a method to get the store descriptors +// from the storepool, received via gossip. Expose a thin interface for the +// objective manager to use for easier testing. +type gossipStoreDescriptorProvider interface { + // GetStores returns information on all the stores with descriptor that + // have been recently seen in gossip. + GetStores() map[roachpb.StoreID]roachpb.StoreDescriptor +} + +// gossipStoreCapacityChangeNotifier provides a method to install a callback +// that will be called whenever the capacity of a store changes. Expose a thin +// interface for the objective manager to use for easier testing. +type gossipStoreCapacityChangeNotifier interface { + // SetOnCapacityChange installs a callback to be called when the store + // capacity changes. + SetOnCapacityChange(fn storepool.CapacityChangeFn) +} + +// RebalanceObjectiveManager implements the RebalanceObjectiveProvider +// interface and registers a callback at creation time, that will be called on +// a reblanace objective change. +type RebalanceObjectiveManager struct { + st *cluster.Settings + storeDescProvider gossipStoreDescriptorProvider + + mu struct { + syncutil.RWMutex + obj LBRebalancingObjective + // onChange callback registered will execute synchronously on the + // cluster settings thread that triggers an objective check. This is + // not good for large blocking operations. + onChange func(ctx context.Context, obj LBRebalancingObjective) + } +} + +func newRebalanceObjectiveManager( + ctx context.Context, + st *cluster.Settings, + onChange func(ctx context.Context, obj LBRebalancingObjective), + storeDescProvider gossipStoreDescriptorProvider, + capacityChangeNotifier gossipStoreCapacityChangeNotifier, +) *RebalanceObjectiveManager { + rom := &RebalanceObjectiveManager{st: st, storeDescProvider: storeDescProvider} + rom.mu.obj = ResolveLBRebalancingObjective(ctx, st, storeDescProvider.GetStores()) + rom.mu.onChange = onChange + + LoadBasedRebalancingObjective.SetOnChange(&rom.st.SV, func(ctx context.Context) { + rom.maybeUpdateRebalanceObjective(ctx) + }) + rom.st.Version.SetOnChange(func(ctx context.Context, _ clusterversion.ClusterVersion) { + rom.maybeUpdateRebalanceObjective(ctx) + }) + // Rather than caching each capacity locally, use the callback as a trigger + // to recalculate the objective. This is less expensive than recacluating + // the objective on every call to Objective, which would need to be done + // otherwise, just in case a new capacity has come in. This approach does + // have the downside of using the gossip callback goroutine to trigger the + // onChange callback, which iterates through every replica on the store. It + // is unlikely though that the conditions are satisfied (some node begins + // not supporting grunning or begin supporting grunning) to trigger the + // onChange callback here. + capacityChangeNotifier.SetOnCapacityChange( + func(storeID roachpb.StoreID, old, cur roachpb.StoreCapacity) { + if (old.CPUPerSecond < 0) != (cur.CPUPerSecond < 0) { + rom.maybeUpdateRebalanceObjective(ctx) + } + }) + + return rom +} + +// Objective returns the current rebalance objective. +func (rom *RebalanceObjectiveManager) Objective() LBRebalancingObjective { + rom.mu.RLock() + defer rom.mu.RUnlock() + + return rom.mu.obj +} + +func (rom *RebalanceObjectiveManager) maybeUpdateRebalanceObjective(ctx context.Context) { + rom.mu.Lock() + defer rom.mu.Unlock() + + prev := rom.mu.obj + new := ResolveLBRebalancingObjective(ctx, rom.st, rom.storeDescProvider.GetStores()) + // Nothing to do when the objective hasn't changed. + if prev == new { + return + } + + log.Infof(ctx, "Updating the rebalance objective from %s to %s", prev.ToDimension(), new.ToDimension()) + + rom.mu.obj = new + rom.mu.onChange(ctx, rom.mu.obj) +} + +// ResolveLBRebalancingObjective returns the load based rebalancing objective +// for the cluster. In cases where a first objective cannot be used, it will +// return a fallback. +func ResolveLBRebalancingObjective( + ctx context.Context, st *cluster.Settings, descs map[roachpb.StoreID]roachpb.StoreDescriptor, +) LBRebalancingObjective { + set := LoadBasedRebalancingObjective.Get(&st.SV) + // Queries should always be supported, return early if set. + if set == int64(LBRebalancingQueries) { + return LBRebalancingQueries + } + // When the cluster version hasn't finalized to 23.1, some unupgraded + // stores will not be populating additional fields in their StoreCapacity, + // in such cases we cannot balance another objective since the data may not + // exist. Fall back to QPS balancing. + if !st.Version.IsActive(ctx, clusterversion.V23_1AllocatorCPUBalancing) { + log.Infof(ctx, "version doesn't support cpu objective, reverting to qps balance objective") + return LBRebalancingQueries + } + // When the cpu timekeeping utility is unsupported on this aarch, the cpu + // usage cannot be gathered. Fall back to QPS balancing. + if !grunning.Supported() { + log.Infof(ctx, "cpu timekeeping unavailable on host, reverting to qps balance objective") + return LBRebalancingQueries + } + + // It is possible that the cputime utility isn't supported on a remote + // node's architecture, yet is supported locally on this node. If that is + // the case, the store's on the node will publish the cpu per second as -1 + // for their capacity to gossip. The -1 is special cased here and + // disallows any other store using the cpu balancing objective. + for _, desc := range descs { + if desc.Capacity.CPUPerSecond == -1 { + log.Warningf(ctx, + "cpu timekeeping unavailable on node %d but available locally, reverting to qps balance objective", + desc.Node.NodeID) + return LBRebalancingQueries + } + } + + // The cluster is on a supported version and this local store is on aarch + // which supported the cpu timekeeping utility, return the cluster setting + // as is. + return LBRebalancingObjective(set) +} diff --git a/pkg/kv/kvserver/rebalance_objective_test.go b/pkg/kv/kvserver/rebalance_objective_test.go new file mode 100644 index 000000000000..b16dc9c1480a --- /dev/null +++ b/pkg/kv/kvserver/rebalance_objective_test.go @@ -0,0 +1,267 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// testingNotifierProvider implements the gossipStoreDescriptorProvider +// interface and gossipStoreCapacityChangeNotifier interface. +type testingProviderNotifier struct { + descMap map[roachpb.StoreID]roachpb.StoreDescriptor + onChange []func(roachpb.StoreID, roachpb.StoreCapacity, roachpb.StoreCapacity) +} + +// GetStores returns information on all the stores with descriptor in the pool. +func (tpn *testingProviderNotifier) GetStores() map[roachpb.StoreID]roachpb.StoreDescriptor { + return tpn.descMap +} + +func (tpn *testingProviderNotifier) set(desc roachpb.StoreDescriptor) { + old := tpn.descMap[desc.StoreID].Capacity + cur := desc.Capacity + tpn.descMap[desc.StoreID] = desc + + for _, fn := range tpn.onChange { + fn(desc.StoreID, old, cur) + } +} + +// SetOnCapacityChange installs a callback to be called when any store capacity +// changes. +func (tpn *testingProviderNotifier) SetOnCapacityChange(fn storepool.CapacityChangeFn) { + tpn.onChange = append(tpn.onChange, fn) +} + +func testMakeStoreDescWithCPU(storeID roachpb.StoreID, cpu float64) roachpb.StoreDescriptor { + return roachpb.StoreDescriptor{ + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(storeID)}, + StoreID: storeID, + Capacity: roachpb.StoreCapacity{CPUPerSecond: cpu}, + } +} + +// testMakeProviderNotifier creates a mock implementation of the +// gossipStoreDescriptorProvider interface suitable for testing. +func testMakeProviderNotifier( + gossipStoreCPUValues map[roachpb.StoreID]float64, +) *testingProviderNotifier { + descMap := map[roachpb.StoreID]roachpb.StoreDescriptor{} + for storeID, cpu := range gossipStoreCPUValues { + descMap[storeID] = testMakeStoreDescWithCPU(storeID, cpu) + } + return &testingProviderNotifier{ + descMap, + []func(roachpb.StoreID, roachpb.StoreCapacity, roachpb.StoreCapacity){}, + } +} + +var allPositiveCPUMap = map[roachpb.StoreID]float64{1: 1, 2: 2, 3: 3} +var oneNegativeCPUMap = map[roachpb.StoreID]float64{1: 1, 2: 2, 3: -1} + +func TestLoadBasedRebalancingObjective(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + t.Run("latest version supports all rebalance objectives", func(t *testing.T) { + st := cluster.MakeTestingClusterSettings() + gossipStoreDescProvider := testMakeProviderNotifier(allPositiveCPUMap) + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingQueries)) + require.Equal(t, + LBRebalancingQueries, + ResolveLBRebalancingObjective(ctx, st, gossipStoreDescProvider.GetStores()), + ) + + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingCPU)) + require.Equal(t, + LBRebalancingCPU, + ResolveLBRebalancingObjective(ctx, st, gossipStoreDescProvider.GetStores()), + ) + + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingQueries)) + require.Equal(t, + LBRebalancingQueries, + ResolveLBRebalancingObjective(ctx, st, gossipStoreDescProvider.GetStores()), + ) + }) + + t.Run("older version only supports QPS", func(t *testing.T) { + st := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.ByKey(clusterversion.V22_2), + clusterversion.ByKey(clusterversion.V22_2), true) + gossipStoreDescProvider := testMakeProviderNotifier(allPositiveCPUMap) + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingQueries)) + require.Equal(t, + LBRebalancingQueries, + ResolveLBRebalancingObjective(ctx, st, gossipStoreDescProvider.GetStores()), + ) + + // Despite setting to CPU, only QPS should be returned. + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingCPU)) + require.Equal(t, + LBRebalancingQueries, + ResolveLBRebalancingObjective(ctx, st, gossipStoreDescProvider.GetStores()), + ) + }) + + t.Run("remote node set cpu to -1, signalling no support", func(t *testing.T) { + st := cluster.MakeTestingClusterSettings() + // The store with StoreID 3 has a -1 CPUPerSecond value, this indicates + // no support for grunning and the objective should revert to + // LBRebalancingQueries if the cluster setting is set to + // LBRebalancingCPU. + gossipStoreDescProvider := testMakeProviderNotifier(oneNegativeCPUMap) + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingQueries)) + require.Equal(t, + LBRebalancingQueries, + ResolveLBRebalancingObjective(ctx, st, gossipStoreDescProvider.GetStores()), + ) + + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingCPU)) + require.Equal(t, + LBRebalancingQueries, + ResolveLBRebalancingObjective(ctx, st, gossipStoreDescProvider.GetStores()), + ) + + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingQueries)) + require.Equal(t, + LBRebalancingQueries, + ResolveLBRebalancingObjective(ctx, st, gossipStoreDescProvider.GetStores()), + ) + }) +} + +func TestRebalanceObjectiveManager(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + makeTestManager := func( + st *cluster.Settings, + providerNotifier *testingProviderNotifier, + ) (*RebalanceObjectiveManager, *[]LBRebalancingObjective) { + callbacks := make([]LBRebalancingObjective, 0, 1) + cb := func(ctx context.Context, obj LBRebalancingObjective) { + callbacks = append(callbacks, obj) + } + return newRebalanceObjectiveManager( + ctx, st, cb, providerNotifier, providerNotifier, + ), &callbacks + } + + t.Run("latest version", func(t *testing.T) { + st := cluster.MakeTestingClusterSettings() + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingCPU)) + providerNotifier := testMakeProviderNotifier(allPositiveCPUMap) + manager, callbacks := makeTestManager(st, providerNotifier) + + // Initially expect the manager to have CPU set as the cluster + // setting as it is overriden above. + require.Equal(t, LBRebalancingCPU, manager.Objective()) + require.Len(t, *callbacks, 0) + + // Override the setting to be QPS, which will trigger a callback. + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingQueries)) + require.Equal(t, LBRebalancingQueries, manager.Objective()) + require.Len(t, *callbacks, 1) + require.Equal(t, LBRebalancingQueries, (*callbacks)[0]) + + // Override the setting again back to CPU, which will trigger a + // callback. + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingCPU)) + require.Equal(t, LBRebalancingCPU, manager.Objective()) + require.Len(t, *callbacks, 2) + require.Equal(t, LBRebalancingCPU, (*callbacks)[1]) + }) + + // After updating the active cluster version to an earlier version, the + // objective should change from CPU to QPS as CPU is not + // supported on this version. + t.Run("store cpu unsupported version", func(t *testing.T) { + st := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.ByKey(clusterversion.V22_2), false) + require.NoError(t, st.Version.SetActiveVersion(ctx, clusterversion.ClusterVersion{ + Version: clusterversion.ByKey(clusterversion.V22_2), + })) + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingCPU)) + providerNotifier := testMakeProviderNotifier(allPositiveCPUMap) + manager, callbacks := makeTestManager(st, providerNotifier) + + // Initially expect the manager to have QPS set, despite the cluster + // setting being overriden to CPU. + require.Equal(t, LBRebalancingQueries, manager.Objective()) + require.Len(t, *callbacks, 0) + + // Override the setting to be CPU, which shouldn't trigger a + // callback as it isn't supported in this version. + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingCPU)) + require.Equal(t, LBRebalancingQueries, manager.Objective()) + require.Len(t, *callbacks, 0) + + // Update the active version to match the minimum version required for + // allowing CPU objective. This should trigger a callback and also + // update the objective to CPU. + require.NoError(t, st.Version.SetActiveVersion(ctx, clusterversion.ClusterVersion{ + Version: clusterversion.ByKey(clusterversion.V23_1AllocatorCPUBalancing), + })) + require.Equal(t, LBRebalancingCPU, manager.Objective()) + require.Len(t, *callbacks, 1) + require.Equal(t, LBRebalancingCPU, (*callbacks)[0]) + + // Override the setting to be QPS, which will trigger a callback after + // switching from CPU. + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingQueries)) + require.Equal(t, LBRebalancingQueries, manager.Objective()) + require.Len(t, *callbacks, 2) + require.Equal(t, LBRebalancingQueries, (*callbacks)[1]) + }) + + t.Run("latest version, remote node no cpu support", func(t *testing.T) { + st := cluster.MakeTestingClusterSettings() + LoadBasedRebalancingObjective.Override(ctx, &st.SV, int64(LBRebalancingCPU)) + providerNotifier := testMakeProviderNotifier(allPositiveCPUMap) + manager, callbacks := makeTestManager(st, providerNotifier) + + // Initially expect the manager to have CPU set as the cluster + // setting as it is overriden above. + require.Equal(t, LBRebalancingCPU, manager.Objective()) + require.Len(t, *callbacks, 0) + + // Add a new store to the map which has a CPU value set that indicates + // no support. This should trigger a callback as the objective reverts + // to LBRebalancingQueries which is supported. + providerNotifier.set(testMakeStoreDescWithCPU(4, -1)) + require.Equal(t, LBRebalancingQueries, manager.Objective()) + require.Len(t, *callbacks, 1) + require.Equal(t, LBRebalancingQueries, (*callbacks)[0]) + + // Update the store added above to have a non-negative cpu value + // indicating that it now supports cpu (not likely to happen in real + // life but possible). This should trigger another callback as the + // objective switches to LBRebalancingCPU. + providerNotifier.set(testMakeStoreDescWithCPU(4, 1)) + require.Equal(t, LBRebalancingCPU, manager.Objective()) + require.Len(t, *callbacks, 2) + require.Equal(t, LBRebalancingCPU, (*callbacks)[1]) + }) +} diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 72d8b09ebf6b..246016ba023d 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -69,6 +69,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/envutil" + "github.com/cockroachdb/cockroach/pkg/util/grunning" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/limit" @@ -739,23 +740,24 @@ type Store struct { raftLogQueue *raftLogQueue // Raft log truncation queue // Carries out truncations proposed by the raft log queue, and "replicated" // via raft, when they are safe. Created in Store.Start. - raftTruncator *raftLogTruncator - raftSnapshotQueue *raftSnapshotQueue // Raft repair queue - tsMaintenanceQueue *timeSeriesMaintenanceQueue // Time series maintenance queue - scanner *replicaScanner // Replica scanner - consistencyQueue *consistencyQueue // Replica consistency check queue - consistencyLimiter *quotapool.RateLimiter // Rate limits consistency checks - metrics *StoreMetrics - intentResolver *intentresolver.IntentResolver - recoveryMgr txnrecovery.Manager - syncWaiter *logstore.SyncWaiterLoop - raftEntryCache *raftentry.Cache - limiters batcheval.Limiters - txnWaitMetrics *txnwait.Metrics - sstSnapshotStorage SSTSnapshotStorage - protectedtsReader spanconfig.ProtectedTSReader - ctSender *sidetransport.Sender - storeGossip *StoreGossip + raftTruncator *raftLogTruncator + raftSnapshotQueue *raftSnapshotQueue // Raft repair queue + tsMaintenanceQueue *timeSeriesMaintenanceQueue // Time series maintenance queue + scanner *replicaScanner // Replica scanner + consistencyQueue *consistencyQueue // Replica consistency check queue + consistencyLimiter *quotapool.RateLimiter // Rate limits consistency checks + metrics *StoreMetrics + intentResolver *intentresolver.IntentResolver + recoveryMgr txnrecovery.Manager + syncWaiter *logstore.SyncWaiterLoop + raftEntryCache *raftentry.Cache + limiters batcheval.Limiters + txnWaitMetrics *txnwait.Metrics + sstSnapshotStorage SSTSnapshotStorage + protectedtsReader spanconfig.ProtectedTSReader + ctSender *sidetransport.Sender + storeGossip *StoreGossip + rebalanceObjManager *RebalanceObjectiveManager coalescedMu struct { syncutil.Mutex @@ -1201,6 +1203,18 @@ func NewStore( // store pool in those cases. allocatorStorePool = cfg.StorePool storePoolIsDeterministic = allocatorStorePool.IsDeterministic() + + s.rebalanceObjManager = newRebalanceObjectiveManager(ctx, s.cfg.Settings, + func(ctx context.Context, obj LBRebalancingObjective) { + s.VisitReplicas(func(r *Replica) (wantMore bool) { + r.loadBasedSplitter.Reset(s.Clock().PhysicalTime()) + return true + }) + }, + allocatorStorePool, /* storeDescProvider */ + allocatorStorePool, /* capacityChangeNotifier */ + ) + } if cfg.RPCContext != nil { s.allocator = allocatorimpl.MakeAllocator( @@ -1224,7 +1238,6 @@ func NewStore( } s.replRankings = NewReplicaRankings() - s.replRankingsByTenant = NewReplicaRankingsMap() s.raftRecvQueues.mon = mon.NewUnlimitedMonitor( @@ -1981,7 +1994,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { if s.replicateQueue != nil { s.storeRebalancer = NewStoreRebalancer( - s.cfg.AmbientCtx, s.cfg.Settings, s.replicateQueue, s.replRankings) + s.cfg.AmbientCtx, s.cfg.Settings, s.replicateQueue, s.replRankings, s.rebalanceObjManager) s.storeRebalancer.Start(ctx, s.stopper) } @@ -2563,11 +2576,11 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa var l0SublevelsMax int64 var totalQueriesPerSecond float64 var totalWritesPerSecond float64 + var totalStoreCPUTimePerSecond float64 replicaCount := s.metrics.ReplicaCount.Value() bytesPerReplica := make([]float64, 0, replicaCount) writesPerReplica := make([]float64, 0, replicaCount) - rankingsAccumulator := NewReplicaAccumulator( - LBRebalancingDimension(LoadBasedRebalancingDimension.Get(&s.ClusterSettings().SV)).ToDimension()) + rankingsAccumulator := NewReplicaAccumulator(s.rebalanceObjManager.Objective().ToDimension()) rankingsByTenantAccumulator := NewTenantReplicaAccumulator() // Query the current L0 sublevels and record the updated maximum to metrics. @@ -2585,6 +2598,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa // starts? We can't easily have a countdown as its value changes like for // leases/replicas. // TODO(a-robinson): Calculate percentiles for qps? Get rid of other percentiles? + totalStoreCPUTimePerSecond += usage.RequestCPUNanosPerSecond + usage.RaftCPUNanosPerSecond totalQueriesPerSecond += usage.QueriesPerSecond totalWritesPerSecond += usage.WritesPerSecond writesPerReplica = append(writesPerReplica, usage.WritesPerSecond) @@ -2596,9 +2610,23 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa rankingsByTenantAccumulator.AddReplica(cr) return true }) + + // It is possible that the cputime utility isn't supported on this node's + // architecture. If that is the case, we publish the cpu per second as -1 + // which is special cased on the receiving end and controls whether the cpu + // balancing objective is permitted. If this is not updated, the cpu per + // second will be zero and other stores will likely begin rebalancing + // towards this store as it will appear underfull. + if !grunning.Supported() { + totalStoreCPUTimePerSecond = -1 + } else { + totalStoreCPUTimePerSecond = math.Max(totalStoreCPUTimePerSecond, 0) + } + capacity.RangeCount = rangeCount capacity.LeaseCount = leaseCount capacity.LogicalBytes = logicalBytes + capacity.CPUPerSecond = totalStoreCPUTimePerSecond capacity.QueriesPerSecond = totalQueriesPerSecond capacity.WritesPerSecond = totalWritesPerSecond capacity.L0Sublevels = l0SublevelsMax diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index 8a05320d9d54..62891b30d8b7 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -78,7 +78,21 @@ func TestStorePoolUpdateLocalStore(t *testing.T) { }, }, } + callbacks := []roachpb.StoreID{} + sp.SetOnCapacityChange(func( + storeID roachpb.StoreID, + _, _ roachpb.StoreCapacity, + ) { + callbacks = append(callbacks, storeID) + }) + // Gossip the initial stores. There should trigger two callbacks as the + // capacity has changed from no capacity to a new capacity. sg.GossipStores(stores, t) + require.Len(t, callbacks, 2) + // Gossip the initial stores again, with the same capacity. This shouldn't + // trigger any callbacks as the capacity hasn't changed. + sg.GossipStores(stores, t) + require.Len(t, callbacks, 2) replica := Replica{RangeID: 1} replica.mu.Lock() diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go index ee704a6687c5..14a4fabe11ae 100644 --- a/pkg/kv/kvserver/store_rebalancer.go +++ b/pkg/kv/kvserver/store_rebalancer.go @@ -83,20 +83,6 @@ var LoadBasedRebalancingMode = settings.RegisterEnumSetting( }, ).WithPublic() -// LoadBasedRebalancingDimension controls what dimension rebalancing takes into -// account. -// NB: This value is set to private on purpose, as this cluster setting is a -// noop at the moment. -var LoadBasedRebalancingDimension = settings.RegisterEnumSetting( - settings.SystemOnly, - "kv.allocator.load_based_rebalancing_dimension", - "what dimension of load does rebalancing consider", - "qps", - map[int64]string{ - int64(LBRebalancingQueries): "qps", - }, -) - // LBRebalancingMode controls if and when we do store-level rebalancing // based on load. type LBRebalancingMode int64 @@ -113,23 +99,6 @@ const ( LBRebalancingLeasesAndReplicas ) -// LBRebalancingDimension is the load dimension to balance. -type LBRebalancingDimension int64 - -const ( - // LBRebalancingQueries is a rebalancing mode that balances queries (QPS). - LBRebalancingQueries LBRebalancingDimension = iota -) - -func (d LBRebalancingDimension) ToDimension() load.Dimension { - switch d { - case LBRebalancingQueries: - return load.Queries - default: - panic("unknown dimension") - } -} - // RebalanceSearchOutcome returns the result of a rebalance target search. It // is used to determine whether to transition from lease to range based // rebalancing, exit early or apply a rebalancing action if a target is found. @@ -160,21 +129,26 @@ const ( // will best accomplish the store-level goals. type StoreRebalancer struct { log.AmbientContext - metrics StoreRebalancerMetrics - st *cluster.Settings - storeID roachpb.StoreID - allocator allocatorimpl.Allocator - storePool storepool.AllocatorStorePool - rr RangeRebalancer - replicaRankings *ReplicaRankings - getRaftStatusFn func(replica CandidateReplica) *raft.Status - processTimeoutFn func(replica CandidateReplica) time.Duration + metrics StoreRebalancerMetrics + st *cluster.Settings + storeID roachpb.StoreID + allocator allocatorimpl.Allocator + storePool storepool.AllocatorStorePool + rr RangeRebalancer + replicaRankings *ReplicaRankings + getRaftStatusFn func(replica CandidateReplica) *raft.Status + processTimeoutFn func(replica CandidateReplica) time.Duration + objectiveProvider RebalanceObjectiveProvider } // NewStoreRebalancer creates a StoreRebalancer to work in tandem with the // provided replicateQueue. func NewStoreRebalancer( - ambientCtx log.AmbientContext, st *cluster.Settings, rq *replicateQueue, rr *ReplicaRankings, + ambientCtx log.AmbientContext, + st *cluster.Settings, + rq *replicateQueue, + rr *ReplicaRankings, + objectiveProvider RebalanceObjectiveProvider, ) *StoreRebalancer { var storePool storepool.AllocatorStorePool if rq.store.cfg.StorePool != nil { @@ -195,6 +169,7 @@ func NewStoreRebalancer( processTimeoutFn: func(replica CandidateReplica) time.Duration { return rq.processTimeoutFunc(st, replica.Repl()) }, + objectiveProvider: objectiveProvider, } sr.AddLogTag("store-rebalancer", nil) rq.store.metrics.registry.AddMetricStruct(&sr.metrics) @@ -208,15 +183,17 @@ func SimulatorStoreRebalancer( alocator allocatorimpl.Allocator, storePool storepool.AllocatorStorePool, getRaftStatusFn func(replica CandidateReplica) *raft.Status, + objectiveProvider RebalanceObjectiveProvider, ) *StoreRebalancer { sr := &StoreRebalancer{ - AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(), - metrics: makeStoreRebalancerMetrics(), - st: &cluster.Settings{}, - storeID: storeID, - allocator: alocator, - storePool: storePool, - getRaftStatusFn: getRaftStatusFn, + AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(), + metrics: makeStoreRebalancerMetrics(), + st: &cluster.Settings{}, + storeID: storeID, + allocator: alocator, + storePool: storePool, + getRaftStatusFn: getRaftStatusFn, + objectiveProvider: objectiveProvider, } return sr } @@ -233,6 +210,17 @@ type RebalanceContext struct { hottestRanges, rebalanceCandidates []CandidateReplica } +// RebalanceMode returns the mode of the store rebalancer. See +// LoadBasedRebalancingMode. +func (sr *StoreRebalancer) RebalanceMode() LBRebalancingMode { + return LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)) +} + +// RebalanceDimension returns the dimension the store rebalancer is balancing. +func (sr *StoreRebalancer) RebalanceObjective() LBRebalancingObjective { + return sr.objectiveProvider.Objective() +} + // LessThanMaxThresholds returns true if the local store is below the maximum // threshold w.r.t the balanced load dimension, false otherwise. func (r *RebalanceContext) LessThanMaxThresholds() bool { @@ -270,14 +258,18 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { timer.Reset(jitteredInterval(allocator.LoadBasedRebalanceInterval.Get(&sr.st.SV))) } - mode := LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)) + // Once the rebalance mode and rebalance objective are defined for + // this loop, they are immutable and do not change. This avoids + // inconsistency where the rebalance objective changes and very + // different or contradicting actions are then taken. + mode := sr.RebalanceMode() if mode == LBRebalancingOff { continue } - hottestRanges := sr.replicaRankings.TopLoad() - options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + objective := sr.RebalanceObjective() + options := sr.scorerOptions(ctx, objective.ToDimension()) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, mode) sr.rebalanceStore(ctx, rctx) } }) @@ -288,8 +280,9 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) { // `scorerOptions` here, which sets the range count rebalance threshold. // Instead, we use our own implementation of `scorerOptions` that promotes load // balance. -func (sr *StoreRebalancer) scorerOptions(ctx context.Context) *allocatorimpl.LoadScorerOptions { - lbDimension := LBRebalancingDimension(LoadBasedRebalancingDimension.Get(&sr.st.SV)).ToDimension() +func (sr *StoreRebalancer) scorerOptions( + ctx context.Context, lbDimension load.Dimension, +) *allocatorimpl.LoadScorerOptions { return &allocatorimpl.LoadScorerOptions{ StoreHealthOptions: sr.allocator.StoreHealthOptions(ctx), Deterministic: sr.storePool.IsDeterministic(), @@ -323,10 +316,9 @@ func (sr *StoreRebalancer) NewRebalanceContext( return nil } - dims := LBRebalancingDimension(LoadBasedRebalancingDimension.Get(&sr.st.SV)).ToDimension() return &RebalanceContext{ LocalDesc: localDesc, - loadDimension: dims, + loadDimension: options.LoadDims[0], options: options, mode: rebalancingMode, maxThresholds: allocatorimpl.OverfullLoadThresholds( diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go index 66089fabf487..8578fdf6c77b 100644 --- a/pkg/kv/kvserver/store_rebalancer_test.go +++ b/pkg/kv/kvserver/store_rebalancer_test.go @@ -17,6 +17,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" @@ -57,6 +58,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 3000, + CPUPerSecond: 3000 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, @@ -75,6 +77,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2800, + CPUPerSecond: 2800 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, }, }, @@ -93,6 +96,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2600, + CPUPerSecond: 2600 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 2, }, }, @@ -111,6 +115,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2400, + CPUPerSecond: 2400 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, @@ -129,6 +134,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2200, + CPUPerSecond: 2200 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 3, }, }, @@ -147,6 +153,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 2000, + CPUPerSecond: 2000 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 2, }, }, @@ -165,6 +172,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1800, + CPUPerSecond: 1800 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, @@ -183,6 +191,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1600, + CPUPerSecond: 1600 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, }, }, @@ -201,6 +210,7 @@ var ( }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1400, + CPUPerSecond: 1400 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 3, }, }, @@ -215,6 +225,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1500, + CPUPerSecond: 1500 * float64(time.Millisecond), }, }, { @@ -222,6 +233,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1300, + CPUPerSecond: 1300 * float64(time.Millisecond), }, }, { @@ -229,6 +241,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1000, + CPUPerSecond: 1000 * float64(time.Millisecond), }, }, { @@ -236,6 +249,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 900, + CPUPerSecond: 900 * float64(time.Millisecond), }, }, { @@ -243,6 +257,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 500, + CPUPerSecond: 500 * float64(time.Millisecond), }, }, } @@ -257,6 +272,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1500, + CPUPerSecond: 1500 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, }, }, @@ -265,6 +281,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1300, + CPUPerSecond: 1300 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 10, }, }, @@ -273,6 +290,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1000, + CPUPerSecond: 1000 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 5, }, }, @@ -281,6 +299,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 900, + CPUPerSecond: 900 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 20, }, }, @@ -289,6 +308,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 500, + CPUPerSecond: 500 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 25, }, }, @@ -302,6 +322,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1000, + CPUPerSecond: 1000 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, @@ -310,6 +331,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1000, + CPUPerSecond: 1000 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, }, }, @@ -318,6 +340,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1000, + CPUPerSecond: 1000 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, @@ -326,6 +349,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1000, + CPUPerSecond: 1000 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold - 15, }, }, @@ -334,6 +358,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1000, + CPUPerSecond: 1000 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, @@ -347,6 +372,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1500, + CPUPerSecond: 1500 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, @@ -355,6 +381,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1300, + CPUPerSecond: 1300 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, @@ -363,6 +390,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1000, + CPUPerSecond: 1000 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, @@ -371,6 +399,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 900, + CPUPerSecond: 900 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, @@ -379,6 +408,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 500, + CPUPerSecond: 500 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, @@ -392,6 +422,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 1}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1500, + CPUPerSecond: 1500 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 1, }, }, @@ -400,6 +431,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 2}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1300, + CPUPerSecond: 1300 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 10, }, }, @@ -408,6 +440,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 3}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 1000, + CPUPerSecond: 1000 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 50, }, }, @@ -416,6 +449,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 4}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 900, + CPUPerSecond: 900 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, @@ -424,6 +458,7 @@ var ( Node: roachpb.NodeDescriptor{NodeID: 5}, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 500, + CPUPerSecond: 500 * float64(time.Millisecond), L0Sublevels: allocatorimpl.MaxL0SublevelThreshold + 100, }, }, @@ -433,7 +468,7 @@ var ( type testRange struct { // The first storeID in the list will be the leaseholder. voters, nonVoters []roachpb.StoreID - qps float64 + qps, reqCPU float64 } func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension load.Dimension) { @@ -469,6 +504,7 @@ func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension repl.mu.state.Stats = &enginepb.MVCCStats{} repl.loadStats = rload.NewReplicaLoad(s.Clock(), nil) repl.loadStats.TestingSetStat(rload.Queries, r.qps) + repl.loadStats.TestingSetStat(rload.ReqCPUNanos, r.reqCPU) acc.AddReplica(candidateReplica{ Replica: repl, @@ -478,6 +514,36 @@ func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension rr.Update(acc) } +// testRebalanceObjectiveProvider implements the RebalanceObjectiveProvider +// interface and should be used in testing only. +type testRebalanceObjectiveProvider struct { + objective LBRebalancingObjective +} + +func (t *testRebalanceObjectiveProvider) Objective() LBRebalancingObjective { + return t.objective +} + +func testWithDifferentRebalanceObjectives( + t *testing.T, + f func(t *testing.T), + objectiveProvider *testRebalanceObjectiveProvider, + objectives ...LBRebalancingObjective, +) func(*testing.T) { + return func(t *testing.T) { + for _, objective := range objectives { + objectiveProvider.objective = objective + t.Run(objective.ToDimension().String(), f) + } + } +} + +func withQPSCPU( + t *testing.T, objectiveProvider *testRebalanceObjectiveProvider, f func(t *testing.T), +) func(*testing.T) { + return testWithDifferentRebalanceObjectives(t, f, objectiveProvider, LBRebalancingQueries, LBRebalancingCPU) +} + func TestChooseLeaseToTransfer(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -495,6 +561,7 @@ func TestChooseLeaseToTransfer(t *testing.T) { }, ) defer stopper.Stop(context.Background()) + objectiveProvider := &testRebalanceObjectiveProvider{} gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t) localDesc := *noLocalityStores[0] cfg := TestStoreConfig(nil) @@ -505,7 +572,7 @@ func TestChooseLeaseToTransfer(t *testing.T) { rq := newReplicateQueue(s, a) rr := NewReplicaRankings() - sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr, objectiveProvider) // Rather than trying to populate every Replica with a real raft group in // order to pass replicaIsBehind checks, fake out the function for getting @@ -516,12 +583,13 @@ func TestChooseLeaseToTransfer(t *testing.T) { testCases := []struct { storeIDs []roachpb.StoreID - qps float64 + qps, reqCPU float64 expectTarget roachpb.StoreID }{ { storeIDs: []roachpb.StoreID{1}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 0, }, @@ -529,29 +597,37 @@ func TestChooseLeaseToTransfer(t *testing.T) { // (1300 for stores 1 and 2) is close enough to the current leaseholder's // QPS (1500). { - storeIDs: []roachpb.StoreID{1, 2}, - qps: 100, + storeIDs: []roachpb.StoreID{1, 2}, + qps: 100, + // NB: This is set +50 above qps, as the minimum threshold + // difference for store cpu is 50ms, while it is 100 qps for + // queries per second. + reqCPU: 150 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 2}, qps: 1000, + reqCPU: 1000 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 4, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{5, 1}, qps: 100, + reqCPU: 100 * float64(time.Millisecond), expectTarget: 0, }, { @@ -559,43 +635,54 @@ func TestChooseLeaseToTransfer(t *testing.T) { // be projected to have 1300 and 1200 qps respectively. storeIDs: []roachpb.StoreID{1, 3}, qps: 200, + reqCPU: 200 * float64(time.Millisecond), expectTarget: 3, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 200, + reqCPU: 200 * float64(time.Millisecond), expectTarget: 4, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 200, + reqCPU: 200 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 2}, qps: 500, + reqCPU: 500 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 3}, qps: 500, + reqCPU: 500 * float64(time.Millisecond), expectTarget: 0, }, // s1 without the lease would be projected to have 1000 qps, which is close // enough to s4's 900 qps. { - storeIDs: []roachpb.StoreID{1, 4}, - qps: 500, + storeIDs: []roachpb.StoreID{1, 4}, + qps: 500, + // NB: This is set +50 above qps, as the minimum threshold + // difference for store cpu is 50ms, while it is 100 qps for + // queries per second. + reqCPU: 550 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 500, + reqCPU: 500 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 600, + reqCPU: 600 * float64(time.Millisecond), expectTarget: 5, }, @@ -606,16 +693,19 @@ func TestChooseLeaseToTransfer(t *testing.T) { { storeIDs: []roachpb.StoreID{1, 5}, qps: 800, + reqCPU: 800 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 4, 5}, qps: 800, + reqCPU: 800 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 3, 4, 5}, qps: 800, + reqCPU: 800 * float64(time.Millisecond), expectTarget: 5, }, // NB: However, if s1 is projected to have 750 qps, we would expect a lease @@ -623,47 +713,56 @@ func TestChooseLeaseToTransfer(t *testing.T) { { storeIDs: []roachpb.StoreID{1, 3, 4, 5}, qps: 750, + reqCPU: 750 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 1.5, + reqCPU: 1.5 * float64(time.Millisecond), expectTarget: 4, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 1.5, + reqCPU: 1.5 * float64(time.Millisecond), expectTarget: 5, }, { storeIDs: []roachpb.StoreID{1, 4}, qps: 1.49, + reqCPU: 1.49 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 5}, qps: 1.49, + reqCPU: 1.49 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 2, 3, 4}, qps: 1500, + reqCPU: 1500 * float64(time.Millisecond), expectTarget: 0, }, { storeIDs: []roachpb.StoreID{1, 2, 3, 4, 5}, qps: 1500, + reqCPU: 1500 * float64(time.Millisecond), expectTarget: 0, }, } for _, tc := range testCases { - t.Run("", func(t *testing.T) { - loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps}}, load.Queries) + t.Run("", withQPSCPU(t, objectiveProvider, func(t *testing.T) { + lbRebalanceDimension := objectiveProvider.Objective().ToDimension() + loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}}, lbRebalanceDimension) hottestRanges := sr.replicaRankings.TopLoad() - options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + options := sr.scorerOptions(ctx, lbRebalanceDimension) + options.LoadThreshold = allocatorimpl.WithAllDims(0.1) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) _, target, _ := sr.chooseLeaseToTransfer( ctx, rctx, @@ -672,35 +771,33 @@ func TestChooseLeaseToTransfer(t *testing.T) { t.Errorf("got target store %d for range with replicas %v and %f qps; want %d", target.StoreID, tc.storeIDs, tc.qps, tc.expectTarget) } - }) + })) } } func randomNoLocalityStores( - numNodes int, qpsMultiplier float64, -) (stores []*roachpb.StoreDescriptor, qpsMean float64) { - var totalQPS float64 + numNodes int, loadMultiplier float64, +) (stores []*roachpb.StoreDescriptor) { for i := 1; i <= numNodes; i++ { - qps := rand.Float64() * qpsMultiplier + load := rand.Float64() * loadMultiplier stores = append( stores, &roachpb.StoreDescriptor{ StoreID: roachpb.StoreID(i), Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}, - Capacity: roachpb.StoreCapacity{QueriesPerSecond: qps}, + Capacity: roachpb.StoreCapacity{QueriesPerSecond: load, CPUPerSecond: load * float64(time.Millisecond)}, }, ) - totalQPS = totalQPS + qps } - return stores, totalQPS / float64(numNodes) + return stores } func logSummary( - ctx context.Context, allStores, deadStores []*roachpb.StoreDescriptor, meanQPS float64, + ctx context.Context, allStores, deadStores []*roachpb.StoreDescriptor, mean load.Load, ) { var summary strings.Builder for _, store := range allStores { summary.WriteString( - fmt.Sprintf("s%d: %.2f qps", store.StoreID, store.Capacity.QueriesPerSecond), + fmt.Sprintf("s%d: %s", store.StoreID, store.Capacity.Load()), ) for _, dead := range deadStores { if dead.StoreID == store.StoreID { @@ -709,49 +806,57 @@ func logSummary( } summary.WriteString("\n") } - summary.WriteString(fmt.Sprintf("overall-mean: %.2f", meanQPS)) + summary.WriteString(fmt.Sprintf("overall-mean: %s", mean)) log.Infof(ctx, "generated random store list:\n%s", summary.String()) } func TestChooseRangeToRebalanceRandom(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + objectiveProvider := &testRebalanceObjectiveProvider{} const ( numIterations = 10 - qpsMultiplier = 2000 - numVoters = 3 - numNonVoters = 3 - numNodes = 12 - numDeadNodes = 3 - perReplicaQPS = 100 - qpsRebalanceThreshold = 0.25 - - epsilon = 1 + qpsMultiplier = 2000 + numVoters = 3 + numNonVoters = 3 + numNodes = 12 + numDeadNodes = 3 + perReplicaQPS = 100 + perReplicaReqCPU = 100 * float64(time.Millisecond) + rebalanceThreshold = 0.25 ) + epsilon := load.Set(1) for i := 0; i < numIterations; i++ { - t.Run(fmt.Sprintf("%d", i+1), func(t *testing.T) { + t.Run(fmt.Sprintf("%d", i+1), withQPSCPU(t, objectiveProvider, func(t *testing.T) { ctx := context.Background() stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, numNodes, false /* deterministic */) defer stopper.Stop(context.Background()) - stores, actualQPSMean := randomNoLocalityStores(numNodes, qpsMultiplier) - deadStores := stores[len(stores)-numDeadNodes:] - logSummary(ctx, stores, deadStores, actualQPSMean) - meanQPS := func(targets []roachpb.StoreID) float64 { - var totalQPS float64 + stores := randomNoLocalityStores(numNodes, qpsMultiplier) + meanLoad := func(targets []roachpb.StoreID) load.Load { + total := load.Vector{} for _, store := range stores { for _, target := range targets { if target == store.StoreID { - totalQPS = totalQPS + store.Capacity.QueriesPerSecond + targetLoad := store.Capacity.Load() + total[load.Queries] += targetLoad.Dim(load.Queries) + total[load.CPU] += targetLoad.Dim(load.CPU) break } } } - return totalQPS / float64(len(stores)) + return load.Scale(total, 1/float64(len(stores))) + } + + storeIDs := make([]roachpb.StoreID, len(stores)) + for i := range stores { + storeIDs[i] = stores[i].StoreID } + deadStores := stores[len(stores)-numDeadNodes:] + logSummary(ctx, stores, deadStores, meanLoad(storeIDs)) // Test setup boilerplate. gossiputil.NewStoreGossiper(g).GossipStores(stores, t) @@ -762,7 +867,8 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} rq := newReplicateQueue(s, a) rr := NewReplicaRankings() - sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr, objectiveProvider) + // Rather than trying to populate every Replica with a real raft group in // order to pass replicaIsBehind checks, fake out the function for getting // raft status with one that always returns all replicas as up to date. @@ -789,19 +895,19 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { for i := numVoters; i < numVoters+numNonVoters; i++ { nonVoterStores = append(nonVoterStores, stores[i].StoreID) } + + lbRebalanceDimension := sr.RebalanceObjective().ToDimension() loadRanges( rr, s, []testRange{ - {voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS}, - }, load.Queries, + {voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS, reqCPU: perReplicaReqCPU}, + }, lbRebalanceDimension, ) hottestRanges := sr.replicaRankings.TopLoad() - options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + options := sr.scorerOptions(ctx, lbRebalanceDimension) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(qpsRebalanceThreshold) + rctx.options.LoadThreshold = allocatorimpl.WithAllDims(rebalanceThreshold) _, voterTargets, nonVoterTargets := sr.chooseRangeToRebalance(ctx, rctx) var rebalancedVoterStores, rebalancedNonVoterStores []roachpb.StoreID @@ -813,35 +919,35 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) { } log.Infof( ctx, - "rebalanced voters from %v to %v: %.2f qps -> %.2f qps", + "rebalanced voters from %v to %v: %s -> %s", voterStores, voterTargets, - meanQPS(voterStores), - meanQPS(rebalancedVoterStores), + meanLoad(voterStores), + meanLoad(rebalancedVoterStores), ) log.Infof( ctx, - "rebalanced non-voters from %v to %v: %.2f qps -> %.2f qps", + "rebalanced non-voters from %v to %v: %s -> %s", nonVoterStores, nonVoterTargets, - meanQPS(nonVoterStores), - meanQPS(rebalancedNonVoterStores), + meanLoad(nonVoterStores), + meanLoad(rebalancedNonVoterStores), ) - if r, o := meanQPS(rebalancedVoterStores), meanQPS(voterStores); r-o > epsilon { - t.Errorf("voters were rebalanced onto a set of stores with higher QPS (%.2f to %.2f)", o, r) + if r, o := meanLoad(rebalancedVoterStores), meanLoad(voterStores); load.Greater(load.Sub(r, o), epsilon, objectiveProvider.Objective().ToDimension()) { + t.Errorf("voters were rebalanced onto a set of stores with higher load (%s to %s)", o, r) } - previousMean := meanQPS(append(voterStores, nonVoterStores...)) - newMean := meanQPS(append(rebalancedVoterStores, rebalancedNonVoterStores...)) + previousMean := meanLoad(append(voterStores, nonVoterStores...)) + newMean := meanLoad(append(rebalancedVoterStores, rebalancedNonVoterStores...)) log.Infof( ctx, - "rebalanced range from stores with %.2f average qps to %.2f average qps", + "rebalanced range from stores with %s average load to %s average load", previousMean, newMean, ) - if newMean-previousMean > epsilon { - t.Errorf("replicas were rebalanced onto a set of stores with higher QPS (%.2f to %.2f)", previousMean, newMean) + if load.Greater(load.Sub(newMean, previousMean), epsilon, objectiveProvider.Objective().ToDimension()) { + t.Errorf("replicas were rebalanced onto a set of stores with higher load (%.2f to %.2f)", previousMean, newMean) } - }) + })) } } @@ -853,6 +959,8 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) + objectiveProvider := &testRebalanceObjectiveProvider{} + constraint := func(region string) roachpb.Constraint { return roachpb.Constraint{ Type: roachpb.Constraint_REQUIRED, @@ -1091,7 +1199,7 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { }, } for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { + t.Run(tc.name, withQPSCPU(t, objectiveProvider, func(t *testing.T) { // Boilerplate for test setup. testingKnobs := allocator.TestingKnobs{RaftStatusFn: TestingRaftStatusFn} stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocatorWithKnobs(ctx, 10, false /* deterministic */, &testingKnobs) @@ -1111,7 +1219,7 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { rq := newReplicateQueue(s, a) rr := NewReplicaRankings() - sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr, objectiveProvider) // Rather than trying to populate every Replica with a real raft group in // order to pass replicaIsBehind checks, fake out the function for getting @@ -1125,18 +1233,21 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { s.cfg.DefaultSpanConfig.VoterConstraints = tc.voterConstraints s.cfg.DefaultSpanConfig.LeasePreferences = tc.leasePreferences const testingQPS = float64(60) + const testingReqCPU = 60 * float64(time.Millisecond) + + lbRebalanceDimension := sr.RebalanceObjective().ToDimension() loadRanges( rr, s, []testRange{ - {voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS}, - }, load.Queries, + {voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS, reqCPU: testingReqCPU}, + }, lbRebalanceDimension, ) hottestRanges := sr.replicaRankings.TopLoad() - options := sr.scorerOptions(ctx) + options := sr.scorerOptions(ctx, lbRebalanceDimension) rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingLeasesAndReplicas) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: allocatorimpl.StoreHealthBlockRebalanceTo} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05) + rctx.options.LoadThreshold = allocatorimpl.WithAllDims(0.05) _, voterTargets, nonVoterTargets := sr.chooseRangeToRebalance( ctx, @@ -1145,8 +1256,8 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { require.Len(t, voterTargets, len(tc.expRebalancedVoters)) if len(voterTargets) > 0 && voterTargets[0].StoreID != tc.expRebalancedVoters[0] { - t.Errorf("chooseRangeToRebalance(existing=%v, qps=%f) chose s%d as leaseholder; want s%v", - tc.voters, testingQPS, voterTargets[0], tc.expRebalancedVoters[0]) + t.Errorf("chooseRangeToRebalance(existing=%v, qps=%f, cpu=%f) chose s%d as leaseholder; want s%v", + tc.voters, testingQPS, testingReqCPU, voterTargets[0], tc.expRebalancedVoters[0]) } voterStoreIDs := make([]roachpb.StoreID, len(voterTargets)) @@ -1166,7 +1277,7 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) { nonVoterStoreIDs[i] = target.StoreID } require.ElementsMatch(t, nonVoterStoreIDs, tc.expRebalancedNonVoters) - }) + })) } } @@ -1191,40 +1302,49 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) { ) defer stopper.Stop(context.Background()) - localDesc := *noLocalityStores[len(noLocalityStores)-1] - cfg := TestStoreConfig(nil) - cfg.Gossip = g - cfg.StorePool = sp - cfg.DefaultSpanConfig.NumVoters = 1 - cfg.DefaultSpanConfig.NumReplicas = 1 - s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) - gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(noLocalityStores, t) - s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} - rq := newReplicateQueue(s, a) - rr := NewReplicaRankings() + objectiveProvider := &testRebalanceObjectiveProvider{} + withQPSCPU(t, objectiveProvider, func(t *testing.T) { + localDesc := *noLocalityStores[len(noLocalityStores)-1] + cfg := TestStoreConfig(nil) + cfg.Gossip = g + cfg.StorePool = sp + cfg.DefaultSpanConfig.NumVoters = 1 + cfg.DefaultSpanConfig.NumReplicas = 1 + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(noLocalityStores, t) + s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} + rq := newReplicateQueue(s, a) + rr := NewReplicaRankings() + + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr, objectiveProvider) + lbRebalanceDimension := sr.RebalanceObjective().ToDimension() + + // Load a fake hot range that's already on the best stores. We want to ensure + // that the store rebalancer doesn't attempt to rebalance ranges that it + // cannot find better rebalance opportunities for. + loadRanges(rr, s, + []testRange{ + {voters: []roachpb.StoreID{localDesc.StoreID}, + qps: 100, + reqCPU: 100 * float64(time.Millisecond)}, + }, + lbRebalanceDimension, + ) - sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) - - // Load a fake hot range that's already on the best stores. We want to ensure - // that the store rebalancer doesn't attempt to rebalance ranges that it - // cannot find better rebalance opportunities for. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{localDesc.StoreID}, qps: 100}}, load.Queries) - - hottestRanges := sr.replicaRankings.TopLoad() - options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) - rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ - EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05) - - sr.chooseRangeToRebalance(ctx, rctx) - trace := finishAndGetRecording() - require.Regexpf( - t, "could not find.*opportunities for r1", - trace, "expected the store rebalancer to explicitly ignore r1; but found %s", trace, - ) + hottestRanges := sr.replicaRankings.TopLoad() + options := sr.scorerOptions(ctx, lbRebalanceDimension) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) + rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ + EnforcementLevel: allocatorimpl.StoreHealthNoAction} + rctx.options.LoadThreshold = allocatorimpl.WithAllDims(0.05) + + sr.chooseRangeToRebalance(ctx, rctx) + trace := finishAndGetRecording() + require.Regexpf( + t, "could not find.*opportunities for r1", + trace, "expected the store rebalancer to explicitly ignore r1; but found %s", trace, + ) + }) } func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { @@ -1232,6 +1352,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() + objectiveProvider := &testRebalanceObjectiveProvider{} imbalancedStores := []*roachpb.StoreDescriptor{ { StoreID: 1, @@ -1240,6 +1361,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 12000, + CPUPerSecond: 12000 * float64(time.Millisecond), }, }, { @@ -1249,6 +1371,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 10000, + CPUPerSecond: 10000 * float64(time.Millisecond), }, }, { @@ -1258,6 +1381,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 8000, + CPUPerSecond: 8000 * float64(time.Millisecond), }, }, { @@ -1267,6 +1391,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 200, + CPUPerSecond: 200 * float64(time.Millisecond), }, }, { @@ -1276,18 +1401,20 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { }, Capacity: roachpb.StoreCapacity{ QueriesPerSecond: 100, + CPUPerSecond: 100 * float64(time.Millisecond), }, }, } for _, tc := range []struct { - voters, expRebalancedVoters []roachpb.StoreID - QPS, rebalanceThreshold float64 - shouldRebalance bool + voters, expRebalancedVoters []roachpb.StoreID + QPS, reqCPU, rebalanceThreshold float64 + shouldRebalance bool }{ { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{3, 4, 5}, QPS: 5000, + reqCPU: 5000 * float64(time.Millisecond), rebalanceThreshold: 0.25, shouldRebalance: true, }, @@ -1295,6 +1422,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{5, 2, 3}, QPS: 5000, + reqCPU: 5000 * float64(time.Millisecond), rebalanceThreshold: 0.8, shouldRebalance: true, }, @@ -1302,12 +1430,14 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{3, 4, 5}, QPS: 1000, + reqCPU: 1000 * float64(time.Millisecond), rebalanceThreshold: 0.05, shouldRebalance: true, }, { voters: []roachpb.StoreID{1, 2, 3}, QPS: 5000, + reqCPU: 5000 * float64(time.Millisecond), // NB: This will lead to an overfull threshold of just above 12000. Thus, // no store should be considered overfull and we should not rebalance at // all. @@ -1317,6 +1447,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { { voters: []roachpb.StoreID{4}, QPS: 100, + reqCPU: 100 * float64(time.Millisecond), rebalanceThreshold: 0.01, // NB: We don't expect a rebalance here because the difference between s4 // and s5 is not high enough to justify a rebalance. @@ -1326,12 +1457,13 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { voters: []roachpb.StoreID{1, 2, 3}, expRebalancedVoters: []roachpb.StoreID{5, 2, 3}, QPS: 10000, + reqCPU: 10000 * float64(time.Millisecond), rebalanceThreshold: 0.01, // NB: s5 will be hotter than s1 after this move. shouldRebalance: true, }, } { - t.Run("", func(t *testing.T) { + t.Run("", withQPSCPU(t, objectiveProvider, func(t *testing.T) { stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */) defer stopper.Stop(context.Background()) gossiputil.NewStoreGossiper(g).GossipStores(imbalancedStores, t) @@ -1351,7 +1483,8 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { rq := newReplicateQueue(s, a) rr := NewReplicaRankings() - sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr, objectiveProvider) + lbRebalanceDimension := sr.RebalanceObjective().ToDimension() // Rather than trying to populate every Replica with a real raft group in // order to pass replicaIsBehind checks, fake out the function for getting @@ -1361,16 +1494,17 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { } s.cfg.DefaultSpanConfig.NumReplicas = int32(len(tc.voters)) - loadRanges(rr, s, []testRange{{voters: tc.voters, qps: tc.QPS}}, load.Queries) + loadRanges(rr, s, + []testRange{{voters: tc.voters, qps: tc.QPS, reqCPU: tc.reqCPU}}, + lbRebalanceDimension, + ) hottestRanges := sr.replicaRankings.TopLoad() - options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV))) + options := sr.scorerOptions(ctx, lbRebalanceDimension) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(tc.rebalanceThreshold) + rctx.options.LoadThreshold = allocatorimpl.WithAllDims(tc.rebalanceThreshold) _, voterTargets, _ := sr.chooseRangeToRebalance(ctx, rctx) require.Len(t, voterTargets, len(tc.expRebalancedVoters)) @@ -1383,7 +1517,7 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) { if tc.shouldRebalance { require.ElementsMatch(t, voterStoreIDs, tc.expRebalancedVoters) } - }) + })) } } @@ -1435,66 +1569,64 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) { ) defer stopper.Stop(context.Background()) - localDesc := *noLocalityStores[0] - cfg := TestStoreConfig(nil) - cfg.Gossip = g - cfg.StorePool = sp - s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) - gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(noLocalityStores, t) - s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} - rq := newReplicateQueue(s, a) - rr := NewReplicaRankings() - - sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) - sr.getRaftStatusFn = func(r CandidateReplica) *raft.Status { - return behindTestingRaftStatusFn(r) - } - - // Load in a range with replicas on an overfull node, a slightly underfull - // node, and a very underfull node. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100}}, load.Queries) - - hottestRanges := sr.replicaRankings.TopLoad() - options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)), - ) - repl := rctx.hottestRanges[0] - - _, target, _ := sr.chooseLeaseToTransfer(ctx, rctx) - expectTarget := roachpb.StoreID(4) - if target.StoreID != expectTarget { - t.Errorf("got target store s%d for range with RaftStatus %v; want s%d", - target.StoreID, sr.getRaftStatusFn(repl), expectTarget) - } + objectiveProvider := &testRebalanceObjectiveProvider{} + withQPSCPU(t, objectiveProvider, func(t *testing.T) { + localDesc := *noLocalityStores[0] + cfg := TestStoreConfig(nil) + cfg.Gossip = g + cfg.StorePool = sp + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + gossiputil.NewStoreGossiper(cfg.Gossip).GossipStores(noLocalityStores, t) + s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID} + rq := newReplicateQueue(s, a) + rr := NewReplicaRankings() + + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr, objectiveProvider) + sr.getRaftStatusFn = func(r CandidateReplica) *raft.Status { + return behindTestingRaftStatusFn(r) + } + lbRebalanceDimension := sr.RebalanceObjective().ToDimension() + + // Load in a range with replicas on an overfull node, a slightly underfull + // node, and a very underfull node. + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension) + + hottestRanges := sr.replicaRankings.TopLoad() + options := sr.scorerOptions(ctx, lbRebalanceDimension) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) + repl := rctx.hottestRanges[0] + + _, target, _ := sr.chooseLeaseToTransfer(ctx, rctx) + expectTarget := roachpb.StoreID(4) + if target.StoreID != expectTarget { + t.Errorf("got target store s%d for range with RaftStatus %v; want s%d", + target.StoreID, sr.getRaftStatusFn(repl), expectTarget) + } - // Then do the same, but for replica rebalancing. Make s5 an existing replica - // that's behind, and see how a new replica is preferred as the leaseholder - // over it. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100}}, load.Queries) + // Then do the same, but for replica rebalancing. Make s5 an existing replica + // that's behind, and see how a new replica is preferred as the leaseholder + // over it. + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension) - hottestRanges = sr.replicaRankings.TopLoad() - options = sr.scorerOptions(ctx) - rctx = sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)), - ) - rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ - EnforcementLevel: allocatorimpl.StoreHealthNoAction} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05) - rctx.options.Deterministic = true + hottestRanges = sr.replicaRankings.TopLoad() + options = sr.scorerOptions(ctx, lbRebalanceDimension) + rctx = sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) + rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ + EnforcementLevel: allocatorimpl.StoreHealthNoAction} + rctx.options.LoadThreshold = allocatorimpl.WithAllDims(0.05) + rctx.options.Deterministic = true - repl = rctx.hottestRanges[0] + repl = rctx.hottestRanges[0] - _, targets, _ := sr.chooseRangeToRebalance(ctx, rctx) - expectTargets := []roachpb.ReplicationTarget{ - {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, - } - if !reflect.DeepEqual(targets, expectTargets) { - t.Errorf("got targets %v for range with RaftStatus %v; want %v", - targets, sr.getRaftStatusFn(repl), expectTargets) - } + _, targets, _ := sr.chooseRangeToRebalance(ctx, rctx) + expectTargets := []roachpb.ReplicationTarget{ + {NodeID: 4, StoreID: 4}, {NodeID: 3, StoreID: 3}, {NodeID: 5, StoreID: 5}, + } + if !reflect.DeepEqual(targets, expectTargets) { + t.Errorf("got targets %v for range with RaftStatus %v; want %v", + targets, sr.getRaftStatusFn(repl), expectTargets) + } + }) } // TestStoreRebalancerReadAmpCheck checks that: @@ -1507,6 +1639,7 @@ func TestStoreRebalancerReadAmpCheck(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() + objectiveProvider := &testRebalanceObjectiveProvider{} type testCase struct { name string stores []*roachpb.StoreDescriptor @@ -1623,7 +1756,7 @@ func TestStoreRebalancerReadAmpCheck(t *testing.T) { } for i, test := range tests { - t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { + t.Run(fmt.Sprintf("%d_%s", i+1, test.name), withQPSCPU(t, objectiveProvider, func(t *testing.T) { stopper, g, sp, a, _ := allocatorimpl.CreateTestAllocator(ctx, 10, false /* deterministic */) defer stopper.Stop(ctx) @@ -1637,27 +1770,25 @@ func TestStoreRebalancerReadAmpCheck(t *testing.T) { rq := newReplicateQueue(s, a) rr := NewReplicaRankings() - sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr) + sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr, objectiveProvider) + lbRebalanceDimension := sr.RebalanceObjective().ToDimension() // Load in a range with replicas on an overfull node, a slightly underfull // node, and a very underfull node. - loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100}}, load.Queries) + loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension) hottestRanges := sr.replicaRankings.TopLoad() - options := sr.scorerOptions(ctx) - rctx := sr.NewRebalanceContext( - ctx, options, hottestRanges, - LBRebalancingMode(LoadBasedRebalancingMode.Get(&sr.st.SV)), - ) + options := sr.scorerOptions(ctx, lbRebalanceDimension) + rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode()) require.Greater(t, len(rctx.hottestRanges), 0) rctx.options.StoreHealthOptions = allocatorimpl.StoreHealthOptions{ EnforcementLevel: test.enforcement, L0SublevelThreshold: allocatorimpl.MaxL0SublevelThreshold} - rctx.options.LoadThreshold = allocatorimpl.MakeQPSOnlyDim(0.05) + rctx.options.LoadThreshold = allocatorimpl.WithAllDims(0.05) _, targetVoters, _ := sr.chooseRangeToRebalance(ctx, rctx) require.Equal(t, test.expectedTargets, targetVoters) - }) + })) } } diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 60514987fe21..adda471073fb 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -537,6 +537,7 @@ func (sc StoreCapacity) FractionUsed() float64 { func (sc StoreCapacity) Load() load.Load { dims := load.Vector{} dims[load.Queries] = sc.QueriesPerSecond + dims[load.CPU] = sc.CPUPerSecond return dims } diff --git a/pkg/roachpb/metadata.proto b/pkg/roachpb/metadata.proto index f92e704582e1..a3ab852f85f2 100644 --- a/pkg/roachpb/metadata.proto +++ b/pkg/roachpb/metadata.proto @@ -323,6 +323,10 @@ message StoreCapacity { // instances where overlapping node-binary versions within a cluster result // in this this field missing. optional int64 l0_sublevels = 12 [(gogoproto.nullable) = false]; + // cpu_per_second tracks the average store cpu use (ns) per second. + // This is the sum of all the replica's cpu time on this store, which is + // tracked in replica stats. + optional double cpu_per_second = 14 [(gogoproto.nullable) = false, (gogoproto.customname) = "CPUPerSecond"]; optional cockroach.util.admission.admissionpb.IOThreshold io_threshold = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IOThreshold" ]; // bytes_per_replica and writes_per_replica contain percentiles for the // number of bytes and writes-per-second to each replica in the store.