From d360af72ffad713cfe702ba678c5e2fac3b1aced Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Thu, 12 Jan 2023 00:06:38 +0000
Subject: [PATCH] kvserver: [WIP] instrument cpu load based splits

This commit instruments the ability to perform load-based splitting with
replica cpu usage rather than queries per second. Load-based splitting will
now use either cpu or qps to decide split points, depending on the cluster
setting `kv.allocator.load_based_rebalancing_dimension`. When set to `qps`,
qps is used to decide split points and when splitting should occur;
similarly, `store_cpu` means that request cpu against a replica's keyspan is
used to decide split points.

To revert to QPS splits when `kv.allocator.load_based_rebalancing_dimension`
is `store_cpu`, `kv.unweighted_lb_split_finder.enabled` can be set to `true`.
This will disable using CPU splits. TODO(kvoli): reasoning on this.

The split threshold when using `store_cpu` is the cluster setting
`kv.range_split.load_cpu_threshold`, which defaults to `250ms` of cpu time
per second, i.e. a replica consistently using 1/4 of a processor.

The merge queue uses the load-based splitter to decide whether to merge due
to low load. This commit also updates the merge queue to be consistent with
the load-based splitter signal.

When switching between `qps` and `store_cpu`, the load-based splitter for
every replica is reset to avoid spurious results.

resolves: #95377

Release note (ops change): The load-based splitter now supports using request
cpu usage to split ranges. This is gated behind the previously introduced
cluster setting `kv.allocator.load_based_rebalancing_dimension`, which, when
set to `store_cpu`, will use request cpu usage. The threshold above which CPU
usage of a replica is considered for splitting is defined in the cluster
setting `kv.range_split.load_cpu_threshold`, which has a default value of
`250ms`.
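For reviewers, here is a minimal standalone sketch of the dimension-driven
threshold selection described above. It is illustrative only; the names
(rebalancingDimension, splitThreshold) are hypothetical and not the kvserver
API, but the defaults mirror `kv.range_split.load_qps_threshold` and
`kv.range_split.load_cpu_threshold`:

    package main

    import (
        "fmt"
        "time"
    )

    // rebalancingDimension stands in for the qps/store_cpu choice made by
    // kv.allocator.load_based_rebalancing_dimension.
    type rebalancingDimension int

    const (
        dimQPS rebalancingDimension = iota
        dimStoreCPU
    )

    // splitThreshold returns the per-second value above which a range
    // becomes a split candidate: a request rate for qps, or nanoseconds of
    // cpu time per second for store_cpu (250ms by default).
    func splitThreshold(dim rebalancingDimension) float64 {
        if dim == dimQPS {
            return 2500 // kv.range_split.load_qps_threshold default
        }
        return float64(250 * time.Millisecond) // load_cpu_threshold default, ns/s
    }

    func main() {
        fmt.Printf("qps threshold: %.0f req/s\n", splitThreshold(dimQPS))
        fmt.Printf("cpu threshold: %s of cpu per second\n",
            time.Duration(splitThreshold(dimStoreCPU)))
    }

The cpu threshold is expressed internally as nanoseconds of cpu time per
second, which is why `250ms` corresponds to a raw value of 2.5e8.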
--- .../settings/settings-for-tenants.txt | 3 +- docs/generated/settings/settings.html | 3 +- pkg/kv/kvserver/batcheval/cmd_range_stats.go | 7 + pkg/kv/kvserver/batcheval/eval_context.go | 11 ++ pkg/kv/kvserver/client_merge_test.go | 129 +++++++++++---- pkg/kv/kvserver/merge_queue.go | 98 +++++++---- pkg/kv/kvserver/replica.go | 23 ++- pkg/kv/kvserver/replica_eval_context_span.go | 6 + pkg/kv/kvserver/replica_init.go | 22 ++- pkg/kv/kvserver/replica_send.go | 30 ++-- pkg/kv/kvserver/replica_split_load.go | 153 +++++++++++++++++- pkg/kv/kvserver/split/BUILD.bazel | 1 - pkg/kv/kvserver/split/decider.go | 122 +++++++------- pkg/kv/kvserver/split_queue.go | 12 +- pkg/roachpb/api.proto | 8 + 15 files changed, 464 insertions(+), 164 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 8925c203da55..e41d1d4218a6 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -42,6 +42,7 @@ kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to kv.log_range_and_node_events.enabled boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated +kv.range_split.load_cpu_threshold duration 250ms the CPU use per second over which, the range becomes a candidate for load based splitting kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence) @@ -297,4 +298,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 1000022.2-28 set the active cluster version in the format '.' +version version 1000022.2-30 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index d60ce0df78e8..242aa5dcd4fe 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -56,6 +56,7 @@
kv.log_range_and_node_events.enabled
booleantrueset to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog
kv.protectedts.reconciliation.interval
duration5m0sthe frequency for reconciling jobs with protected timestamp records
kv.range_split.by_load_enabled
booleantrueallow automatic splits of ranges based on where load is concentrated +
kv.range_split.load_cpu_threshold
duration250msthe CPU use per second over which, the range becomes a candidate for load based splitting
kv.range_split.load_qps_threshold
integer2500the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled
booleanfalseif set, rangefeed registration is enabled
kv.rangefeed.range_stuck_threshold
duration1m0srestart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence) @@ -237,6 +238,6 @@
trace.opentelemetry.collector
stringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled
booleantrueif set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector
stringthe address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. -
version
version1000022.2-28set the active cluster version in the format '<major>.<minor>' +
version
version1000022.2-30set the active cluster version in the format '<major>.<minor>' diff --git a/pkg/kv/kvserver/batcheval/cmd_range_stats.go b/pkg/kv/kvserver/batcheval/cmd_range_stats.go index 85277d6ee947..bc8a42b40c51 100644 --- a/pkg/kv/kvserver/batcheval/cmd_range_stats.go +++ b/pkg/kv/kvserver/batcheval/cmd_range_stats.go @@ -44,12 +44,19 @@ func RangeStats( ) (result.Result, error) { reply := resp.(*roachpb.RangeStatsResponse) reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats() + if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(ctx); ok { reply.MaxQueriesPerSecond = qps } else { // See comment on MaxQueriesPerSecond. -1 means !ok. reply.MaxQueriesPerSecond = -1 } + if cpu, ok := cArgs.EvalCtx.GetMaxSplitCPU(ctx); ok { + reply.MaxCPUPerSecond = cpu + } else { + // See comment on MaxCPUPerSecond. -1 means !ok. + reply.MaxCPUPerSecond = -1 + } reply.RangeInfo = cArgs.EvalCtx.GetRangeInfo(ctx) return result.Result{}, nil } diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 6f4aeb7362b5..7829a9d56b5a 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -89,6 +89,13 @@ type EvalContext interface { // is disabled. GetMaxSplitQPS(context.Context) (float64, bool) + // GetMaxSplitCPU returns the Replicas maximum request cpu/s rate over a + // configured retention period. + // + // NOTE: This should not be used when the load based splitting cluster setting + // is disabled. + GetMaxSplitCPU(context.Context) (float64, bool) + GetGCThreshold() hlc.Timestamp ExcludeDataFromBackup() bool GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error) @@ -156,6 +163,7 @@ type MockEvalCtx struct { Clock *hlc.Clock Stats enginepb.MVCCStats QPS float64 + CPU float64 AbortSpan *abortspan.AbortSpan GCThreshold hlc.Timestamp Term, FirstIndex uint64 @@ -234,6 +242,9 @@ func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats { func (m *mockEvalCtxImpl) GetMaxSplitQPS(context.Context) (float64, bool) { return m.QPS, true } +func (m *mockEvalCtxImpl) GetMaxSplitCPU(context.Context) (float64, bool) { + return m.CPU, true +} func (m *mockEvalCtxImpl) CanCreateTxnRecord( context.Context, uuid.UUID, []byte, hlc.Timestamp, ) (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) { diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 311f452399ca..7ebd0e02cabc 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -4346,10 +4346,17 @@ func TestMergeQueue(t *testing.T) { }) t.Run("load-based-merging", func(t *testing.T) { - const splitByLoadQPS = 10 - const mergeByLoadQPS = splitByLoadQPS / 2 // see conservativeLoadBasedSplitThreshold + // NB: It is possible for the ranges being checked to record load + // during the test. To avoid flakiness, we set the splitByLoadStat high + // enough that any recorded load from testing won't exceed it. + const splitByLoadStat = 10e9 + const mergeByLoadStat = splitByLoadStat / 2 // see conservativeLoadBasedSplitThreshold const splitByLoadMergeDelay = 500 * time.Millisecond + setSplitDimension := func(dim kvserver.LBRebalancingDimension) { + kvserver.LoadBasedRebalancingDimension.Override(ctx, sv, int64(dim)) + } + resetForLoadBasedSubtest := func(t *testing.T) { reset(t) @@ -4366,7 +4373,8 @@ func TestMergeQueue(t *testing.T) { // meaning that it was a maximum measurement over some extended period of // time. 
kvserver.SplitByLoadEnabled.Override(ctx, sv, true) - kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadQPS) + kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadStat) + kvserver.SplitByLoadCPUThreshold.Override(ctx, sv, splitByLoadStat) // Drop the load-based splitting merge delay setting, which also dictates // the duration that a leaseholder must measure QPS before considering its @@ -4381,47 +4389,104 @@ func TestMergeQueue(t *testing.T) { rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) } + for _, splitDimension := range []kvserver.LBRebalancingDimension{ + kvserver.LBRebalancingQueries, + kvserver.LBRebalancingStoreCPU, + } { + setSplitDimension(splitDimension) + t.Run(fmt.Sprintf("unreliable-lhs-%s", splitDimension.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) - t.Run("unreliable-lhs-qps", func(t *testing.T) { - resetForLoadBasedSubtest(t) + lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime()) - lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime()) + clearRange(t, lhsStartKey, rhsEndKey) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - clearRange(t, lhsStartKey, rhsEndKey) - verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + t.Run(fmt.Sprintf("unreliable-rhs-%s", splitDimension.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) - t.Run("unreliable-rhs-qps", func(t *testing.T) { - resetForLoadBasedSubtest(t) + rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) - rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) + clearRange(t, lhsStartKey, rhsEndKey) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - clearRange(t, lhsStartKey, rhsEndKey) - verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + t.Run(fmt.Sprintf("combined-%s-above-threshold", splitDimension.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) + + moreThanHalfStat := mergeByLoadStat/2 + 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat) - t.Run("combined-qps-above-threshold", func(t *testing.T) { - resetForLoadBasedSubtest(t) + clearRange(t, lhsStartKey, rhsEndKey) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - moreThanHalfQPS := mergeByLoadQPS/2 + 1 - rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(moreThanHalfQPS)) - lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(moreThanHalfQPS)) + t.Run(fmt.Sprintf("combined-%s-below-threshold", splitDimension.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) - clearRange(t, lhsStartKey, rhsEndKey) - verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) + lessThanHalfStat := mergeByLoadStat/2 - 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat) - t.Run("combined-qps-below-threshold", func(t *testing.T) { - resetForLoadBasedSubtest(t) + clearRange(t, lhsStartKey, rhsEndKey) + verifyMergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) - lessThanHalfQPS := mergeByLoadQPS/2 - 1 - 
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(lessThanHalfQPS)) - lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(lessThanHalfQPS)) + // These nested tests assert that after changing the split + // dimension, any previous load is discarded and the range will not + // merge, even if the previous load was above or below the + // threshold. + for _, secondSplitDimension := range []kvserver.LBRebalancingDimension{ + kvserver.LBRebalancingQueries, + kvserver.LBRebalancingStoreCPU, + } { + if splitDimension == secondSplitDimension { + // Nothing to do when there is no change. We expect the + // same outcome as the above tests. + continue + } + t.Run(fmt.Sprintf("switch-%s-to-%s-prev-combined-above-threshold", + splitDimension.ToDimension().String(), + secondSplitDimension.ToDimension().String(), + ), func(t *testing.T) { + // Set the split dimension again, since we have modified it + // at the bottom of this loop. + setSplitDimension(splitDimension) + resetForLoadBasedSubtest(t) + + moreThanHalfStat := mergeByLoadStat/2 + 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat) + + clearRange(t, lhsStartKey, rhsEndKey) + // Switch the dimension, so that any recorded load should + // be discarded and despite being above the threshold (for + // both dimensions), it shouldn't merge. + setSplitDimension(secondSplitDimension) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - clearRange(t, lhsStartKey, rhsEndKey) - verifyMergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + t.Run(fmt.Sprintf("switch-%s-to-%s-prev-combined-below-threshold", + splitDimension.ToDimension().String(), + secondSplitDimension.ToDimension().String(), + ), func(t *testing.T) { + setSplitDimension(splitDimension) + resetForLoadBasedSubtest(t) + + manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) + lessThanHalfStat := mergeByLoadStat/2 - 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat) + + clearRange(t, lhsStartKey, rhsEndKey) + setSplitDimension(secondSplitDimension) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) + } + } }) t.Run("sticky-bit", func(t *testing.T) { diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index 4ab4cc0f196b..0b0b76d50e51 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -184,7 +184,7 @@ var _ PurgatoryError = rangeMergePurgatoryError{} func (mq *mergeQueue) requestRangeStats( ctx context.Context, key roachpb.Key, -) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, qps float64, qpsOK bool, err error) { +) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, lss loadSplitStats, err error) { ba := &roachpb.BatchRequest{} ba.Add(&roachpb.RangeStatsRequest{ @@ -193,15 +193,31 @@ func (mq *mergeQueue) requestRangeStats( br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba) if pErr != nil { - return nil, enginepb.MVCCStats{}, 0, false, pErr.GoError() + return nil, enginepb.MVCCStats{}, loadSplitStats{}, pErr.GoError() } res := br.Responses[0].GetInner().(*roachpb.RangeStatsResponse) desc = &res.RangeInfo.Desc stats = res.MVCCStats - qps = res.MaxQueriesPerSecond - qpsOK = qps >= 0 - return desc, stats, qps, qpsOK, nil + + if res.MaxCPUPerSecond >= 0 && 
res.MaxQueriesPerSecond >= 0 { + err = errors.AssertionFailedf( + "unexpected both max qps %.2f and max cpu %.2f set in range stats response", + res.MaxQueriesPerSecond, res.MaxCPUPerSecond) + return nil, enginepb.MVCCStats{}, loadSplitStats{}, err + } + + if res.MaxQueriesPerSecond >= 0 { + lss.QPS.max = res.MaxQueriesPerSecond + lss.QPS.ok = true + } + + if res.MaxCPUPerSecond >= 0 { + lss.CPU.max = res.MaxCPUPerSecond + lss.CPU.ok = true + } + + return desc, stats, lss, nil } func (mq *mergeQueue) process( @@ -214,7 +230,7 @@ func (mq *mergeQueue) process( lhsDesc := lhsRepl.Desc() lhsStats := lhsRepl.GetMVCCStats() - lhsQPS, lhsQPSOK := lhsRepl.GetMaxSplitQPS(ctx) + lhsSplitStats := lhsRepl.GetLoadSplitStats(ctx) minBytes := lhsRepl.GetMinBytes() if lhsStats.Total() >= minBytes { log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes", @@ -222,7 +238,7 @@ func (mq *mergeQueue) process( return false, nil } - rhsDesc, rhsStats, rhsQPS, rhsQPSOK, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) + rhsDesc, rhsStats, rhsSplitStats, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) if err != nil { return false, err } @@ -247,8 +263,8 @@ func (mq *mergeQueue) process( } mergedStats := lhsStats mergedStats.Add(rhsStats) + mergedLoadSplitStats := lhsSplitStats.merge(rhsSplitStats) - var mergedQPS float64 if lhsRepl.SplitByLoadEnabled() { // When load is a consideration for splits and, by extension, merges, the // mergeQueue is fairly conservative. In an effort to avoid thrashing and to @@ -260,29 +276,46 @@ func (mq *mergeQueue) process( // maximum qps measurement from both sides to be sufficiently stable and // reliable, meaning that it was a maximum measurement over some extended // period of time. - if !lhsQPSOK { - log.VEventf(ctx, 2, "skipping merge: LHS QPS measurement not yet reliable") - return false, nil - } - if !rhsQPSOK { - log.VEventf(ctx, 2, "skipping merge: RHS QPS measurement not yet reliable") - return false, nil + if lhsRepl.QPSSplittingEnabled(ctx) { + if !lhsSplitStats.QPS.ok { + log.VEventf(ctx, 2, "skipping merge: LHS QPS measurement not yet reliable") + return false, nil + } + if !rhsSplitStats.QPS.ok { + log.VEventf(ctx, 2, "skipping merge: RHS QPS measurement not yet reliable") + return false, nil + } + } else { + if !lhsSplitStats.CPU.ok { + log.VEventf(ctx, 2, "skipping merge: LHS CPU measurement not yet reliable") + return false, nil + } + if !rhsSplitStats.CPU.ok { + log.VEventf(ctx, 2, "skipping merge: RHS CPU measurement not yet reliable") + return false, nil + } } - mergedQPS = lhsQPS + rhsQPS } // Check if the merged range would need to be split, if so, skip merge. // Use a lower threshold for load based splitting so we don't find ourselves // in a situation where we keep merging ranges that would be split soon after // by a small increase in load. 
- conservativeLoadBasedSplitThreshold := 0.5 * lhsRepl.SplitByLoadQPSThreshold() + conservativeLoadBasedSplitThreshold := 0.5 * lhsRepl.SplitByLoadThreshold(ctx) + var exceedsLoadSplitThreshold bool + if lhsRepl.QPSSplittingEnabled(ctx) { + exceedsLoadSplitThreshold = mergedLoadSplitStats.QPS.max >= conservativeLoadBasedSplitThreshold + } else { + exceedsLoadSplitThreshold = mergedLoadSplitStats.CPU.max >= conservativeLoadBasedSplitThreshold + } + shouldSplit, _ := shouldSplitRange(ctx, mergedDesc, mergedStats, lhsRepl.GetMaxBytes(), lhsRepl.shouldBackpressureWrites(), confReader) - if shouldSplit || mergedQPS >= conservativeLoadBasedSplitThreshold { + if shouldSplit || exceedsLoadSplitThreshold { log.VEventf(ctx, 2, "skipping merge to avoid thrashing: merged range %s may split "+ - "(estimated size, estimated QPS: %d, %v)", - mergedDesc, mergedStats.Total(), mergedQPS) + "(estimated size, estimated load: %d, %s)", + mergedDesc, mergedStats.Total(), mergedLoadSplitStats) return false, nil } @@ -360,7 +393,7 @@ func (mq *mergeQueue) process( } // Refresh RHS descriptor. - rhsDesc, _, _, _, err = mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) + rhsDesc, _, _, err = mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) if err != nil { return false, err } @@ -377,16 +410,24 @@ func (mq *mergeQueue) process( } } + var conservativeThresholdString string + if lhsRepl.QPSSplittingEnabled(ctx) { + conservativeThresholdString = fmt.Sprintf("qps=%.2f", conservativeLoadBasedSplitThreshold) + } else { + conservativeThresholdString = fmt.Sprintf("cpu=%s", string(humanizeutil.Duration(time.Duration(conservativeLoadBasedSplitThreshold)))) + } + log.VEventf(ctx, 2, "merging to produce range: %s-%s", mergedDesc.StartKey, mergedDesc.EndKey) - reason := fmt.Sprintf("lhs+rhs has (size=%s+%s=%s qps=%.2f+%.2f=%.2fqps) below threshold (size=%s, qps=%.2f)", + // TODO(kvoli): handle this print + reason := fmt.Sprintf("lhs+rhs has (size=%s+%s=%s %s+%s=%s) below threshold (size=%s, %s)", humanizeutil.IBytes(lhsStats.Total()), humanizeutil.IBytes(rhsStats.Total()), humanizeutil.IBytes(mergedStats.Total()), - lhsQPS, - rhsQPS, - mergedQPS, + lhsSplitStats, + rhsSplitStats, + mergedLoadSplitStats, humanizeutil.IBytes(minBytes), - conservativeLoadBasedSplitThreshold, + conservativeThresholdString, ) _, pErr := lhsRepl.AdminMerge(ctx, roachpb.AdminMergeRequest{ RequestHeader: roachpb.RequestHeader{Key: lhsRepl.Desc().StartKey.AsRawKey()}, @@ -417,8 +458,9 @@ func (mq *mergeQueue) process( // Adjust the splitter to account for the additional load from the RHS. We // could just Reset the splitter, but then we'd need to wait out a full // measurement period (default of 5m) before merging this range again. - if mergedQPS != 0 { - lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedQPS) + // TODO(kvoli): handle this record max + if mergedLoadSplitStats.QPS.max != 0 { + lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStats.QPS.max) } return true, nil } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index afcba47435a8..5356be4518ee 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1118,9 +1118,28 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) { // NOTE: This should only be used for load based splitting, only // works when the load based splitting cluster setting is enabled. // -// Use QueriesPerSecond() for current QPS stats for all other purposes. 
+// Use LoadStats.QueriesPerSecond for all other purposes. func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) { - return r.loadBasedSplitter.MaxQPS(ctx, r.Clock().PhysicalTime()) + if !r.QPSSplittingEnabled(ctx) { + return 0, false + } + return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) +} + +// GetMaxSplitCPU returns the Replica's maximum CPU/s rate over a configured +// measurement period. If the Replica has not been recording CPU for at least +// an entire measurement period, the method will return false. +// +// NOTE: This should only be used for load based splitting, only +// works when the load based splitting cluster setting is enabled. +// +// Use LoadStats.RaftCPUNanosPerSecond and RequestCPUNanosPerSecond for current +// CPU stats for all other purposes. +func (r *Replica) GetMaxSplitCPU(ctx context.Context) (float64, bool) { + if r.QPSSplittingEnabled(ctx) { + return 0, false + } + return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) } // ContainsKey returns whether this range contains the specified key. diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index d8ab0d7c67ee..ae0100830575 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -135,6 +135,12 @@ func (rec SpanSetReplicaEvalContext) GetMaxSplitQPS(ctx context.Context) (float6 return rec.i.GetMaxSplitQPS(ctx) } +// GetMaxSplitCPU returns the Replica's maximum CPU/s rate for splitting and +// merging purposes. +func (rec SpanSetReplicaEvalContext) GetMaxSplitCPU(ctx context.Context) (float64, bool) { + return rec.i.GetMaxSplitCPU(ctx) +} + // CanCreateTxnRecord determines whether a transaction record can be created // for the provided transaction information. See Replica.CanCreateTxnRecord // for details about its arguments, return values, and preconditions. 
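As a reviewer aid, the following standalone sketch (hypothetical names, not the actual Replica methods) illustrates the gating added above: only the accessor matching the active split dimension reports a usable maximum, while the other reports not-ok.

    package main

    import "fmt"

    // splitter stands in for the load-based split decider; maxStat is
    // whatever per-second statistic it has been recording (qps or cpu ns/s).
    type splitter struct {
        maxStat float64
        ok      bool
    }

    type replica struct {
        qpsSplitting bool // true when the qps dimension is active
        lbSplitter   splitter
    }

    // maxSplitQPS mirrors GetMaxSplitQPS: it only reports a value when the
    // splitter is actually recording qps.
    func (r replica) maxSplitQPS() (float64, bool) {
        if !r.qpsSplitting {
            return 0, false
        }
        return r.lbSplitter.maxStat, r.lbSplitter.ok
    }

    // maxSplitCPU mirrors GetMaxSplitCPU for the store_cpu dimension.
    func (r replica) maxSplitCPU() (float64, bool) {
        if r.qpsSplitting {
            return 0, false
        }
        return r.lbSplitter.maxStat, r.lbSplitter.ok
    }

    func main() {
        r := replica{qpsSplitting: false, lbSplitter: splitter{maxStat: 2.5e8, ok: true}}
        qps, qpsOK := r.maxSplitQPS()
        cpu, cpuOK := r.maxSplitCPU()
        fmt.Println(qps, qpsOK) // 0 false: qps dimension is not active
        fmt.Println(cpu, cpuOK) // 2.5e8 true
    }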
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 41f2fd877c47..69ead29ae9bf 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -95,11 +95,23 @@ func newUnloadedReplica( r.mu.stateLoader = stateloader.Make(desc.RangeID) r.mu.quiescent = true r.mu.conf = store.cfg.DefaultSpanConfig - split.Init(&r.loadBasedSplitter, store.cfg.Settings, split.GlobalRandSource(), func() float64 { - return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV)) - }, func() time.Duration { - return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV) - }, store.metrics.LoadSplitterMetrics) + split.Init( + &r.loadBasedSplitter, + store.cfg.Settings, + split.GlobalRandSource(), + SplitByLoadThresholdFn(ctx, store.cfg.Settings), + func() time.Duration { + return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV) + }, + store.metrics.LoadSplitterMetrics, + NewFinderFn(ctx, store.cfg.Settings), + ) + + LoadBasedRebalancingDimension.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) { + r.setOnDimensionChange(ctx) + }) + + r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{} r.mu.checksums = map[uuid.UUID]*replicaChecksum{} r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings()) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index e0017ea86cbf..676b7a8b4f8a 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -134,8 +134,8 @@ func (r *Replica) SendWithWriteBytes( // Record the CPU time processing the request for this replica. This is // recorded regardless of errors that are encountered. - defer r.MeasureReqCPUNanos(grunning.Time()) - + startCPU := grunning.Time() + defer r.MeasureReqCPUNanos(startCPU) // Record summary throughput information about the batch request for // accounting. r.recordBatchRequestLoad(ctx, ba) @@ -201,12 +201,14 @@ func (r *Replica) SendWithWriteBytes( } } - // Return range information if it was requested. Note that we don't return it - // on errors because the code doesn't currently support returning both a br - // and a pErr here. Also, some errors (e.g. NotLeaseholderError) have custom - // ways of returning range info. if pErr == nil { + // Return range information if it was requested. Note that we don't return it + // on errors because the code doesn't currently support returning both a br + // and a pErr here. Also, some errors (e.g. NotLeaseholderError) have custom + // ways of returning range info. r.maybeAddRangeInfoToResponse(ctx, ba, br) + // Handle load-based splitting, if necessary. + r.recordBatchForLoadBasedSplitting(ctx, ba, br, int(grunning.Difference(startCPU, grunning.Time()))) } r.recordRequestWriteBytes(writeBytes) @@ -405,17 +407,6 @@ func (r *Replica) executeBatchWithConcurrencyRetries( var requestEvalKind concurrency.RequestEvalKind var g *concurrency.Guard defer func() { - // Handle load-based splitting, if necessary. - if pErr == nil && br != nil { - if len(ba.Requests) != len(br.Responses) { - log.KvDistribution.Errorf(ctx, - "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d", - len(ba.Requests), len(br.Responses)) - } else { - r.recordBatchForLoadBasedSplitting(ctx, ba, br) - } - } - // NB: wrapped to delay g evaluation to its value when returning. 
if g != nil { r.concMgr.FinishReq(g) @@ -1005,8 +996,9 @@ func (r *Replica) executeAdminBatch( return br, nil } -// recordBatchRequestLoad records the load information about a batch request issued -// against this replica. +// recordBatchRequestLoad records the load information about a batch request +// completed against this replica. This also handles load based splitting based +// on the request load. func (r *Replica) recordBatchRequestLoad(ctx context.Context, ba *roachpb.BatchRequest) { if r.loadStats == nil { log.VEventf( diff --git a/pkg/kv/kvserver/replica_split_load.go index 3cc43ae28e39..1e4f67ccb668 100644 --- a/pkg/kv/kvserver/replica_split_load.go +++ b/pkg/kv/kvserver/replica_split_load.go @@ -12,12 +12,26 @@ package kvserver import ( "context" + "fmt" + "strings" + "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) +// transitions (cluster setting change) +// add rpc field MaxCPU +// maxCPU / maxQPS always return not ok when being used +// decider reset +// set threshold function +// set finder creation fn + // SplitByLoadEnabled wraps "kv.range_split.by_load_enabled". var SplitByLoadEnabled = settings.RegisterBoolSetting( settings.TenantWritable, @@ -26,6 +40,16 @@ var SplitByLoadEnabled = settings.RegisterBoolSetting( true, ).WithPublic() +var EnableUnweightedLBSplitFinder = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.unweighted_lb_split_finder.enabled", + "if enabled, use the un-weighted finder for load-based splitting; "+ + "the unweighted finder will attempt to find a key during when splitting "+ + "a range based on load that evenly divides the QPS among the resulting "+ + "left and right hand side ranges", + false, +) + // SplitByLoadQPSThreshold wraps "kv.range_split.load_qps_threshold". var SplitByLoadQPSThreshold = settings.RegisterIntSetting( settings.TenantWritable, @@ -34,11 +58,51 @@ var SplitByLoadQPSThreshold = settings.RegisterIntSetting( 2500, // 2500 req/s ).WithPublic() -// SplitByLoadQPSThreshold returns the QPS request rate for a given replica. -func (r *Replica) SplitByLoadQPSThreshold() float64 { - return float64(SplitByLoadQPSThreshold.Get(&r.store.cfg.Settings.SV)) +// SplitByLoadCPUThreshold wraps "kv.range_split.load_cpu_threshold". 
+var SplitByLoadCPUThreshold = settings.RegisterDurationSetting( + settings.TenantWritable, + "kv.range_split.load_cpu_threshold", + "the CPU use per second over which, the range becomes a candidate for load based splitting", + 250*time.Millisecond, +).WithPublic() + +func (r *Replica) SplitByLoadThreshold(ctx context.Context) float64 { + return SplitByLoadThresholdFn(ctx, r.store.cfg.Settings)() +} + +func SplitByLoadThresholdFn(ctx context.Context, st *cluster.Settings) func() float64 { + return func() float64 { + if QPSSplittingEnabled(ctx, st) { + return float64(SplitByLoadQPSThreshold.Get(&st.SV)) + } + return float64(SplitByLoadCPUThreshold.Get(&st.SV)) + } +} + +func NewFinderFn( + ctx context.Context, st *cluster.Settings, +) func(time.Time, split.RandSource) split.LoadBasedSplitter { + return func(startTime time.Time, randSource split.RandSource) split.LoadBasedSplitter { + if QPSSplittingEnabled(ctx, st) { + return split.NewUnweightedFinder(startTime, randSource) + } else { + return split.NewWeightedFinder(startTime, randSource) + } + } } +// split queue +// - store rebalancer dimension +// - threshold for that dimension +// merge queue +// - store rebalancer dimension +// - threshold for that dimension +// - which value to look at when trying to merge +// finder +// - split threshold +// - new finder fn +// - when to reset for new dimension +// // SplitByLoadEnabled returns whether load based splitting is enabled. // Although this is a method of *Replica, the configuration is really global, // shared across all stores. @@ -47,6 +111,19 @@ func (r *Replica) SplitByLoadEnabled() bool { !r.store.TestingKnobs().DisableLoadBasedSplitting } +func QPSSplittingEnabled(ctx context.Context, sv *cluster.Settings) bool { + rebalanceObjective := LoadBasedRebalancingObjective(ctx, sv) + return rebalanceObjective == LBRebalancingQueries || EnableUnweightedLBSplitFinder.Get(&sv.SV) +} + +func (r *Replica) QPSSplittingEnabled(ctx context.Context) bool { + return QPSSplittingEnabled(ctx, r.store.cfg.Settings) +} + +func (r *Replica) setOnDimensionChange(ctx context.Context) { + r.loadBasedSplitter.Reset(r.Clock().PhysicalTime()) +} + // getResponseBoundarySpan computes the union span of the true spans that were // iterated over using the request span and the response's resumeSpan. 
// @@ -120,15 +197,81 @@ func getResponseBoundarySpan( return } +type loadSplitStat struct { + max float64 + ok bool +} + +func (ls loadSplitStat) merge(other loadSplitStat) loadSplitStat { + return loadSplitStat{ + ok: ls.ok && other.ok, + max: ls.max + other.max, + } +} + +type loadSplitStats struct { + CPU loadSplitStat + QPS loadSplitStat +} + +func (lss loadSplitStats) merge(other loadSplitStats) loadSplitStats { + return loadSplitStats{ + CPU: lss.CPU.merge(other.CPU), + QPS: lss.QPS.merge(other.QPS), + } +} + +func (lss loadSplitStats) String() string { + var buf strings.Builder + + if lss.CPU.ok { + fmt.Fprintf(&buf, "cpu-per-second=%s", string(humanizeutil.Duration(time.Duration(int64(lss.CPU.max))))) + } + if lss.QPS.ok { + fmt.Fprintf(&buf, "queries-per-second=%.2f", lss.QPS.max) + } + return buf.String() +} + +func (r *Replica) GetLoadSplitStats(ctx context.Context) loadSplitStats { + lss := loadSplitStats{} + max, ok := r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) + if r.QPSSplittingEnabled(ctx) { + lss.QPS = loadSplitStat{max: max, ok: ok} + } else { + lss.CPU = loadSplitStat{max: max, ok: ok} + } + return lss +} + // recordBatchForLoadBasedSplitting records the batch's spans to be considered // for load based splitting. func (r *Replica) recordBatchForLoadBasedSplitting( - ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, + ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, stat int, ) { if !r.SplitByLoadEnabled() { return } - shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), len(ba.Requests), func() roachpb.Span { + + // There is nothing to do when either the batch request or batch + // response is nil. + if ba == nil || br == nil { + return + } + + if len(ba.Requests) != len(br.Responses) { + log.KvDistribution.Errorf(ctx, + "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d", + len(ba.Requests), len(br.Responses)) + } + + // When QPS splitting is enabled, use the number of requests rather than + // the given stat for recording load. 
+ if r.QPSSplittingEnabled(ctx) { + stat = len(ba.Requests) + } + + shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), stat, func() roachpb.Span { return getResponseBoundarySpan(ba, br) }) if shouldInitSplit { diff --git a/pkg/kv/kvserver/split/BUILD.bazel b/pkg/kv/kvserver/split/BUILD.bazel index 031270e152c7..3ad9cf6333d5 100644 --- a/pkg/kv/kvserver/split/BUILD.bazel +++ b/pkg/kv/kvserver/split/BUILD.bazel @@ -13,7 +13,6 @@ go_library( deps = [ "//pkg/keys", "//pkg/roachpb", - "//pkg/settings", "//pkg/settings/cluster", "//pkg/util/log", "//pkg/util/metric", diff --git a/pkg/kv/kvserver/split/decider.go b/pkg/kv/kvserver/split/decider.go index e34937bb4641..10884d655998 100644 --- a/pkg/kv/kvserver/split/decider.go +++ b/pkg/kv/kvserver/split/decider.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -29,7 +28,7 @@ import ( const minSplitSuggestionInterval = time.Minute const minNoSplitKeyLoggingMetricsInterval = time.Minute -const minQueriesPerSecondSampleDuration = time.Second +const minPerSecondSampleDuration = time.Second type LoadBasedSplitter interface { // Record informs the LoadBasedSplitter about where the span lies with regard @@ -86,16 +85,6 @@ func GlobalRandSource() RandSource { return globalRandSource{} } -var enableUnweightedLBSplitFinder = settings.RegisterBoolSetting( - settings.SystemOnly, - "kv.unweighted_lb_split_finder.enabled", - "if enabled, use the un-weighted finder for load-based splitting; "+ - "the unweighted finder will attempt to find a key during when splitting "+ - "a range based on load that evenly divides the QPS among the resulting "+ - "left and right hand side ranges", - true, -) - // A Decider collects measurements about the activity (measured in qps) on a // Replica and, assuming that qps thresholds are exceeded, tries to determine a // split key that would approximately result in halving the load on each of the @@ -134,20 +123,21 @@ type LoadSplitterMetrics struct { type Decider struct { st *cluster.Settings // supplied to Init randSource RandSource // supplied to Init - qpsThreshold func() float64 // supplied to Init - qpsRetention func() time.Duration // supplied to Init + statThreshold func() float64 // supplied to Init + statRetention func() time.Duration // supplied to Init + newFinderFn func(time.Time, RandSource) LoadBasedSplitter loadSplitterMetrics *LoadSplitterMetrics // supplied to Init mu struct { syncutil.Mutex // Fields tracking the current qps sample. - lastQPSRollover time.Time // most recent time recorded by requests. - lastQPS float64 // last reqs/s rate as of lastQPSRollover - count int64 // number of requests recorded since last rollover + lastStatRollover time.Time // most recent time recorded by requests. + lastStatVal float64 // last reqs/s rate as of lastStatRollover + count int64 // number of requests recorded since last rollover // Fields tracking historical qps samples. - maxQPS maxQPSTracker + maxStat maxStatTracker // Fields tracking split key suggestions. 
splitFinder LoadBasedSplitter // populated when engaged or decided @@ -166,15 +156,17 @@ func Init( lbs *Decider, st *cluster.Settings, randSource RandSource, - qpsThreshold func() float64, - qpsRetention func() time.Duration, + statThreshold func() float64, + statRetention func() time.Duration, loadSplitterMetrics *LoadSplitterMetrics, + newFinderFn func(time.Time, RandSource) LoadBasedSplitter, ) { lbs.st = st lbs.randSource = randSource - lbs.qpsThreshold = qpsThreshold - lbs.qpsRetention = qpsRetention + lbs.statThreshold = statThreshold + lbs.statRetention = statRetention lbs.loadSplitterMetrics = loadSplitterMetrics + lbs.newFinderFn = newFinderFn } // Record notifies the Decider that 'n' operations are being carried out which @@ -198,32 +190,28 @@ func (d *Decider) recordLocked( d.mu.count += int64(n) // First compute requests per second since the last check. - if d.mu.lastQPSRollover.IsZero() { - d.mu.lastQPSRollover = now + if d.mu.lastStatRollover.IsZero() { + d.mu.lastStatRollover = now } - elapsedSinceLastQPS := now.Sub(d.mu.lastQPSRollover) - if elapsedSinceLastQPS >= minQueriesPerSecondSampleDuration { - // Update the latest QPS and reset the time and request counter. - d.mu.lastQPS = (float64(d.mu.count) / float64(elapsedSinceLastQPS)) * 1e9 - d.mu.lastQPSRollover = now + elapsedSinceLastSample := now.Sub(d.mu.lastStatRollover) + if elapsedSinceLastSample >= minPerSecondSampleDuration { + // Update the latest stat value and reset the time and request counter. + d.mu.lastStatVal = (float64(d.mu.count) / float64(elapsedSinceLastSample)) * 1e9 + d.mu.lastStatRollover = now d.mu.count = 0 - // Record the latest QPS sample in the historical tracker. - d.mu.maxQPS.record(now, d.qpsRetention(), d.mu.lastQPS) + // Record the latest stat sample in the historical tracker. + d.mu.maxStat.record(now, d.statRetention(), d.mu.lastStatVal) - // If the QPS for the range exceeds the threshold, start actively + // If the stat for the range exceeds the threshold, start actively // tracking potential for splitting this range based on load. // This tracking will begin by initiating a splitFinder so it can // begin to Record requests so it can find a split point. If a // splitFinder already exists, we check if a split point is ready // to be used. - if d.mu.lastQPS >= d.qpsThreshold() { + if d.mu.lastStatVal >= d.statThreshold() { if d.mu.splitFinder == nil { - if d.st == nil || enableUnweightedLBSplitFinder.Get(&d.st.SV) { - d.mu.splitFinder = NewUnweightedFinder(now, d.randSource) - } else { - d.mu.splitFinder = NewWeightedFinder(now, d.randSource) - } + d.mu.splitFinder = d.newFinderFn(now, d.randSource) } } else { d.mu.splitFinder = nil @@ -261,34 +249,34 @@ func (d *Decider) recordLocked( return false } -// RecordMax adds a QPS measurement directly into the Decider's historical QPS -// tracker. The QPS sample is considered to have been captured at the provided -// time. +// RecordMax adds a stat measurement directly into the Decider's historical +// stat value tracker. The stat sample is considered to have been captured at +// the provided time. func (d *Decider) RecordMax(now time.Time, qps float64) { d.mu.Lock() defer d.mu.Unlock() - d.mu.maxQPS.record(now, d.qpsRetention(), qps) + d.mu.maxStat.record(now, d.statRetention(), qps) } -// LastQPS returns the most recent QPS measurement. -func (d *Decider) LastQPS(ctx context.Context, now time.Time) float64 { +// LastStat returns the most recent stat measurement. 
+func (d *Decider) LastStat(ctx context.Context, now time.Time) float64 { d.mu.Lock() defer d.mu.Unlock() - d.recordLocked(ctx, now, 0, nil) // force QPS computation - return d.mu.lastQPS + d.recordLocked(ctx, now, 0, nil) // force stat computation + return d.mu.lastStatVal } -// MaxQPS returns the maximum QPS measurement recorded over the retention +// MaxStat returns the maximum stat measurement recorded over the retention // period. If the Decider has not been recording for a full retention period, // the method returns false. -func (d *Decider) MaxQPS(ctx context.Context, now time.Time) (float64, bool) { +func (d *Decider) MaxStat(ctx context.Context, now time.Time) (float64, bool) { d.mu.Lock() defer d.mu.Unlock() - d.recordLocked(ctx, now, 0, nil) // force QPS computation - return d.mu.maxQPS.maxQPS(now, d.qpsRetention()) + d.recordLocked(ctx, now, 0, nil) // force stat computation + return d.mu.maxStat.max(now, d.statRetention()) } // MaybeSplitKey returns a key to perform a split at. The return value will be @@ -345,21 +333,21 @@ func (d *Decider) MaybeSplitKey(ctx context.Context, now time.Time) roachpb.Key } // Reset deactivates any current attempt at determining a split key. The method -// also discards any historical QPS tracking information. +// also discards any historical stat tracking information. func (d *Decider) Reset(now time.Time) { d.mu.Lock() defer d.mu.Unlock() - d.mu.lastQPSRollover = time.Time{} - d.mu.lastQPS = 0 + d.mu.lastStatRollover = time.Time{} + d.mu.lastStatVal = 0 d.mu.count = 0 - d.mu.maxQPS.reset(now, d.qpsRetention()) + d.mu.maxStat.reset(now, d.statRetention()) d.mu.splitFinder = nil d.mu.lastSplitSuggestion = time.Time{} d.mu.lastNoSplitKeyLoggingMetrics = time.Time{} } -// maxQPSTracker collects a series of queries-per-second measurement samples and +// maxStatTracker collects a series of stat per-second measurement samples and // tracks the maximum observed over a period of time. // // The tracker internally uses a set of time windows in order to age out old @@ -367,13 +355,13 @@ func (d *Decider) Reset(now time.Time) { // a circular buffer of the last N windows of stats. We rotate through the // circular buffer every so often as determined by `minRetention`. // -// The tracker can be queried through its `maxQPS` method, which returns the +// The tracker can be queried through its `max` method, which returns the // maximum of all queries-per-second samples recorded over the retention period. // If the tracker has not been recording for a full retention period, then the // method returns false. // -// The zero-value of a maxQPSTracker can be used immediately. -type maxQPSTracker struct { +// The zero-value of a maxStatTracker can be used immediately. +type maxStatTracker struct { windows [6]float64 curIdx int curStart time.Time @@ -382,15 +370,15 @@ type maxQPSTracker struct { } // record adds the qps sample to the tracker. -func (t *maxQPSTracker) record(now time.Time, minRetention time.Duration, qps float64) { +func (t *maxStatTracker) record(now time.Time, minRetention time.Duration, qps float64) { t.maybeReset(now, minRetention) t.maybeRotate(now) t.windows[t.curIdx] = max(t.windows[t.curIdx], qps) } -// reset clears the tracker. maxQPS will begin returning false until a full +// reset clears the tracker. maxStatTracker will begin returning false until a full // minRetention period has elapsed. 
-func (t *maxQPSTracker) reset(now time.Time, minRetention time.Duration) { +func (t *maxStatTracker) reset(now time.Time, minRetention time.Duration) { if minRetention <= 0 { panic("minRetention must be positive") } @@ -401,18 +389,18 @@ func (t *maxQPSTracker) reset(now time.Time, minRetention time.Duration) { t.minRetention = minRetention } -func (t *maxQPSTracker) maybeReset(now time.Time, minRetention time.Duration) { +func (t *maxStatTracker) maybeReset(now time.Time, minRetention time.Duration) { // If the retention period changes, simply reset the entire tracker. Merging // or splitting windows would be a difficult task and could lead to samples // either not being retained for long-enough, or being retained for too long. - // Resetting indicates to maxQPS that a new retention period needs to be + // Resetting indicates to max that a new retention period needs to be // measured before accurate results can be returned. if minRetention != t.minRetention { t.reset(now, minRetention) } } -func (t *maxQPSTracker) maybeRotate(now time.Time) { +func (t *maxStatTracker) maybeRotate(now time.Time) { sinceLastRotate := now.Sub(t.curStart) windowWidth := t.windowWidth() if sinceLastRotate < windowWidth { @@ -435,10 +423,10 @@ func (t *maxQPSTracker) maybeRotate(now time.Time) { } } -// maxQPS returns the maximum queries-per-second samples recorded over the last +// max returns the maximum queries-per-second samples recorded over the last // retention period. If the tracker has not been recording for a full retention // period, then the method returns false. -func (t *maxQPSTracker) maxQPS(now time.Time, minRetention time.Duration) (float64, bool) { +func (t *maxStatTracker) max(now time.Time, minRetention time.Duration) (float64, bool) { t.record(now, minRetention, 0) // expire samples, if necessary if now.Sub(t.lastReset) < t.minRetention { @@ -453,7 +441,7 @@ func (t *maxQPSTracker) maxQPS(now time.Time, minRetention time.Duration) (float return qps, true } -func (t *maxQPSTracker) windowWidth() time.Duration { +func (t *maxStatTracker) windowWidth() time.Duration { // NB: -1 because during a rotation, only len(t.windows)-1 windows survive. return t.minRetention / time.Duration(len(t.windows)-1) } diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index 56adf14df73b..2810f497a119 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -266,12 +266,18 @@ func (sq *splitQueue) processAttempt( loadStats := r.loadStats.Stats() batchHandledQPS := loadStats.QueriesPerSecond raftAppliedQPS := loadStats.WriteKeysPerSecond - splitQPS := r.loadBasedSplitter.LastQPS(ctx, now) + lastSplitStat := r.loadBasedSplitter.LastStat(ctx, now) + var loadSplitString string + if r.QPSSplittingEnabled(ctx) { + loadSplitString = fmt.Sprintf("%.2f reqs/sec", lastSplitStat) + } else { + loadSplitString = fmt.Sprintf("%s cpu/sec", string(humanizeutil.Duration(time.Duration(lastSplitStat)))) + } reason := fmt.Sprintf( - "load at key %s (%.2f splitQPS, %.2f batches/sec, %.2f raft mutations/sec)", + "load at key %s (%s, %.2f batches/sec, %.2f raft mutations/sec)", splitByLoadKey, - splitQPS, + loadSplitString, batchHandledQPS, raftAppliedQPS, ) diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index d2623562c32e..1b7725013f4b 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2091,6 +2091,14 @@ message RangeStatsResponse { // base important decisions off of. 
double max_queries_per_second = 5; + // MaxCPUPerSecond is the maximum rate of cpu/s that the range has used at + // the leaseholder over a configured measurement period. Set to -1 if the + // replica serving the RangeStats request has not been the leaseholder long + // enough to have recorded CPU rates for at least a full measurement period. + // In such cases, the recipient should not consider the CPU value reliable + // enough to base important decisions off of. + double max_cpu_per_second = 7 [(gogoproto.customname) = "MaxCPUPerSecond"]; + // range_info contains descriptor and lease information. RangeInfo range_info = 4 [(gogoproto.nullable) = false];
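As a closing note for reviewers, the sketch below (standalone, with hypothetical names) shows how a consumer of RangeStatsResponse is expected to interpret the -1 sentinel documented above, mirroring what requestRangeStats in the merge queue does in this patch.

    package main

    import "fmt"

    // loadStat mirrors the patch's loadSplitStat: a max value plus an ok
    // flag that is false when the measurement is not yet reliable.
    type loadStat struct {
        max float64
        ok  bool
    }

    // fromSentinel converts a RangeStats field into a loadStat. A value of
    // -1 means the leaseholder has not recorded the stat for a full
    // measurement period, so the value should not be trusted.
    func fromSentinel(v float64) loadStat {
        if v < 0 {
            return loadStat{}
        }
        return loadStat{max: v, ok: true}
    }

    func main() {
        fmt.Println(fromSentinel(-1))   // {0 false}: not yet reliable
        fmt.Println(fromSentinel(42.5)) // {42.5 true}
    }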