Skip to content

Commit

Permalink
kvserver: [WIP] instrument cpu load based splits
Browse files Browse the repository at this point in the history
This commit instruments the ability to peform load based splitting with
replica cpu usage rather than queries per second. Load based splitting
now will use either cpu or qps for deciding split points, depending on
the cluster setting `kv.allocator.load_based_rebalancing_dimension`.

When set to `qps`, qps is used in deciding split points and when
splitting should occur; similarly, `store_cpu` means that request cpu
against a replicas keyspan is used to decide split points.

To revert to QPS splits when
`kv.allocator.load_based_rebalancing_dimension` is `store_cpu`,
`kv.unweighted_lb_split_finder.enabled` can be set to `true`. This will
disable using CPU splits. TODO(kvoli): reasoning on this.

The split threshold when using `store_cpu` is the cluster setting
`kv.range_split.load_cpu_threshold` which defaults to `250ms` of cpu time
per second, i.e. a replica using 1/4 processor of a machine consistently.

The merge queue uses the load based splitter in deciding whether to
merge due to low load. This commit also updates the merge queue to be
consistent with the load based splitter signal. When switching between
`qps` and `store_cpu`, the load based splitter for every replica is
reset to avoid spurious results.

resolves: cockroachdb#95377

Release note (ops change): Load based splitter now supports using request
cpu usage to split ranges. This is introduced with the previous cluster
setting `kv.allocator.load_based_rebalancing_dimension`, which when set
to `store_cpu`, will use request cpu usage. The threshold above which
CPU usage of a replica is considered for splitting is defined in the
cluster setting `kv.range_split.load_cpu_threshold`, which has a default
value of `250ms`.
  • Loading branch information
kvoli committed Jan 17, 2023
1 parent 354c4b1 commit d360af7
Show file tree
Hide file tree
Showing 15 changed files with 464 additions and 164 deletions.
3 changes: 2 additions & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to
kv.log_range_and_node_events.enabled boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
kv.range_split.load_cpu_threshold duration 250ms the CPU use per second over which, the range becomes a candidate for load based splitting
kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence)
Expand Down Expand Up @@ -297,4 +298,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-28 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-30 set the active cluster version in the format '<major>.<minor>'
3 changes: 2 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<tr><td><div id="setting-kv-log-range-and-node-events-enabled" class="anchored"><code>kv.log_range_and_node_events.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog</td></tr>
<tr><td><div id="setting-kv-protectedts-reconciliation-interval" class="anchored"><code>kv.protectedts.reconciliation.interval</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the frequency for reconciling jobs with protected timestamp records</td></tr>
<tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load_enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td></tr>
<tr><td><div id="setting-kv-range-split-load-cpu-threshold" class="anchored"><code>kv.range_split.load_cpu_threshold</code></div></td><td>duration</td><td><code>250ms</code></td><td>the CPU use per second over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><div id="setting-kv-range-split-load-qps-threshold" class="anchored"><code>kv.range_split.load_qps_threshold</code></div></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><div id="setting-kv-rangefeed-range-stuck-threshold" class="anchored"><code>kv.rangefeed.range_stuck_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>restart rangefeeds if they don&#39;t emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence)</td></tr>
Expand Down Expand Up @@ -237,6 +238,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-28</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-30</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_range_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,19 @@ func RangeStats(
) (result.Result, error) {
reply := resp.(*roachpb.RangeStatsResponse)
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()

if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(ctx); ok {
reply.MaxQueriesPerSecond = qps
} else {
// See comment on MaxQueriesPerSecond. -1 means !ok.
reply.MaxQueriesPerSecond = -1
}
if cpu, ok := cArgs.EvalCtx.GetMaxSplitCPU(ctx); ok {
reply.MaxCPUPerSecond = cpu
} else {
// See comment on MaxCPUPerSecond. -1 means !ok.
reply.MaxCPUPerSecond = -1
}
reply.RangeInfo = cArgs.EvalCtx.GetRangeInfo(ctx)
return result.Result{}, nil
}
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ type EvalContext interface {
// is disabled.
GetMaxSplitQPS(context.Context) (float64, bool)

// GetMaxSplitCPU returns the Replicas maximum request cpu/s rate over a
// configured retention period.
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
GetMaxSplitCPU(context.Context) (float64, bool)

GetGCThreshold() hlc.Timestamp
ExcludeDataFromBackup() bool
GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error)
Expand Down Expand Up @@ -156,6 +163,7 @@ type MockEvalCtx struct {
Clock *hlc.Clock
Stats enginepb.MVCCStats
QPS float64
CPU float64
AbortSpan *abortspan.AbortSpan
GCThreshold hlc.Timestamp
Term, FirstIndex uint64
Expand Down Expand Up @@ -234,6 +242,9 @@ func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats {
func (m *mockEvalCtxImpl) GetMaxSplitQPS(context.Context) (float64, bool) {
return m.QPS, true
}
func (m *mockEvalCtxImpl) GetMaxSplitCPU(context.Context) (float64, bool) {
return m.CPU, true
}
func (m *mockEvalCtxImpl) CanCreateTxnRecord(
context.Context, uuid.UUID, []byte, hlc.Timestamp,
) (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) {
Expand Down
129 changes: 97 additions & 32 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4346,10 +4346,17 @@ func TestMergeQueue(t *testing.T) {
})

t.Run("load-based-merging", func(t *testing.T) {
const splitByLoadQPS = 10
const mergeByLoadQPS = splitByLoadQPS / 2 // see conservativeLoadBasedSplitThreshold
// NB: It is possible for the ranges being checked to record load
// during the test. To avoid flakiness, we set the splitByLoadStat high
// enough that any recorded load from testing won't exceed it.
const splitByLoadStat = 10e9
const mergeByLoadStat = splitByLoadStat / 2 // see conservativeLoadBasedSplitThreshold
const splitByLoadMergeDelay = 500 * time.Millisecond

setSplitDimension := func(dim kvserver.LBRebalancingDimension) {
kvserver.LoadBasedRebalancingDimension.Override(ctx, sv, int64(dim))
}

resetForLoadBasedSubtest := func(t *testing.T) {
reset(t)

Expand All @@ -4366,7 +4373,8 @@ func TestMergeQueue(t *testing.T) {
// meaning that it was a maximum measurement over some extended period of
// time.
kvserver.SplitByLoadEnabled.Override(ctx, sv, true)
kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadQPS)
kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadStat)
kvserver.SplitByLoadCPUThreshold.Override(ctx, sv, splitByLoadStat)

// Drop the load-based splitting merge delay setting, which also dictates
// the duration that a leaseholder must measure QPS before considering its
Expand All @@ -4381,47 +4389,104 @@ func TestMergeQueue(t *testing.T) {
rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())
manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
}
for _, splitDimension := range []kvserver.LBRebalancingDimension{
kvserver.LBRebalancingQueries,
kvserver.LBRebalancingStoreCPU,
} {
setSplitDimension(splitDimension)
t.Run(fmt.Sprintf("unreliable-lhs-%s", splitDimension.ToDimension().String()), func(t *testing.T) {
resetForLoadBasedSubtest(t)

t.Run("unreliable-lhs-qps", func(t *testing.T) {
resetForLoadBasedSubtest(t)
lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime())

lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime())
clearRange(t, lhsStartKey, rhsEndKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})

clearRange(t, lhsStartKey, rhsEndKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})
t.Run(fmt.Sprintf("unreliable-rhs-%s", splitDimension.ToDimension().String()), func(t *testing.T) {
resetForLoadBasedSubtest(t)

t.Run("unreliable-rhs-qps", func(t *testing.T) {
resetForLoadBasedSubtest(t)
rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())

rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())
clearRange(t, lhsStartKey, rhsEndKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})

clearRange(t, lhsStartKey, rhsEndKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})
t.Run(fmt.Sprintf("combined-%s-above-threshold", splitDimension.ToDimension().String()), func(t *testing.T) {
resetForLoadBasedSubtest(t)

moreThanHalfStat := mergeByLoadStat/2 + 1
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat)
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat)

t.Run("combined-qps-above-threshold", func(t *testing.T) {
resetForLoadBasedSubtest(t)
clearRange(t, lhsStartKey, rhsEndKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})

moreThanHalfQPS := mergeByLoadQPS/2 + 1
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(moreThanHalfQPS))
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(moreThanHalfQPS))
t.Run(fmt.Sprintf("combined-%s-below-threshold", splitDimension.ToDimension().String()), func(t *testing.T) {
resetForLoadBasedSubtest(t)

clearRange(t, lhsStartKey, rhsEndKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})
manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
lessThanHalfStat := mergeByLoadStat/2 - 1
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat)
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat)

t.Run("combined-qps-below-threshold", func(t *testing.T) {
resetForLoadBasedSubtest(t)
clearRange(t, lhsStartKey, rhsEndKey)
verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
})

manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
lessThanHalfQPS := mergeByLoadQPS/2 - 1
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(lessThanHalfQPS))
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(lessThanHalfQPS))
// These nested tests assert that after changing the split
// dimension, any previous load is discarded and the range will not
// merge, even if the previous load was above or below the
// threshold.
for _, secondSplitDimension := range []kvserver.LBRebalancingDimension{
kvserver.LBRebalancingQueries,
kvserver.LBRebalancingStoreCPU,
} {
if splitDimension == secondSplitDimension {
// Nothing to do when there is no change. We expect the
// same outcome as the above tests.
continue
}
t.Run(fmt.Sprintf("switch-%s-to-%s-prev-combined-above-threshold",
splitDimension.ToDimension().String(),
secondSplitDimension.ToDimension().String(),
), func(t *testing.T) {
// Set the split dimension again, since we have modified it
// at the bottom of this loop.
setSplitDimension(splitDimension)
resetForLoadBasedSubtest(t)

moreThanHalfStat := mergeByLoadStat/2 + 1
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat)
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat)

clearRange(t, lhsStartKey, rhsEndKey)
// Switch the dimension, so that any recorded load should
// be discarded and despite being above the threshold (for
// both dimensions), it shouldn't merge.
setSplitDimension(secondSplitDimension)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})

clearRange(t, lhsStartKey, rhsEndKey)
verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
})
t.Run(fmt.Sprintf("switch-%s-to-%s-prev-combined-below-threshold",
splitDimension.ToDimension().String(),
secondSplitDimension.ToDimension().String(),
), func(t *testing.T) {
setSplitDimension(splitDimension)
resetForLoadBasedSubtest(t)

manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
lessThanHalfStat := mergeByLoadStat/2 - 1
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat)
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat)

clearRange(t, lhsStartKey, rhsEndKey)
setSplitDimension(secondSplitDimension)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})
}
}
})

t.Run("sticky-bit", func(t *testing.T) {
Expand Down
Loading

0 comments on commit d360af7

Please sign in to comment.