diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 8925c203da55..e41d1d4218a6 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -42,6 +42,7 @@ kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to kv.log_range_and_node_events.enabled boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated +kv.range_split.load_cpu_threshold duration 250ms the CPU use per second over which, the range becomes a candidate for load based splitting kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence) @@ -297,4 +298,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 1000022.2-28 set the active cluster version in the format '.' +version version 1000022.2-30 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index d60ce0df78e8..242aa5dcd4fe 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -56,6 +56,7 @@
kv.log_range_and_node_events.enabled
boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog
kv.protectedts.reconciliation.interval
duration 5m0s the frequency for reconciling jobs with protected timestamp records
kv.range_split.by_load_enabled
boolean true allow automatic splits of ranges based on where load is concentrated +
kv.range_split.load_cpu_threshold
duration 250ms the CPU use per second over which, the range becomes a candidate for load based splitting
kv.range_split.load_qps_threshold
integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled
boolean false if set, rangefeed registration is enabled
kv.rangefeed.range_stuck_threshold
duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence) @@ -237,6 +238,6 @@
trace.opentelemetry.collector
string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled
boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector
string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. -
version
version1000022.2-28set the active cluster version in the format '<major>.<minor>' +
version
version1000022.2-30set the active cluster version in the format '<major>.<minor>' diff --git a/pkg/kv/kvserver/batcheval/cmd_range_stats.go b/pkg/kv/kvserver/batcheval/cmd_range_stats.go index 85277d6ee947..bc8a42b40c51 100644 --- a/pkg/kv/kvserver/batcheval/cmd_range_stats.go +++ b/pkg/kv/kvserver/batcheval/cmd_range_stats.go @@ -44,12 +44,19 @@ func RangeStats( ) (result.Result, error) { reply := resp.(*roachpb.RangeStatsResponse) reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats() + if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(ctx); ok { reply.MaxQueriesPerSecond = qps } else { // See comment on MaxQueriesPerSecond. -1 means !ok. reply.MaxQueriesPerSecond = -1 } + if cpu, ok := cArgs.EvalCtx.GetMaxSplitCPU(ctx); ok { + reply.MaxCPUPerSecond = cpu + } else { + // See comment on MaxCPUPerSecond. -1 means !ok. + reply.MaxCPUPerSecond = -1 + } reply.RangeInfo = cArgs.EvalCtx.GetRangeInfo(ctx) return result.Result{}, nil } diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 6f4aeb7362b5..7829a9d56b5a 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -89,6 +89,13 @@ type EvalContext interface { // is disabled. GetMaxSplitQPS(context.Context) (float64, bool) + // GetMaxSplitCPU returns the Replicas maximum request cpu/s rate over a + // configured retention period. + // + // NOTE: This should not be used when the load based splitting cluster setting + // is disabled. + GetMaxSplitCPU(context.Context) (float64, bool) + GetGCThreshold() hlc.Timestamp ExcludeDataFromBackup() bool GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error) @@ -156,6 +163,7 @@ type MockEvalCtx struct { Clock *hlc.Clock Stats enginepb.MVCCStats QPS float64 + CPU float64 AbortSpan *abortspan.AbortSpan GCThreshold hlc.Timestamp Term, FirstIndex uint64 @@ -234,6 +242,9 @@ func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats { func (m *mockEvalCtxImpl) GetMaxSplitQPS(context.Context) (float64, bool) { return m.QPS, true } +func (m *mockEvalCtxImpl) GetMaxSplitCPU(context.Context) (float64, bool) { + return m.CPU, true +} func (m *mockEvalCtxImpl) CanCreateTxnRecord( context.Context, uuid.UUID, []byte, hlc.Timestamp, ) (bool, hlc.Timestamp, roachpb.TransactionAbortedReason) { diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 311f452399ca..7ebd0e02cabc 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -4346,10 +4346,17 @@ func TestMergeQueue(t *testing.T) { }) t.Run("load-based-merging", func(t *testing.T) { - const splitByLoadQPS = 10 - const mergeByLoadQPS = splitByLoadQPS / 2 // see conservativeLoadBasedSplitThreshold + // NB: It is possible for the ranges being checked to record load + // during the test. To avoid flakiness, we set the splitByLoadStat high + // enough that any recorded load from testing won't exceed it. + const splitByLoadStat = 10e9 + const mergeByLoadStat = splitByLoadStat / 2 // see conservativeLoadBasedSplitThreshold const splitByLoadMergeDelay = 500 * time.Millisecond + setSplitDimension := func(dim kvserver.LBRebalancingDimension) { + kvserver.LoadBasedRebalancingDimension.Override(ctx, sv, int64(dim)) + } + resetForLoadBasedSubtest := func(t *testing.T) { reset(t) @@ -4366,7 +4373,8 @@ func TestMergeQueue(t *testing.T) { // meaning that it was a maximum measurement over some extended period of // time. 
kvserver.SplitByLoadEnabled.Override(ctx, sv, true) - kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadQPS) + kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadStat) + kvserver.SplitByLoadCPUThreshold.Override(ctx, sv, splitByLoadStat) // Drop the load-based splitting merge delay setting, which also dictates // the duration that a leaseholder must measure QPS before considering its @@ -4381,47 +4389,104 @@ func TestMergeQueue(t *testing.T) { rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) } + for _, splitDimension := range []kvserver.LBRebalancingDimension{ + kvserver.LBRebalancingQueries, + kvserver.LBRebalancingStoreCPU, + } { + setSplitDimension(splitDimension) + t.Run(fmt.Sprintf("unreliable-lhs-%s", splitDimension.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) - t.Run("unreliable-lhs-qps", func(t *testing.T) { - resetForLoadBasedSubtest(t) + lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime()) - lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime()) + clearRange(t, lhsStartKey, rhsEndKey) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - clearRange(t, lhsStartKey, rhsEndKey) - verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + t.Run(fmt.Sprintf("unreliable-rhs-%s", splitDimension.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) - t.Run("unreliable-rhs-qps", func(t *testing.T) { - resetForLoadBasedSubtest(t) + rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) - rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) + clearRange(t, lhsStartKey, rhsEndKey) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - clearRange(t, lhsStartKey, rhsEndKey) - verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + t.Run(fmt.Sprintf("combined-%s-above-threshold", splitDimension.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) + + moreThanHalfStat := mergeByLoadStat/2 + 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat) - t.Run("combined-qps-above-threshold", func(t *testing.T) { - resetForLoadBasedSubtest(t) + clearRange(t, lhsStartKey, rhsEndKey) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - moreThanHalfQPS := mergeByLoadQPS/2 + 1 - rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(moreThanHalfQPS)) - lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(moreThanHalfQPS)) + t.Run(fmt.Sprintf("combined-%s-below-threshold", splitDimension.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) - clearRange(t, lhsStartKey, rhsEndKey) - verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) + lessThanHalfStat := mergeByLoadStat/2 - 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat) - t.Run("combined-qps-below-threshold", func(t *testing.T) { - resetForLoadBasedSubtest(t) + clearRange(t, lhsStartKey, rhsEndKey) + verifyMergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) - lessThanHalfQPS := mergeByLoadQPS/2 - 1 - 
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(lessThanHalfQPS)) - lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(lessThanHalfQPS)) + // These nested tests assert that after changing the split + // dimension, any previous load is discarded and the range will not + // merge, even if the previous load was above or below the + // threshold. + for _, secondSplitDimension := range []kvserver.LBRebalancingDimension{ + kvserver.LBRebalancingQueries, + kvserver.LBRebalancingStoreCPU, + } { + if splitDimension == secondSplitDimension { + // Nothing to do when there is no change. We expect the + // same outcome as the above tests. + continue + } + t.Run(fmt.Sprintf("switch-%s-to-%s-prev-combined-above-threshold", + splitDimension.ToDimension().String(), + secondSplitDimension.ToDimension().String(), + ), func(t *testing.T) { + // Set the split dimension again, since we have modified it + // at the bottom of this loop. + setSplitDimension(splitDimension) + resetForLoadBasedSubtest(t) + + moreThanHalfStat := mergeByLoadStat/2 + 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat) + + clearRange(t, lhsStartKey, rhsEndKey) + // Switch the dimension, so that any recorded load should + // be discarded and despite being above the threshold (for + // both dimensions), it shouldn't merge. + setSplitDimension(secondSplitDimension) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - clearRange(t, lhsStartKey, rhsEndKey) - verifyMergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + t.Run(fmt.Sprintf("switch-%s-to-%s-prev-combined-below-threshold", + splitDimension.ToDimension().String(), + secondSplitDimension.ToDimension().String(), + ), func(t *testing.T) { + setSplitDimension(splitDimension) + resetForLoadBasedSubtest(t) + + manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) + lessThanHalfStat := mergeByLoadStat/2 - 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat) + + clearRange(t, lhsStartKey, rhsEndKey) + setSplitDimension(secondSplitDimension) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) + } + } }) t.Run("sticky-bit", func(t *testing.T) { diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index 4ab4cc0f196b..0b0b76d50e51 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -184,7 +184,7 @@ var _ PurgatoryError = rangeMergePurgatoryError{} func (mq *mergeQueue) requestRangeStats( ctx context.Context, key roachpb.Key, -) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, qps float64, qpsOK bool, err error) { +) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, lss loadSplitStats, err error) { ba := &roachpb.BatchRequest{} ba.Add(&roachpb.RangeStatsRequest{ @@ -193,15 +193,31 @@ func (mq *mergeQueue) requestRangeStats( br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba) if pErr != nil { - return nil, enginepb.MVCCStats{}, 0, false, pErr.GoError() + return nil, enginepb.MVCCStats{}, loadSplitStats{}, pErr.GoError() } res := br.Responses[0].GetInner().(*roachpb.RangeStatsResponse) desc = &res.RangeInfo.Desc stats = res.MVCCStats - qps = res.MaxQueriesPerSecond - qpsOK = qps >= 0 - return desc, stats, qps, qpsOK, nil + + if res.MaxCPUPerSecond >= 0 && 
res.MaxQueriesPerSecond >= 0 { + err = errors.AssertionFailedf( + "unexpected both max qps %.2f and max cpu %.2f set in range stats response", + res.MaxQueriesPerSecond, res.MaxCPUPerSecond) + return nil, enginepb.MVCCStats{}, loadSplitStats{}, err + } + + if res.MaxQueriesPerSecond >= 0 { + lss.QPS.max = res.MaxQueriesPerSecond + lss.QPS.ok = true + } + + if res.MaxCPUPerSecond >= 0 { + lss.CPU.max = res.MaxCPUPerSecond + lss.CPU.ok = true + } + + return desc, stats, lss, nil } func (mq *mergeQueue) process( @@ -214,7 +230,7 @@ func (mq *mergeQueue) process( lhsDesc := lhsRepl.Desc() lhsStats := lhsRepl.GetMVCCStats() - lhsQPS, lhsQPSOK := lhsRepl.GetMaxSplitQPS(ctx) + lhsSplitStats := lhsRepl.GetLoadSplitStats(ctx) minBytes := lhsRepl.GetMinBytes() if lhsStats.Total() >= minBytes { log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes", @@ -222,7 +238,7 @@ func (mq *mergeQueue) process( return false, nil } - rhsDesc, rhsStats, rhsQPS, rhsQPSOK, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) + rhsDesc, rhsStats, rhsSplitStats, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) if err != nil { return false, err } @@ -247,8 +263,8 @@ func (mq *mergeQueue) process( } mergedStats := lhsStats mergedStats.Add(rhsStats) + mergedLoadSplitStats := lhsSplitStats.merge(rhsSplitStats) - var mergedQPS float64 if lhsRepl.SplitByLoadEnabled() { // When load is a consideration for splits and, by extension, merges, the // mergeQueue is fairly conservative. In an effort to avoid thrashing and to @@ -260,29 +276,46 @@ func (mq *mergeQueue) process( // maximum qps measurement from both sides to be sufficiently stable and // reliable, meaning that it was a maximum measurement over some extended // period of time. - if !lhsQPSOK { - log.VEventf(ctx, 2, "skipping merge: LHS QPS measurement not yet reliable") - return false, nil - } - if !rhsQPSOK { - log.VEventf(ctx, 2, "skipping merge: RHS QPS measurement not yet reliable") - return false, nil + if lhsRepl.QPSSplittingEnabled(ctx) { + if !lhsSplitStats.QPS.ok { + log.VEventf(ctx, 2, "skipping merge: LHS QPS measurement not yet reliable") + return false, nil + } + if !rhsSplitStats.QPS.ok { + log.VEventf(ctx, 2, "skipping merge: RHS QPS measurement not yet reliable") + return false, nil + } + } else { + if !lhsSplitStats.CPU.ok { + log.VEventf(ctx, 2, "skipping merge: LHS CPU measurement not yet reliable") + return false, nil + } + if !rhsSplitStats.CPU.ok { + log.VEventf(ctx, 2, "skipping merge: RHS CPU measurement not yet reliable") + return false, nil + } } - mergedQPS = lhsQPS + rhsQPS } // Check if the merged range would need to be split, if so, skip merge. // Use a lower threshold for load based splitting so we don't find ourselves // in a situation where we keep merging ranges that would be split soon after // by a small increase in load. 
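To keep the next hunk readable: once both sides report a reliable measurement, the merge decision reduces to summing the two maxima and comparing the sum against half of the configured split threshold. A minimal sketch of that rule, under hypothetical names (none of these identifiers exist in the tree):

```go
package main

import "fmt"

// shouldSkipMergeForLoad condenses the load check into one function
// (hypothetical names; not identifiers from the tree). A merge is skipped when
// either side lacks a reliable measurement, or when the combined maximum load
// would already make the merged range a split candidate at half the configured
// load-based split threshold.
func shouldSkipMergeForLoad(lhsMax, rhsMax, splitThreshold float64, lhsOK, rhsOK bool) (bool, string) {
	if !lhsOK || !rhsOK {
		return true, "load measurement not yet reliable"
	}
	merged := lhsMax + rhsMax
	conservative := 0.5 * splitThreshold // see conservativeLoadBasedSplitThreshold
	if merged >= conservative {
		return true, "merged range may split again under load"
	}
	return false, ""
}

func main() {
	// With a 250ms/s CPU split threshold (expressed in nanoseconds per second),
	// two halves each using 70ms/s combine to 140ms/s, above the 125ms/s bar.
	skip, why := shouldSkipMergeForLoad(70e6, 70e6, 250e6, true, true)
	fmt.Println(skip, why) // true merged range may split again under load
}
```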
- conservativeLoadBasedSplitThreshold := 0.5 * lhsRepl.SplitByLoadQPSThreshold() + conservativeLoadBasedSplitThreshold := 0.5 * lhsRepl.SplitByLoadThreshold(ctx) + var exceedsLoadSplitThreshold bool + if lhsRepl.QPSSplittingEnabled(ctx) { + exceedsLoadSplitThreshold = mergedLoadSplitStats.QPS.max >= conservativeLoadBasedSplitThreshold + } else { + exceedsLoadSplitThreshold = mergedLoadSplitStats.CPU.max >= conservativeLoadBasedSplitThreshold + } + shouldSplit, _ := shouldSplitRange(ctx, mergedDesc, mergedStats, lhsRepl.GetMaxBytes(), lhsRepl.shouldBackpressureWrites(), confReader) - if shouldSplit || mergedQPS >= conservativeLoadBasedSplitThreshold { + if shouldSplit || exceedsLoadSplitThreshold { log.VEventf(ctx, 2, "skipping merge to avoid thrashing: merged range %s may split "+ - "(estimated size, estimated QPS: %d, %v)", - mergedDesc, mergedStats.Total(), mergedQPS) + "(estimated size, estimated load: %d, %s)", + mergedDesc, mergedStats.Total(), mergedLoadSplitStats) return false, nil } @@ -360,7 +393,7 @@ func (mq *mergeQueue) process( } // Refresh RHS descriptor. - rhsDesc, _, _, _, err = mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) + rhsDesc, _, _, err = mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) if err != nil { return false, err } @@ -377,16 +410,24 @@ func (mq *mergeQueue) process( } } + var conservativeThresholdString string + if lhsRepl.QPSSplittingEnabled(ctx) { + conservativeThresholdString = fmt.Sprintf("qps=%.2f", conservativeLoadBasedSplitThreshold) + } else { + conservativeThresholdString = fmt.Sprintf("cpu=%s", string(humanizeutil.Duration(time.Duration(conservativeLoadBasedSplitThreshold)))) + } + log.VEventf(ctx, 2, "merging to produce range: %s-%s", mergedDesc.StartKey, mergedDesc.EndKey) - reason := fmt.Sprintf("lhs+rhs has (size=%s+%s=%s qps=%.2f+%.2f=%.2fqps) below threshold (size=%s, qps=%.2f)", + // TODO(kvoli): handle this print + reason := fmt.Sprintf("lhs+rhs has (size=%s+%s=%s %s+%s=%s) below threshold (size=%s, %s)", humanizeutil.IBytes(lhsStats.Total()), humanizeutil.IBytes(rhsStats.Total()), humanizeutil.IBytes(mergedStats.Total()), - lhsQPS, - rhsQPS, - mergedQPS, + lhsSplitStats, + rhsSplitStats, + mergedLoadSplitStats, humanizeutil.IBytes(minBytes), - conservativeLoadBasedSplitThreshold, + conservativeThresholdString, ) _, pErr := lhsRepl.AdminMerge(ctx, roachpb.AdminMergeRequest{ RequestHeader: roachpb.RequestHeader{Key: lhsRepl.Desc().StartKey.AsRawKey()}, @@ -417,8 +458,9 @@ func (mq *mergeQueue) process( // Adjust the splitter to account for the additional load from the RHS. We // could just Reset the splitter, but then we'd need to wait out a full // measurement period (default of 5m) before merging this range again. - if mergedQPS != 0 { - lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedQPS) + // TODO(kvoli): handle this record max + if mergedLoadSplitStats.QPS.max != 0 { + lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStats.QPS.max) } return true, nil } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index afcba47435a8..5356be4518ee 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1118,9 +1118,28 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) { // NOTE: This should only be used for load based splitting, only // works when the load based splitting cluster setting is enabled. // -// Use QueriesPerSecond() for current QPS stats for all other purposes. 
+// Use LoadStats.QueriesPerSecond for all other purposes. func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) { - return r.loadBasedSplitter.MaxQPS(ctx, r.Clock().PhysicalTime()) + if !r.QPSSplittingEnabled(ctx) { + return 0, false + } + return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) +} + +// GetMaxSplitCPU returns the Replica's maximum CPU/s rate over a configured +// measurement period. If the Replica has not been recording CPU for at least +// an entire measurement period, the method will return false. +// +// NOTE: This should only be used for load based splitting, only +// works when the load based splitting cluster setting is enabled. +// +// Use LoadStats.RaftCPUNanosPerSecond and RequestCPUNanosPerSecond for current +// CPU stats for all other purposes. +func (r *Replica) GetMaxSplitCPU(ctx context.Context) (float64, bool) { + if r.QPSSplittingEnabled(ctx) { + return 0, false + } + return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) } // ContainsKey returns whether this range contains the specified key. diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index d8ab0d7c67ee..ae0100830575 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -135,6 +135,12 @@ func (rec SpanSetReplicaEvalContext) GetMaxSplitQPS(ctx context.Context) (float6 return rec.i.GetMaxSplitQPS(ctx) } +// GetMaxSplitCPU returns the Replica's maximum CPU/s rate for splitting and +// merging purposes. +func (rec SpanSetReplicaEvalContext) GetMaxSplitCPU(ctx context.Context) (float64, bool) { + return rec.i.GetMaxSplitCPU(ctx) +} + // CanCreateTxnRecord determines whether a transaction record can be created // for the provided transaction information. See Replica.CanCreateTxnRecord // for details about its arguments, return values, and preconditions. 
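As a quick aside on the -1 sentinel used by RangeStatsResponse above: a recipient is expected to translate it back into a (value, ok) pair per dimension, which is what the merge queue's requestRangeStats does. A small self-contained sketch, using a stand-in struct rather than the generated proto type:

```go
package main

import "fmt"

// rangeStats is a stand-in for the fields of roachpb.RangeStatsResponse that
// matter here; the generated proto type is not reproduced.
type rangeStats struct {
	MaxQueriesPerSecond float64 // -1 means "no reliable measurement"
	MaxCPUPerSecond     float64 // -1 means "no reliable measurement"
}

// decode turns the -1 sentinel convention into explicit (value, ok) pairs,
// mirroring what requestRangeStats in the merge queue hunk above does.
func decode(rs rangeStats) (qps, cpu float64, qpsOK, cpuOK bool) {
	if rs.MaxQueriesPerSecond >= 0 {
		qps, qpsOK = rs.MaxQueriesPerSecond, true
	}
	if rs.MaxCPUPerSecond >= 0 {
		cpu, cpuOK = rs.MaxCPUPerSecond, true
	}
	return qps, cpu, qpsOK, cpuOK
}

func main() {
	// A replica that is splitting by CPU reports -1 for QPS and a real CPU rate
	// (here 120ms of CPU per second, expressed in nanoseconds).
	qps, cpu, qpsOK, cpuOK := decode(rangeStats{MaxQueriesPerSecond: -1, MaxCPUPerSecond: 120e6})
	fmt.Println(qps, cpu, qpsOK, cpuOK) // 0 1.2e+08 false true
}
```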
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 41f2fd877c47..69ead29ae9bf 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -95,11 +95,23 @@ func newUnloadedReplica( r.mu.stateLoader = stateloader.Make(desc.RangeID) r.mu.quiescent = true r.mu.conf = store.cfg.DefaultSpanConfig - split.Init(&r.loadBasedSplitter, store.cfg.Settings, split.GlobalRandSource(), func() float64 { - return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV)) - }, func() time.Duration { - return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV) - }, store.metrics.LoadSplitterMetrics) + split.Init( + &r.loadBasedSplitter, + store.cfg.Settings, + split.GlobalRandSource(), + SplitByLoadThresholdFn(ctx, store.cfg.Settings), + func() time.Duration { + return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV) + }, + store.metrics.LoadSplitterMetrics, + NewFinderFn(ctx, store.cfg.Settings), + ) + + LoadBasedRebalancingDimension.SetOnChange(&store.cfg.Settings.SV, func(ctx context.Context) { + r.setOnDimensionChange(ctx) + }) + + r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{} r.mu.checksums = map[uuid.UUID]*replicaChecksum{} r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings()) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index e0017ea86cbf..676b7a8b4f8a 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -134,8 +134,8 @@ func (r *Replica) SendWithWriteBytes( // Record the CPU time processing the request for this replica. This is // recorded regardless of errors that are encountered. - defer r.MeasureReqCPUNanos(grunning.Time()) - + startCPU := grunning.Time() + defer r.MeasureReqCPUNanos(startCPU) // Record summary throughput information about the batch request for // accounting. r.recordBatchRequestLoad(ctx, ba) @@ -201,12 +201,14 @@ func (r *Replica) SendWithWriteBytes( } } - // Return range information if it was requested. Note that we don't return it - // on errors because the code doesn't currently support returning both a br - // and a pErr here. Also, some errors (e.g. NotLeaseholderError) have custom - // ways of returning range info. if pErr == nil { + // Return range information if it was requested. Note that we don't return it + // on errors because the code doesn't currently support returning both a br + // and a pErr here. Also, some errors (e.g. NotLeaseholderError) have custom + // ways of returning range info. r.maybeAddRangeInfoToResponse(ctx, ba, br) + // Handle load-based splitting, if necessary. + r.recordBatchForLoadBasedSplitting(ctx, ba, br, int(grunning.Difference(startCPU, grunning.Time()))) } r.recordRequestWriteBytes(writeBytes) @@ -405,17 +407,6 @@ func (r *Replica) executeBatchWithConcurrencyRetries( var requestEvalKind concurrency.RequestEvalKind var g *concurrency.Guard defer func() { - // Handle load-based splitting, if necessary. - if pErr == nil && br != nil { - if len(ba.Requests) != len(br.Responses) { - log.KvDistribution.Errorf(ctx, - "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d", - len(ba.Requests), len(br.Responses)) - } else { - r.recordBatchForLoadBasedSplitting(ctx, ba, br) - } - } - // NB: wrapped to delay g evaluation to its value when returning. 
if g != nil { r.concMgr.FinishReq(g) @@ -1005,8 +996,9 @@ func (r *Replica) executeAdminBatch( return br, nil } -// recordBatchRequestLoad records the load information about a batch request issued -// against this replica. +// recordBatchRequestLoad records the load information about a batch request +// completed against this replica. This also handles load based splitting based +// on the request load. func (r *Replica) recordBatchRequestLoad(ctx context.Context, ba *roachpb.BatchRequest) { if r.loadStats == nil { log.VEventf( diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go index 3cc43ae28e39..1e4f67ccb668 100644 --- a/pkg/kv/kvserver/replica_split_load.go +++ b/pkg/kv/kvserver/replica_split_load.go @@ -12,12 +12,26 @@ package kvserver import ( "context" + "fmt" + "strings" + "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) +// transitions (cluster setting change) +// add rpc field MaxCPU +// maxCPU / maxQPS alwasy return not ok when being used +// decider reset +// set threshold function +// set finder creation fn + // SplitByLoadEnabled wraps "kv.range_split.by_load_enabled". var SplitByLoadEnabled = settings.RegisterBoolSetting( settings.TenantWritable, @@ -26,6 +40,16 @@ var SplitByLoadEnabled = settings.RegisterBoolSetting( true, ).WithPublic() +var EnableUnweightedLBSplitFinder = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.unweighted_lb_split_finder.enabled", + "if enabled, use the un-weighted finder for load-based splitting; "+ + "the unweighted finder will attempt to find a key during when splitting "+ + "a range based on load that evenly divides the QPS among the resulting "+ + "left and right hand side ranges", + false, +) + // SplitByLoadQPSThreshold wraps "kv.range_split.load_qps_threshold". var SplitByLoadQPSThreshold = settings.RegisterIntSetting( settings.TenantWritable, @@ -34,11 +58,51 @@ var SplitByLoadQPSThreshold = settings.RegisterIntSetting( 2500, // 2500 req/s ).WithPublic() -// SplitByLoadQPSThreshold returns the QPS request rate for a given replica. -func (r *Replica) SplitByLoadQPSThreshold() float64 { - return float64(SplitByLoadQPSThreshold.Get(&r.store.cfg.Settings.SV)) +// SplitByLoadCPUThreshold wraps "kv.range_split.load_qps_threshold". 
+var SplitByLoadCPUThreshold = settings.RegisterDurationSetting( + settings.TenantWritable, + "kv.range_split.load_cpu_threshold", + "the CPU use per second over which, the range becomes a candidate for load based splitting", + 250*time.Millisecond, +).WithPublic() + +func (r *Replica) SplitByLoadThreshold(ctx context.Context) float64 { + return SplitByLoadThresholdFn(ctx, r.store.cfg.Settings)() +} + +func SplitByLoadThresholdFn(ctx context.Context, st *cluster.Settings) func() float64 { + return func() float64 { + if QPSSplittingEnabled(ctx, st) { + return float64(SplitByLoadQPSThreshold.Get(&st.SV)) + } + return float64(SplitByLoadCPUThreshold.Get(&st.SV)) + } +} + +func NewFinderFn( + ctx context.Context, st *cluster.Settings, +) func(time.Time, split.RandSource) split.LoadBasedSplitter { + return func(startTime time.Time, randSource split.RandSource) split.LoadBasedSplitter { + if QPSSplittingEnabled(ctx, st) { + return split.NewUnweightedFinder(startTime, randSource) + } else { + return split.NewWeightedFinder(startTime, randSource) + } + } } +// split queue +// - store rebalancer dimension +// - threshold for that dimension +// merge queue +// - store rebalancer dimension +// - threshold for that dimension +// - which value to look at when trying to merge +// finder +// - split threshold +// - new finder fn +// - when to reset for new dimension +// // SplitByLoadEnabled returns whether load based splitting is enabled. // Although this is a method of *Replica, the configuration is really global, // shared across all stores. @@ -47,6 +111,19 @@ func (r *Replica) SplitByLoadEnabled() bool { !r.store.TestingKnobs().DisableLoadBasedSplitting } +func QPSSplittingEnabled(ctx context.Context, sv *cluster.Settings) bool { + rebalanceObjective := LoadBasedRebalancingObjective(ctx, sv) + return rebalanceObjective == LBRebalancingQueries || EnableUnweightedLBSplitFinder.Get(&sv.SV) +} + +func (r *Replica) QPSSplittingEnabled(ctx context.Context) bool { + return QPSSplittingEnabled(ctx, r.store.cfg.Settings) +} + +func (r *Replica) setOnDimensionChange(ctx context.Context) { + r.loadBasedSplitter.Reset(r.Clock().PhysicalTime()) +} + // getResponseBoundarySpan computes the union span of the true spans that were // iterated over using the request span and the response's resumeSpan. 
// @@ -120,15 +197,81 @@ func getResponseBoundarySpan( return } +type loadSplitStat struct { + max float64 + ok bool +} + +func (ls loadSplitStat) merge(other loadSplitStat) loadSplitStat { + return loadSplitStat{ + ok: ls.ok && other.ok, + max: ls.max + other.max, + } +} + +type loadSplitStats struct { + CPU loadSplitStat + QPS loadSplitStat +} + +func (lss loadSplitStats) merge(other loadSplitStats) loadSplitStats { + return loadSplitStats{ + CPU: lss.CPU.merge(other.CPU), + QPS: lss.QPS.merge(other.QPS), + } +} + +func (lss loadSplitStats) String() string { + var buf strings.Builder + + if lss.CPU.ok { + fmt.Fprintf(&buf, "cpu-per-second=%s", string(humanizeutil.Duration(time.Duration(int64(lss.CPU.max))))) + } + if lss.QPS.ok { + fmt.Fprintf(&buf, "queries-per-second=%.2f", lss.QPS.max) + } + return buf.String() +} + +func (r *Replica) GetLoadSplitStats(ctx context.Context) loadSplitStats { + lss := loadSplitStats{} + max, ok := r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) + if r.QPSSplittingEnabled(ctx) { + lss.QPS = loadSplitStat{max: max, ok: ok} + } else { + lss.CPU = loadSplitStat{max: max, ok: ok} + } + return lss +} + // recordBatchForLoadBasedSplitting records the batch's spans to be considered // for load based splitting. func (r *Replica) recordBatchForLoadBasedSplitting( - ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, + ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, stat int, ) { if !r.SplitByLoadEnabled() { return } - shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), len(ba.Requests), func() roachpb.Span { + + // When there is nothing to do when either the batch request or batch + // response are nil. + if ba == nil || br == nil { + return + } + + if len(ba.Requests) != len(br.Responses) { + log.KvDistribution.Errorf(ctx, + "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d", + len(ba.Requests), len(br.Responses)) + } + + // When QPS splitting is enabled, use the number of requests rather than + // the given stat for recording load. 
+ if r.QPSSplittingEnabled(ctx) { + stat = len(ba.Requests) + } + + shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), stat, func() roachpb.Span { return getResponseBoundarySpan(ba, br) }) if shouldInitSplit { diff --git a/pkg/kv/kvserver/split/BUILD.bazel b/pkg/kv/kvserver/split/BUILD.bazel index 031270e152c7..3ad9cf6333d5 100644 --- a/pkg/kv/kvserver/split/BUILD.bazel +++ b/pkg/kv/kvserver/split/BUILD.bazel @@ -13,7 +13,6 @@ go_library( deps = [ "//pkg/keys", "//pkg/roachpb", - "//pkg/settings", "//pkg/settings/cluster", "//pkg/util/log", "//pkg/util/metric", diff --git a/pkg/kv/kvserver/split/decider.go b/pkg/kv/kvserver/split/decider.go index e34937bb4641..10884d655998 100644 --- a/pkg/kv/kvserver/split/decider.go +++ b/pkg/kv/kvserver/split/decider.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -29,7 +28,7 @@ import ( const minSplitSuggestionInterval = time.Minute const minNoSplitKeyLoggingMetricsInterval = time.Minute -const minQueriesPerSecondSampleDuration = time.Second +const minPerSecondSampleDuration = time.Second type LoadBasedSplitter interface { // Record informs the LoadBasedSplitter about where the span lies with regard @@ -86,16 +85,6 @@ func GlobalRandSource() RandSource { return globalRandSource{} } -var enableUnweightedLBSplitFinder = settings.RegisterBoolSetting( - settings.SystemOnly, - "kv.unweighted_lb_split_finder.enabled", - "if enabled, use the un-weighted finder for load-based splitting; "+ - "the unweighted finder will attempt to find a key during when splitting "+ - "a range based on load that evenly divides the QPS among the resulting "+ - "left and right hand side ranges", - true, -) - // A Decider collects measurements about the activity (measured in qps) on a // Replica and, assuming that qps thresholds are exceeded, tries to determine a // split key that would approximately result in halving the load on each of the @@ -134,20 +123,21 @@ type LoadSplitterMetrics struct { type Decider struct { st *cluster.Settings // supplied to Init randSource RandSource // supplied to Init - qpsThreshold func() float64 // supplied to Init - qpsRetention func() time.Duration // supplied to Init + statThreshold func() float64 // supplied to Init + statRetention func() time.Duration // supplied to Init + newFinderFn func(time.Time, RandSource) LoadBasedSplitter loadSplitterMetrics *LoadSplitterMetrics // supplied to Init mu struct { syncutil.Mutex // Fields tracking the current qps sample. - lastQPSRollover time.Time // most recent time recorded by requests. - lastQPS float64 // last reqs/s rate as of lastQPSRollover - count int64 // number of requests recorded since last rollover + lastStatRollover time.Time // most recent time recorded by requests. + lastStatVal float64 // last reqs/s rate as of lastStatRollover + count int64 // number of requests recorded since last rollover // Fields tracking historical qps samples. - maxQPS maxQPSTracker + maxStat maxStatTracker // Fields tracking split key suggestions. 
splitFinder LoadBasedSplitter // populated when engaged or decided @@ -166,15 +156,17 @@ func Init( lbs *Decider, st *cluster.Settings, randSource RandSource, - qpsThreshold func() float64, - qpsRetention func() time.Duration, + statThreshold func() float64, + statRetention func() time.Duration, loadSplitterMetrics *LoadSplitterMetrics, + newFinderFn func(time.Time, RandSource) LoadBasedSplitter, ) { lbs.st = st lbs.randSource = randSource - lbs.qpsThreshold = qpsThreshold - lbs.qpsRetention = qpsRetention + lbs.statThreshold = statThreshold + lbs.statRetention = statRetention lbs.loadSplitterMetrics = loadSplitterMetrics + lbs.newFinderFn = newFinderFn } // Record notifies the Decider that 'n' operations are being carried out which @@ -198,32 +190,28 @@ func (d *Decider) recordLocked( d.mu.count += int64(n) // First compute requests per second since the last check. - if d.mu.lastQPSRollover.IsZero() { - d.mu.lastQPSRollover = now + if d.mu.lastStatRollover.IsZero() { + d.mu.lastStatRollover = now } - elapsedSinceLastQPS := now.Sub(d.mu.lastQPSRollover) - if elapsedSinceLastQPS >= minQueriesPerSecondSampleDuration { - // Update the latest QPS and reset the time and request counter. - d.mu.lastQPS = (float64(d.mu.count) / float64(elapsedSinceLastQPS)) * 1e9 - d.mu.lastQPSRollover = now + elapsedSinceLastSample := now.Sub(d.mu.lastStatRollover) + if elapsedSinceLastSample >= minPerSecondSampleDuration { + // Update the latest stat value and reset the time and request counter. + d.mu.lastStatVal = (float64(d.mu.count) / float64(elapsedSinceLastSample)) * 1e9 + d.mu.lastStatRollover = now d.mu.count = 0 - // Record the latest QPS sample in the historical tracker. - d.mu.maxQPS.record(now, d.qpsRetention(), d.mu.lastQPS) + // Record the latest stat sample in the historical tracker. + d.mu.maxStat.record(now, d.statRetention(), d.mu.lastStatVal) - // If the QPS for the range exceeds the threshold, start actively + // If the stat for the range exceeds the threshold, start actively // tracking potential for splitting this range based on load. // This tracking will begin by initiating a splitFinder so it can // begin to Record requests so it can find a split point. If a // splitFinder already exists, we check if a split point is ready // to be used. - if d.mu.lastQPS >= d.qpsThreshold() { + if d.mu.lastStatVal >= d.statThreshold() { if d.mu.splitFinder == nil { - if d.st == nil || enableUnweightedLBSplitFinder.Get(&d.st.SV) { - d.mu.splitFinder = NewUnweightedFinder(now, d.randSource) - } else { - d.mu.splitFinder = NewWeightedFinder(now, d.randSource) - } + d.mu.splitFinder = d.newFinderFn(now, d.randSource) } } else { d.mu.splitFinder = nil @@ -261,34 +249,34 @@ func (d *Decider) recordLocked( return false } -// RecordMax adds a QPS measurement directly into the Decider's historical QPS -// tracker. The QPS sample is considered to have been captured at the provided -// time. +// RecordMax adds a stat measurement directly into the Decider's historical +// stat value tracker. The stat sample is considered to have been captured at +// the provided time. func (d *Decider) RecordMax(now time.Time, qps float64) { d.mu.Lock() defer d.mu.Unlock() - d.mu.maxQPS.record(now, d.qpsRetention(), qps) + d.mu.maxStat.record(now, d.statRetention(), qps) } -// LastQPS returns the most recent QPS measurement. -func (d *Decider) LastQPS(ctx context.Context, now time.Time) float64 { +// LastStat returns the most recent stat measurement. 
+func (d *Decider) LastStat(ctx context.Context, now time.Time) float64 { d.mu.Lock() defer d.mu.Unlock() - d.recordLocked(ctx, now, 0, nil) // force QPS computation - return d.mu.lastQPS + d.recordLocked(ctx, now, 0, nil) // force stat computation + return d.mu.lastStatVal } -// MaxQPS returns the maximum QPS measurement recorded over the retention +// MaxStat returns the maximum stat measurement recorded over the retention // period. If the Decider has not been recording for a full retention period, // the method returns false. -func (d *Decider) MaxQPS(ctx context.Context, now time.Time) (float64, bool) { +func (d *Decider) MaxStat(ctx context.Context, now time.Time) (float64, bool) { d.mu.Lock() defer d.mu.Unlock() - d.recordLocked(ctx, now, 0, nil) // force QPS computation - return d.mu.maxQPS.maxQPS(now, d.qpsRetention()) + d.recordLocked(ctx, now, 0, nil) // force stat computation + return d.mu.maxStat.max(now, d.statRetention()) } // MaybeSplitKey returns a key to perform a split at. The return value will be @@ -345,21 +333,21 @@ func (d *Decider) MaybeSplitKey(ctx context.Context, now time.Time) roachpb.Key } // Reset deactivates any current attempt at determining a split key. The method -// also discards any historical QPS tracking information. +// also discards any historical stat tracking information. func (d *Decider) Reset(now time.Time) { d.mu.Lock() defer d.mu.Unlock() - d.mu.lastQPSRollover = time.Time{} - d.mu.lastQPS = 0 + d.mu.lastStatRollover = time.Time{} + d.mu.lastStatVal = 0 d.mu.count = 0 - d.mu.maxQPS.reset(now, d.qpsRetention()) + d.mu.maxStat.reset(now, d.statRetention()) d.mu.splitFinder = nil d.mu.lastSplitSuggestion = time.Time{} d.mu.lastNoSplitKeyLoggingMetrics = time.Time{} } -// maxQPSTracker collects a series of queries-per-second measurement samples and +// maxStatTracker collects a series of stat per-second measurement samples and // tracks the maximum observed over a period of time. // // The tracker internally uses a set of time windows in order to age out old @@ -367,13 +355,13 @@ func (d *Decider) Reset(now time.Time) { // a circular buffer of the last N windows of stats. We rotate through the // circular buffer every so often as determined by `minRetention`. // -// The tracker can be queried through its `maxQPS` method, which returns the +// The tracker can be queried through its `max` method, which returns the // maximum of all queries-per-second samples recorded over the retention period. // If the tracker has not been recording for a full retention period, then the // method returns false. // -// The zero-value of a maxQPSTracker can be used immediately. -type maxQPSTracker struct { +// The zero-value of a maxStatTracker can be used immediately. +type maxStatTracker struct { windows [6]float64 curIdx int curStart time.Time @@ -382,15 +370,15 @@ type maxQPSTracker struct { } // record adds the qps sample to the tracker. -func (t *maxQPSTracker) record(now time.Time, minRetention time.Duration, qps float64) { +func (t *maxStatTracker) record(now time.Time, minRetention time.Duration, qps float64) { t.maybeReset(now, minRetention) t.maybeRotate(now) t.windows[t.curIdx] = max(t.windows[t.curIdx], qps) } -// reset clears the tracker. maxQPS will begin returning false until a full +// reset clears the tracker. maxStatTracker will begin returning false until a full // minRetention period has elapsed. 
-func (t *maxQPSTracker) reset(now time.Time, minRetention time.Duration) { +func (t *maxStatTracker) reset(now time.Time, minRetention time.Duration) { if minRetention <= 0 { panic("minRetention must be positive") } @@ -401,18 +389,18 @@ func (t *maxQPSTracker) reset(now time.Time, minRetention time.Duration) { t.minRetention = minRetention } -func (t *maxQPSTracker) maybeReset(now time.Time, minRetention time.Duration) { +func (t *maxStatTracker) maybeReset(now time.Time, minRetention time.Duration) { // If the retention period changes, simply reset the entire tracker. Merging // or splitting windows would be a difficult task and could lead to samples // either not being retained for long-enough, or being retained for too long. - // Resetting indicates to maxQPS that a new retention period needs to be + // Resetting indicates to max that a new retention period needs to be // measured before accurate results can be returned. if minRetention != t.minRetention { t.reset(now, minRetention) } } -func (t *maxQPSTracker) maybeRotate(now time.Time) { +func (t *maxStatTracker) maybeRotate(now time.Time) { sinceLastRotate := now.Sub(t.curStart) windowWidth := t.windowWidth() if sinceLastRotate < windowWidth { @@ -435,10 +423,10 @@ func (t *maxQPSTracker) maybeRotate(now time.Time) { } } -// maxQPS returns the maximum queries-per-second samples recorded over the last +// max returns the maximum queries-per-second samples recorded over the last // retention period. If the tracker has not been recording for a full retention // period, then the method returns false. -func (t *maxQPSTracker) maxQPS(now time.Time, minRetention time.Duration) (float64, bool) { +func (t *maxStatTracker) max(now time.Time, minRetention time.Duration) (float64, bool) { t.record(now, minRetention, 0) // expire samples, if necessary if now.Sub(t.lastReset) < t.minRetention { @@ -453,7 +441,7 @@ func (t *maxQPSTracker) maxQPS(now time.Time, minRetention time.Duration) (float return qps, true } -func (t *maxQPSTracker) windowWidth() time.Duration { +func (t *maxStatTracker) windowWidth() time.Duration { // NB: -1 because during a rotation, only len(t.windows)-1 windows survive. return t.minRetention / time.Duration(len(t.windows)-1) } diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index 56adf14df73b..2810f497a119 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -266,12 +266,18 @@ func (sq *splitQueue) processAttempt( loadStats := r.loadStats.Stats() batchHandledQPS := loadStats.QueriesPerSecond raftAppliedQPS := loadStats.WriteKeysPerSecond - splitQPS := r.loadBasedSplitter.LastQPS(ctx, now) + lastSplitStat := r.loadBasedSplitter.LastStat(ctx, now) + var loadSplitString string + if r.QPSSplittingEnabled(ctx) { + loadSplitString = fmt.Sprintf("%.2f reqs/sec", lastSplitStat) + } else { + loadSplitString = fmt.Sprintf("%s cpu/sec", string(humanizeutil.Duration(time.Duration(lastSplitStat)))) + } reason := fmt.Sprintf( - "load at key %s (%.2f splitQPS, %.2f batches/sec, %.2f raft mutations/sec)", + "load at key %s (%s, %.2f batches/sec, %.2f raft mutations/sec)", splitByLoadKey, - splitQPS, + loadSplitString, batchHandledQPS, raftAppliedQPS, ) diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index d2623562c32e..1b7725013f4b 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2091,6 +2091,14 @@ message RangeStatsResponse { // base important decisions off of. 
double max_queries_per_second = 5; + // MaxCPUPerSecond is the maximum rate of cpu/s that the range has used at + the leaseholder over a configured measurement period. Set to -1 if the + replica serving the RangeStats request has not been the leaseholder long + enough to have recorded CPU rates for at least a full measurement period. + In such cases, the recipient should not consider the CPU value reliable + enough to base important decisions off of. + double max_cpu_per_second = 7 [(gogoproto.customname) = "MaxCPUPerSecond"]; + + // range_info contains descriptor and lease information. RangeInfo range_info = 4 [(gogoproto.nullable) = false];
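Finally, the decider.go changes rename maxQPSTracker to maxStatTracker without altering its mechanics. For readers new to that structure, here is a compact, self-contained sketch of the windowed-max idea it implements: samples land in a small ring of time windows, the ring rotates every minRetention/(N-1), and the reported maximum is only trusted once a full retention period has elapsed since the last reset. Names and the rotation loop below are simplified stand-ins, not the real implementation:

```go
package main

import (
	"fmt"
	"time"
)

// windowedMax restates the maxStatTracker idea from the diff above: per-second
// samples are folded into a ring of time windows, old windows age out as the
// ring rotates, and the max is only reported once a full retention period has
// been observed since the last reset.
type windowedMax struct {
	windows      [6]float64
	curIdx       int
	curStart     time.Time
	lastReset    time.Time
	minRetention time.Duration
}

func (t *windowedMax) reset(now time.Time, minRetention time.Duration) {
	t.windows = [6]float64{}
	t.curIdx = 0
	t.curStart = now
	t.lastReset = now
	t.minRetention = minRetention
}

func (t *windowedMax) windowWidth() time.Duration {
	// Only len(windows)-1 windows survive a rotation, so divide by N-1.
	return t.minRetention / time.Duration(len(t.windows)-1)
}

func (t *windowedMax) record(now time.Time, val float64) {
	// Rotate forward (possibly several steps) if enough time has passed,
	// clearing each window that is being reused for a new time slice.
	for now.Sub(t.curStart) >= t.windowWidth() {
		t.curStart = t.curStart.Add(t.windowWidth())
		t.curIdx = (t.curIdx + 1) % len(t.windows)
		t.windows[t.curIdx] = 0
	}
	if val > t.windows[t.curIdx] {
		t.windows[t.curIdx] = val
	}
}

func (t *windowedMax) max(now time.Time) (float64, bool) {
	t.record(now, 0) // age out stale windows before reading
	if now.Sub(t.lastReset) < t.minRetention {
		// Not enough history yet to know a true maximum over the retention period.
		return 0, false
	}
	m := 0.0
	for _, w := range t.windows {
		if w > m {
			m = w
		}
	}
	return m, true
}

func main() {
	now := time.Now()
	var tracker windowedMax
	tracker.reset(now, 5*time.Minute)
	tracker.record(now.Add(30*time.Second), 1200)
	tracker.record(now.Add(3*time.Minute), 800)
	fmt.Println(tracker.max(now.Add(4 * time.Minute))) // 0 false: retention not yet elapsed
	fmt.Println(tracker.max(now.Add(6 * time.Minute))) // max of the surviving windows, true
}
```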