Skip to content

Commit

Permalink
kvserver: add store cpu rebalancing
Browse files Browse the repository at this point in the history
This patch allows the store rebalancer to use store cpu time as opposed
to QPS when balancing the cluster. This patch adds `store_cpu` as an
option with the existing, now public cluster setting:

`kv.allocator.load_based_rebalancing_dimension`

When set to `store_cpu`, rather than `qps`. The store rebalancer will perform
a mostly identical function, however target balancing the sum of all
replica's cpu time on each store, rather than qps.

Similar to QPS, the rebalance threshold can be set to allow controlling
the aggressiveness of balancing:

`kv.allocator.store_cpu_rebalance_threshold`: 0.1

resolves: cockroachdb#95380

Release note (ops change):
Add option to balance store cpu time instead of queries per second (qps)
by setting `kv.allocator.load_based_rebalancing_dimension='store_cpu'`.
`kv.allocator.store_cpu_rebalance_threshold` is also added, similar
to `kv.allocator.qps_rebalance_threshold` to control the target range
for store cpu above and below the cluster mean.
  • Loading branch information
kvoli committed Jan 23, 2023
1 parent 43e46f3 commit 8be73e0
Show file tree
Hide file tree
Showing 14 changed files with 391 additions and 135 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-30 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-32 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-30</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-32</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ const (
// V23_1AlterSystemSQLInstancesAddSqlAddr adds a sql_addr column to the
// system.sql_instances table.
V23_1AlterSystemSQLInstancesAddSQLAddr
// V23_1AllocatorCPUBalancing adds balancing CPU usage among stores using
// the allocator and store rebalancer. It assumes that at this version,
// stores now include their CPU in the StoreCapacity proto when gossiping.
V23_1AllocatorCPUBalancing

// V23_1_ChangefeedExpressionProductionReady marks changefeed expressions (transformation)
// as production ready. This gate functions as a signal to attempt to upgrade
Expand Down Expand Up @@ -687,6 +691,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_ChangefeedExpressionProductionReady,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 30},
},
{
Key: V23_1AllocatorCPUBalancing,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 32},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func getLoadThreshold(dim load.Dimension, sv *settings.Values) float64 {
switch dim {
case load.Queries:
return allocator.QPSRebalanceThreshold.Get(sv)
case load.StoreCPU:
return allocator.StoreCPURebalanceThreshold.Get(sv)
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand All @@ -51,6 +53,8 @@ func getLoadMinThreshold(dim load.Dimension) float64 {
switch dim {
case load.Queries:
return allocator.MinQPSThresholdDifference
case load.StoreCPU:
return allocator.MinStoreCPUThresholdDifference
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand All @@ -76,6 +80,8 @@ func getLoadRebalanceMinRequiredDiff(dim load.Dimension, sv *settings.Values) fl
switch dim {
case load.Queries:
return allocator.MinQPSDifferenceForTransfers.Get(sv)
case load.StoreCPU:
return allocator.MinStoreCPUDifferenceForTransfers
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand Down Expand Up @@ -117,3 +123,13 @@ func MakeQPSOnlyDim(v float64) load.Load {
dims[load.Queries] = v
return dims
}

// SetAllDims returns a load vector with all dimensions filled in with the
// value given.
func SetAllDims(v float64) load.Load {
dims := load.Vector{}
for i := range dims {
dims[i] = v
}
return dims
}
45 changes: 45 additions & 0 deletions pkg/kv/kvserver/allocator/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@ const (
// lightly loaded clusters.
MinQPSThresholdDifference = 100

// MinStoreCPUThresholdDifference is the minimum CPU difference from the cluster
// mean that this system should care about. The system won't attempt to
// take action if a store's CPU differs from the mean by less than this
// amount even if it is greater than the percentage threshold. This
// prevents too many lease transfers or range rebalances in lightly loaded
// clusters.
//
// NB: This represents 5% (1/20) utilization of 1 cpu on average. This
// number was arrived at from testing to minimize thrashing. This number is
// set independent of processor speed and assumes identical value of cpu
// time across all stores. i.e. all cpu's are identical.
MinStoreCPUThresholdDifference = float64(50 * time.Millisecond)

// MinStoreCPUDifferenceForTransfers is the minimum CPU difference that a
// store rebalncer would care about to reconcile (via lease or replica
// rebalancing) between any two stores.
//
// NB: This is set to be two times the minimum threshold that a store needs
// to be above or below the mean to be considered overfull or underfull
// respectively. This is to make lease transfers and replica rebalances
// less sensistive to jitters in any given workload by introducing
// additional friction before taking these actions.
MinStoreCPUDifferenceForTransfers = 2 * MinStoreCPUThresholdDifference

// defaultLoadBasedRebalancingInterval is how frequently to check the store-level
// balance of the cluster.
defaultLoadBasedRebalancingInterval = time.Minute
Expand Down Expand Up @@ -107,6 +131,27 @@ var QPSRebalanceThreshold = func() *settings.FloatSetting {
return s
}()

// StoreCPURebalanceThreshold is the minimum ratio of a store's cpu time to the mean
// cpu time at which that store is considered overfull or underfull of cpu
// usage.
var StoreCPURebalanceThreshold = func() *settings.FloatSetting {
s := settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.allocator.store_cpu_rebalance_threshold",
"minimum fraction away from the mean a store's cpu usage can be before it is considered overfull or underfull",
0.10,
settings.NonNegativeFloat,
func(f float64) error {
if f < 0.01 {
return errors.Errorf("cannot set kv.allocator.store_cpu_rebalance_threshold to less than 0.01")
}
return nil
},
)
s.SetVisibility(settings.Public)
return s
}()

// LoadBasedRebalanceInterval controls how frequently each store checks for
// load-base lease/replica rebalancing opportunties.
var LoadBasedRebalanceInterval = settings.RegisterPublicDurationSettingWithExplicitUnit(
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/load/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load",
visibility = ["//visibility:public"],
deps = ["//pkg/util/humanizeutil"],
)

get_x_data(name = "get_x_data")
13 changes: 12 additions & 1 deletion pkg/kv/kvserver/allocator/load/dimension.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@

package load

import "fmt"
import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
)

// Dimension is a singe dimension of load that a component may track.
type Dimension int

const (
// Queries refers to the number of queries.
Queries Dimension = iota
// StoreCPU refers to the sum of replica cpu time on a store.
StoreCPU

nDimensionsTyped
nDimensions = int(nDimensionsTyped)
Expand All @@ -28,6 +35,8 @@ func (d Dimension) String() string {
switch d {
case Queries:
return "queries-per-second"
case StoreCPU:
return "store-cpu-per-second"
default:
panic(fmt.Sprintf("cannot name: unknown dimension with ordinal %d", d))
}
Expand All @@ -38,6 +47,8 @@ func (d Dimension) Format(value float64) string {
switch d {
case Queries:
return fmt.Sprintf("%.1f", value)
case StoreCPU:
return string(humanizeutil.Duration(time.Duration(int64(value))))
default:
panic(fmt.Sprintf("cannot format value: unknown dimension with ordinal %d", d))
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/allocator/range_usage_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type RangeRequestLocalityInfo struct {
func (r RangeUsageInfo) Load() load.Load {
dims := load.Vector{}
dims[load.Queries] = r.QueriesPerSecond
dims[load.StoreCPU] = r.RequestCPUNanosPerSecond + r.RaftCPUNanosPerSecond
return dims
}

Expand All @@ -47,5 +48,12 @@ func (r RangeUsageInfo) Load() load.Load {
func (r RangeUsageInfo) TransferImpact() load.Load {
dims := load.Vector{}
dims[load.Queries] = r.QueriesPerSecond
// Only use the request recorded cpu. This assumes that all replicas will
// use the same amount of raft cpu - which may be dubious.
//
// TODO(kvoli):
// Investigate whether this is a valid assumption and look to separate out
// leaseholder vs replica cpu usage in accounting.
dims[load.StoreCPU] = r.RequestCPUNanosPerSecond
return dims
}
37 changes: 35 additions & 2 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,14 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance(
// network). We can't update the local store at this time.
return
}
// Only apply the raft cpu delta on rebalance. This estimate assumes that
// the raft cpu usage is approximately equal across replicas for a range.
switch changeType {
case roachpb.ADD_VOTER, roachpb.ADD_NON_VOTER:
detail.Desc.Capacity.RangeCount++
detail.Desc.Capacity.LogicalBytes += rangeUsageInfo.LogicalBytes
detail.Desc.Capacity.WritesPerSecond += rangeUsageInfo.WritesPerSecond
detail.Desc.Capacity.StoreCPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond
case roachpb.REMOVE_VOTER, roachpb.REMOVE_NON_VOTER:
detail.Desc.Capacity.RangeCount--
if detail.Desc.Capacity.LogicalBytes <= rangeUsageInfo.LogicalBytes {
Expand All @@ -594,6 +597,11 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance(
} else {
detail.Desc.Capacity.WritesPerSecond -= rangeUsageInfo.WritesPerSecond
}
if detail.Desc.Capacity.StoreCPUPerSecond <= rangeUsageInfo.RaftCPUNanosPerSecond {
detail.Desc.Capacity.StoreCPUPerSecond = 0
} else {
detail.Desc.Capacity.StoreCPUPerSecond -= rangeUsageInfo.RaftCPUNanosPerSecond
}
default:
return
}
Expand Down Expand Up @@ -622,17 +630,23 @@ func (sp *StorePool) UpdateLocalStoreAfterRelocate(
sp.DetailsMu.Lock()
defer sp.DetailsMu.Unlock()

// Only apply the raft cpu delta on rebalance. This estimate assumes
// that the raft cpu usage is approximately equal across replicas for a
// range.
// TODO(kvoli): Validate this assumption or remove the estimated impact.
updateTargets := func(targets []roachpb.ReplicationTarget) {
for _, target := range targets {
if toDetail := sp.GetStoreDetailLocked(target.StoreID); toDetail != nil {
toDetail.Desc.Capacity.RangeCount++
toDetail.Desc.Capacity.StoreCPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond
}
}
}
updatePrevious := func(previous []roachpb.ReplicaDescriptor) {
for _, old := range previous {
if toDetail := sp.GetStoreDetailLocked(old.StoreID); toDetail != nil {
toDetail.Desc.Capacity.RangeCount--
toDetail.Desc.Capacity.StoreCPUPerSecond -= rangeUsageInfo.RaftCPUNanosPerSecond
}
}
}
Expand All @@ -659,13 +673,24 @@ func (sp *StorePool) UpdateLocalStoresAfterLeaseTransfer(
} else {
fromDetail.Desc.Capacity.QueriesPerSecond -= rangeUsageInfo.QueriesPerSecond
}
// Only apply the request cpu (leaseholder + follower-reads) delta on
// transfers. Note this does not correctly account for follower reads
// remaining on the prior leaseholder after lease transfer. Instead,
// only a cpu delta specific to the lease should be applied.
if fromDetail.Desc.Capacity.StoreCPUPerSecond <= rangeUsageInfo.RequestCPUNanosPerSecond {
fromDetail.Desc.Capacity.StoreCPUPerSecond = 0
} else {
fromDetail.Desc.Capacity.StoreCPUPerSecond -= rangeUsageInfo.RequestCPUNanosPerSecond
}

sp.DetailsMu.StoreDetails[from] = &fromDetail
}

toDetail := *sp.GetStoreDetailLocked(to)
if toDetail.Desc != nil {
toDetail.Desc.Capacity.LeaseCount++
toDetail.Desc.Capacity.QueriesPerSecond += rangeUsageInfo.QueriesPerSecond
toDetail.Desc.Capacity.StoreCPUPerSecond += rangeUsageInfo.RequestCPUNanosPerSecond
sp.DetailsMu.StoreDetails[to] = &toDetail
}
}
Expand Down Expand Up @@ -935,6 +960,10 @@ type StoreList struct {
// to be rebalance targets.
candidateLogicalBytes Stat

// CandidateStoreCPU tracks store-cpu-per-second stats for Stores that are
// eligible to be rebalance targets.
CandidateStoreCPU Stat

// CandidateQueriesPerSecond tracks queries-per-second stats for Stores that
// are eligible to be rebalance targets.
CandidateQueriesPerSecond Stat
Expand All @@ -961,29 +990,32 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList {
sl.CandidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond)
sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond)
sl.CandidateL0Sublevels.update(float64(desc.Capacity.L0Sublevels))
sl.CandidateStoreCPU.update(desc.Capacity.StoreCPUPerSecond)
}
return sl
}

func (sl StoreList) String() string {
var buf bytes.Buffer
fmt.Fprintf(&buf,
" candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v",
" candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v avg-store-cpu-per-second=%v",
sl.CandidateRanges.Mean,
sl.CandidateLeases.Mean,
humanizeutil.IBytes(int64(sl.candidateLogicalBytes.Mean)),
sl.CandidateQueriesPerSecond.Mean,
humanizeutil.Duration(time.Duration(int64(sl.CandidateStoreCPU.Mean))),
)
if len(sl.Stores) > 0 {
fmt.Fprintf(&buf, "\n")
} else {
fmt.Fprintf(&buf, " <no candidates>")
}
for _, desc := range sl.Stores {
fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f l0-sublevels=%d\n",
fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s l0-sublevels=%d\n",
desc.StoreID, desc.Capacity.RangeCount,
desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes),
desc.Capacity.QueriesPerSecond,
humanizeutil.Duration(time.Duration(int64(desc.Capacity.StoreCPUPerSecond))),
desc.Capacity.L0Sublevels,
)
}
Expand All @@ -1010,6 +1042,7 @@ func (sl StoreList) ExcludeInvalid(constraints []roachpb.ConstraintsConjunction)
func (sl StoreList) LoadMeans() load.Load {
dims := load.Vector{}
dims[load.Queries] = sl.CandidateQueriesPerSecond.Mean
dims[load.StoreCPU] = sl.CandidateStoreCPU.Mean
return dims
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2548,11 +2548,11 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
var l0SublevelsMax int64
var totalQueriesPerSecond float64
var totalWritesPerSecond float64
var totalStoreCPUTimePerSecond float64
replicaCount := s.metrics.ReplicaCount.Value()
bytesPerReplica := make([]float64, 0, replicaCount)
writesPerReplica := make([]float64, 0, replicaCount)
rankingsAccumulator := NewReplicaAccumulator(
LBRebalancingDimension(LoadBasedRebalancingDimension.Get(&s.ClusterSettings().SV)).ToDimension())
rankingsAccumulator := NewReplicaAccumulator(LoadBasedRebalancingObjective(ctx, s.ClusterSettings()).ToDimension())
rankingsByTenantAccumulator := NewTenantReplicaAccumulator()

// Query the current L0 sublevels and record the updated maximum to metrics.
Expand All @@ -2570,6 +2570,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
// starts? We can't easily have a countdown as its value changes like for
// leases/replicas.
// TODO(a-robinson): Calculate percentiles for qps? Get rid of other percentiles?
totalStoreCPUTimePerSecond += usage.RequestCPUNanosPerSecond + usage.RaftCPUNanosPerSecond
totalQueriesPerSecond += usage.QueriesPerSecond
totalWritesPerSecond += usage.WritesPerSecond
writesPerReplica = append(writesPerReplica, usage.WritesPerSecond)
Expand All @@ -2584,6 +2585,7 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
capacity.RangeCount = rangeCount
capacity.LeaseCount = leaseCount
capacity.LogicalBytes = logicalBytes
capacity.StoreCPUPerSecond = totalStoreCPUTimePerSecond
capacity.QueriesPerSecond = totalQueriesPerSecond
capacity.WritesPerSecond = totalWritesPerSecond
capacity.L0Sublevels = l0SublevelsMax
Expand Down
Loading

0 comments on commit 8be73e0

Please sign in to comment.