diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 1f05e57e5828..175eed01c966 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -43,6 +43,7 @@ kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to
kv.log_range_and_node_events.enabled boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
+kv.range_split.load_cpu_threshold duration 250ms the CPU use per second over which, the range becomes a candidate for load based splitting
kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence)
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 4dee1dc4ece0..2b0228398f30 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -57,6 +57,7 @@
kv.log_range_and_node_events.enabled
| boolean | true | set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog |
kv.protectedts.reconciliation.interval
| duration | 5m0s | the frequency for reconciling jobs with protected timestamp records |
kv.range_split.by_load_enabled
| boolean | true | allow automatic splits of ranges based on where load is concentrated |
+kv.range_split.load_cpu_threshold
| duration | 250ms | the CPU use per second over which, the range becomes a candidate for load based splitting |
kv.range_split.load_qps_threshold
| integer | 2500 | the QPS over which, the range becomes a candidate for load based splitting |
kv.rangefeed.enabled
| boolean | false | if set, rangefeed registration is enabled |
kv.rangefeed.range_stuck_threshold
| duration | 1m0s | restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence) |
diff --git a/pkg/kv/kvserver/asim/config/settings.go b/pkg/kv/kvserver/asim/config/settings.go
index 1206c4ed6d2a..69cac7b7edc1 100644
--- a/pkg/kv/kvserver/asim/config/settings.go
+++ b/pkg/kv/kvserver/asim/config/settings.go
@@ -24,7 +24,7 @@ const (
defaultStateExchangeInterval = 10 * time.Second
defaultStateExchangeDelay = 500 * time.Millisecond
defaultSplitQPSThreshold = 2500
- defaultSplitQPSRetention = 10 * time.Minute
+ defaultSplitStatRetention = 10 * time.Minute
defaultSeed = 42
defaultLBRebalancingMode = 2 // Leases and replicas.
defaultLBRebalancingInterval = time.Minute
@@ -89,9 +89,9 @@ type SimulationSettings struct {
// SplitQPSThreshold is the threshold above which a range will be a
// candidate for load based splitting.
SplitQPSThreshold float64
- // SplitQPSRetention is the duration which recorded load will be retained
+ // SplitStatRetention is the duration which recorded load will be retained
// and factored into load based splitting decisions.
- SplitQPSRetention time.Duration
+ SplitStatRetention time.Duration
// LBRebalancingMode controls if and when we do store-level rebalancing
// based on load. It maps to kvserver.LBRebalancingMode.
LBRebalancingMode int64
@@ -125,7 +125,7 @@ func DefaultSimulationSettings() *SimulationSettings {
StateExchangeInterval: defaultStateExchangeInterval,
StateExchangeDelay: defaultStateExchangeDelay,
SplitQPSThreshold: defaultSplitQPSThreshold,
- SplitQPSRetention: defaultSplitQPSRetention,
+ SplitStatRetention: defaultSplitStatRetention,
LBRebalancingMode: defaultLBRebalancingMode,
LBRebalancingObjective: defaultLBRebalancingObjective,
LBRebalancingInterval: defaultLBRebalancingInterval,
@@ -167,6 +167,6 @@ func (s *SimulationSettings) SplitQPSThresholdFn() func() float64 {
// split decisions.
func (s *SimulationSettings) SplitQPSRetentionFn() func() time.Duration {
return func() time.Duration {
- return s.SplitQPSRetention
+ return s.SplitStatRetention
}
}
diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go
index 5304d2314600..c172c2b42136 100644
--- a/pkg/kv/kvserver/asim/state/impl.go
+++ b/pkg/kv/kvserver/asim/state/impl.go
@@ -333,10 +333,7 @@ func (s *state) AddStore(nodeID NodeID) (Store, bool) {
s.stores[storeID] = store
// Add a range load splitter for this store.
- s.loadsplits[storeID] = NewSplitDecider(s.settings.Seed,
- s.settings.SplitQPSThresholdFn(),
- s.settings.SplitQPSRetentionFn(),
- )
+ s.loadsplits[storeID] = NewSplitDecider(s.settings)
return store, true
}
diff --git a/pkg/kv/kvserver/asim/state/split_decider.go b/pkg/kv/kvserver/asim/state/split_decider.go
index 3c62003b3b1f..822ea36c1b5d 100644
--- a/pkg/kv/kvserver/asim/state/split_decider.go
+++ b/pkg/kv/kvserver/asim/state/split_decider.go
@@ -15,6 +15,7 @@ import (
"math/rand"
"time"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -38,32 +39,50 @@ type LoadSplitter interface {
ResetRange(rangeID RangeID)
}
+type loadSplitConfig struct {
+ randSource split.RandSource
+ settings *config.SimulationSettings
+}
+
+// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to
+// find the midpoint based on recorded load.
+func (lsc loadSplitConfig) NewLoadBasedSplitter(startTime time.Time) split.LoadBasedSplitter {
+ return split.NewUnweightedFinder(startTime, lsc.randSource)
+}
+
+// StatRetention returns the duration that recorded load is to be retained.
+func (lsc loadSplitConfig) StatRetention() time.Duration {
+ return lsc.settings.SplitStatRetention
+}
+
+// StatThreshold returns the threshold for load above which the range
+// should be considered for splitting.
+func (lsc loadSplitConfig) StatThreshold() float64 {
+ return lsc.settings.SplitQPSThreshold
+}
+
// SplitDecider implements the LoadSplitter interface.
type SplitDecider struct {
- deciders map[RangeID]*split.Decider
- suggestions []RangeID
- qpsThreshold func() float64
- qpsRetention func() time.Duration
- seed int64
+ deciders map[RangeID]*split.Decider
+ suggestions []RangeID
+ splitConfig split.LoadSplitConfig
}
// NewSplitDecider returns a new SplitDecider.
-func NewSplitDecider(
- seed int64, qpsThresholdFn func() float64, qpsRetentionFn func() time.Duration,
-) *SplitDecider {
+func NewSplitDecider(settings *config.SimulationSettings) *SplitDecider {
return &SplitDecider{
- deciders: make(map[RangeID]*split.Decider),
- suggestions: []RangeID{},
- seed: seed,
- qpsThreshold: qpsThresholdFn,
- qpsRetention: qpsRetentionFn,
+ deciders: make(map[RangeID]*split.Decider),
+ suggestions: []RangeID{},
+ splitConfig: loadSplitConfig{
+ randSource: rand.New(rand.NewSource(settings.Seed)),
+ settings: settings,
+ },
}
}
func (s *SplitDecider) newDecider() *split.Decider {
- rand := rand.New(rand.NewSource(s.seed))
decider := &split.Decider{}
- split.Init(decider, nil, rand, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
+ split.Init(decider, s.splitConfig, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
diff --git a/pkg/kv/kvserver/asim/state/split_decider_test.go b/pkg/kv/kvserver/asim/state/split_decider_test.go
index 8bcc7c9d3b2a..bcec5a9e8e6d 100644
--- a/pkg/kv/kvserver/asim/state/split_decider_test.go
+++ b/pkg/kv/kvserver/asim/state/split_decider_test.go
@@ -14,6 +14,7 @@ import (
"testing"
"time"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/stretchr/testify/require"
)
@@ -21,11 +22,11 @@ import (
var testingSequence = []Key{10, 1, 9, 2, 8, 3, 4, 7, 5, 6}
func TestSplitDecider(t *testing.T) {
- testingSeed := 42
- testingThreshold := func() float64 { return 2500 }
- testingRetention := func() time.Duration { return 60 * time.Second }
+ testSettings := config.DefaultSimulationSettings()
+ testSettings.SplitQPSThreshold = 2500
+ testSettings.SplitStatRetention = 60 * time.Second
startTime := TestingStartTime()
- decider := NewSplitDecider(int64(testingSeed), testingThreshold, testingRetention)
+ decider := NewSplitDecider(testSettings)
// A decider should be created for a range when a load event is first
// recorded against it.
@@ -45,8 +46,8 @@ func TestSplitDecider(t *testing.T) {
sequence := testingSequence
// Register load greater than the threshold.
- for i := 0; int64(i) < int64(testingRetention()/time.Second); i++ {
- for j := 0; j < int(testingThreshold())+100; j++ {
+ for i := 0; int64(i) < int64(testSettings.SplitStatRetention/time.Second); i++ {
+ for j := 0; j < int(testSettings.SplitQPSThreshold)+100; j++ {
decider.Record(
OffsetTick(startTime, int64(i)),
1,
@@ -58,7 +59,7 @@ func TestSplitDecider(t *testing.T) {
// There should now be 1 suggested range for splitting which corresponds to
// the midpoint of the testing sequence.
require.Equal(t, []RangeID{1}, decider.ClearSplitKeys())
- splitKey, found = decider.SplitKey(startTime.Add(testingRetention()), 1)
+ splitKey, found = decider.SplitKey(startTime.Add(testSettings.SplitStatRetention), 1)
require.True(t, found)
require.Equal(t, Key(6), splitKey)
@@ -67,7 +68,6 @@ func TestSplitDecider(t *testing.T) {
}
func TestSplitDeciderWorkload(t *testing.T) {
- testingSeed := 42
testingRangeID := FirstRangeID
startTime := TestingStartTime()
@@ -105,11 +105,10 @@ func TestSplitDeciderWorkload(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
- splitDecider := NewSplitDecider(
- int64(testingSeed),
- func() float64 { return tc.threshold },
- func() time.Duration { return tc.retention },
- )
+ testSettings := config.DefaultSimulationSettings()
+ testSettings.SplitQPSThreshold = tc.threshold
+ testSettings.SplitStatRetention = tc.retention
+ splitDecider := NewSplitDecider(testSettings)
lastTick := int64(0)
for _, tick := range tc.ticks {
diff --git a/pkg/kv/kvserver/batcheval/cmd_range_stats.go b/pkg/kv/kvserver/batcheval/cmd_range_stats.go
index 9456c59dc81e..0496fde939dd 100644
--- a/pkg/kv/kvserver/batcheval/cmd_range_stats.go
+++ b/pkg/kv/kvserver/batcheval/cmd_range_stats.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
+ "github.com/cockroachdb/errors"
)
func init() {
@@ -44,12 +45,21 @@ func RangeStats(
) (result.Result, error) {
reply := resp.(*roachpb.RangeStatsResponse)
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
- if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(ctx); ok {
- reply.MaxQueriesPerSecond = qps
- } else {
- // See comment on MaxQueriesPerSecond. -1 means !ok.
- reply.MaxQueriesPerSecond = -1
+
+ maxQPS, qpsOK := cArgs.EvalCtx.GetMaxSplitQPS(ctx)
+ maxCPU, cpuOK := cArgs.EvalCtx.GetMaxSplitCPU(ctx)
+ // See comment on MaxQueriesPerSecond and MaxCPUPerSecond. -1 means !ok.
+ reply.MaxCPUPerSecond, reply.MaxQueriesPerSecond = -1, -1
+ if qpsOK && cpuOK {
+ return result.Result{}, errors.AssertionFailedf("unexpected both QPS and CPU range statistics set")
+ }
+ if qpsOK {
+ reply.MaxQueriesPerSecond = maxQPS
}
+ if cpuOK {
+ reply.MaxCPUPerSecond = maxCPU
+ }
+
reply.MaxQueriesPerSecondSet = true
reply.RangeInfo = cArgs.EvalCtx.GetRangeInfo(ctx)
return result.Result{}, nil
diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go
index 6e5c9fa48b39..5c9aabacd3cb 100644
--- a/pkg/kv/kvserver/batcheval/eval_context.go
+++ b/pkg/kv/kvserver/batcheval/eval_context.go
@@ -87,13 +87,20 @@ type EvalContext interface {
// results due to concurrent writes.
GetMVCCStats() enginepb.MVCCStats
- // GetMaxSplitQPS returns the Replicas maximum queries/s request rate over a
- // configured retention period.
+ // GetMaxSplitQPS returns the Replica's maximum queries/s request rate over
+ // a configured retention period.
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
GetMaxSplitQPS(context.Context) (float64, bool)
+ // GetMaxSplitCPU returns the Replica's maximum request cpu/s rate over a
+ // configured retention period.
+ //
+ // NOTE: This should not be used when the load based splitting cluster setting
+ // is disabled.
+ GetMaxSplitCPU(context.Context) (float64, bool)
+
GetGCThreshold() hlc.Timestamp
ExcludeDataFromBackup() bool
GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error)
@@ -161,6 +168,7 @@ type MockEvalCtx struct {
Clock *hlc.Clock
Stats enginepb.MVCCStats
QPS float64
+ CPU float64
AbortSpan *abortspan.AbortSpan
GCThreshold hlc.Timestamp
Term, FirstIndex uint64
@@ -240,6 +248,9 @@ func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats {
func (m *mockEvalCtxImpl) GetMaxSplitQPS(context.Context) (float64, bool) {
return m.QPS, true
}
+func (m *mockEvalCtxImpl) GetMaxSplitCPU(context.Context) (float64, bool) {
+ return m.CPU, true
+}
func (m *mockEvalCtxImpl) CanCreateTxnRecord(
context.Context, uuid.UUID, []byte, hlc.Timestamp,
) (bool, roachpb.TransactionAbortedReason) {
diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index b6e8ed9f9741..645d1700aae7 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -4364,10 +4364,17 @@ func TestMergeQueue(t *testing.T) {
})
t.Run("load-based-merging", func(t *testing.T) {
- const splitByLoadQPS = 10
- const mergeByLoadQPS = splitByLoadQPS / 2 // see conservativeLoadBasedSplitThreshold
+ // NB: It is possible for the ranges being checked to record load
+ // during the test. To avoid flakiness, we set the splitByLoadStat high
+ // enough that any recorded load from testing won't exceed it.
+ const splitByLoadStat = 10e9
+ const mergeByLoadStat = splitByLoadStat / 2 // see conservativeLoadBasedSplitThreshold
const splitByLoadMergeDelay = 500 * time.Millisecond
+ setSplitObjective := func(dim kvserver.LBRebalancingObjective) {
+ kvserver.LoadBasedRebalancingObjective.Override(ctx, sv, int64(dim))
+ }
+
resetForLoadBasedSubtest := func(t *testing.T) {
reset(t)
@@ -4384,7 +4391,8 @@ func TestMergeQueue(t *testing.T) {
// meaning that it was a maximum measurement over some extended period of
// time.
kvserver.SplitByLoadEnabled.Override(ctx, sv, true)
- kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadQPS)
+ kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadStat)
+ kvserver.SplitByLoadCPUThreshold.Override(ctx, sv, splitByLoadStat)
// Drop the load-based splitting merge delay setting, which also dictates
// the duration that a leaseholder must measure QPS before considering its
@@ -4399,47 +4407,104 @@ func TestMergeQueue(t *testing.T) {
rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())
manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
}
+ for _, splitObjective := range []kvserver.LBRebalancingObjective{
+ kvserver.LBRebalancingQueries,
+ kvserver.LBRebalancingCPU,
+ } {
+ setSplitObjective(splitObjective)
+ t.Run(fmt.Sprintf("unreliable-lhs-%s", splitObjective.ToDimension().String()), func(t *testing.T) {
+ resetForLoadBasedSubtest(t)
- t.Run("unreliable-lhs-qps", func(t *testing.T) {
- resetForLoadBasedSubtest(t)
+ lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime())
- lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime())
+ clearRange(t, lhsStartKey, rhsEndKey)
+ verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
+ })
- clearRange(t, lhsStartKey, rhsEndKey)
- verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
- })
+ t.Run(fmt.Sprintf("unreliable-rhs-%s", splitObjective.ToDimension().String()), func(t *testing.T) {
+ resetForLoadBasedSubtest(t)
- t.Run("unreliable-rhs-qps", func(t *testing.T) {
- resetForLoadBasedSubtest(t)
+ rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())
- rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())
+ clearRange(t, lhsStartKey, rhsEndKey)
+ verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
+ })
- clearRange(t, lhsStartKey, rhsEndKey)
- verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
- })
+ t.Run(fmt.Sprintf("combined-%s-above-threshold", splitObjective.ToDimension().String()), func(t *testing.T) {
+ resetForLoadBasedSubtest(t)
+
+ moreThanHalfStat := mergeByLoadStat/2 + 1
+ rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat)
+ lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat)
- t.Run("combined-qps-above-threshold", func(t *testing.T) {
- resetForLoadBasedSubtest(t)
+ clearRange(t, lhsStartKey, rhsEndKey)
+ verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
+ })
- moreThanHalfQPS := mergeByLoadQPS/2 + 1
- rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(moreThanHalfQPS))
- lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(moreThanHalfQPS))
+ t.Run(fmt.Sprintf("combined-%s-below-threshold", splitObjective.ToDimension().String()), func(t *testing.T) {
+ resetForLoadBasedSubtest(t)
- clearRange(t, lhsStartKey, rhsEndKey)
- verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
- })
+ manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
+ lessThanHalfStat := mergeByLoadStat/2 - 1
+ rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat)
+ lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat)
- t.Run("combined-qps-below-threshold", func(t *testing.T) {
- resetForLoadBasedSubtest(t)
+ clearRange(t, lhsStartKey, rhsEndKey)
+ verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
+ })
- manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
- lessThanHalfQPS := mergeByLoadQPS/2 - 1
- rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(lessThanHalfQPS))
- lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(lessThanHalfQPS))
+ // These nested tests assert that after changing the split
+ // dimension, any previous load is discarded and the range will not
+ // merge, even if the previous load was above or below the
+ // threshold.
+ for _, secondSplitObjective := range []kvserver.LBRebalancingObjective{
+ kvserver.LBRebalancingQueries,
+ kvserver.LBRebalancingCPU,
+ } {
+ if splitObjective == secondSplitObjective {
+ // Nothing to do when there is no change. We expect the
+ // same outcome as the above tests.
+ continue
+ }
+ t.Run(fmt.Sprintf("switch-%s-to-%s-prev-combined-above-threshold",
+ splitObjective.ToDimension().String(),
+ secondSplitObjective.ToDimension().String(),
+ ), func(t *testing.T) {
+ // Set the split dimension again, since we have modified it
+ // at the bottom of this loop.
+ setSplitObjective(splitObjective)
+ resetForLoadBasedSubtest(t)
+
+ moreThanHalfStat := mergeByLoadStat/2 + 1
+ rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat)
+ lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat)
+
+ clearRange(t, lhsStartKey, rhsEndKey)
+ // Switch the dimension, so that any recorded load should
+ // be discarded. Regardless of it being above the threshold
+ // (for both dimensions), it shouldn't merge.
+ setSplitObjective(secondSplitObjective)
+ verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
+ })
- clearRange(t, lhsStartKey, rhsEndKey)
- verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
- })
+ t.Run(fmt.Sprintf("switch-%s-to-%s-prev-combined-below-threshold",
+ splitObjective.ToDimension().String(),
+ secondSplitObjective.ToDimension().String(),
+ ), func(t *testing.T) {
+ setSplitObjective(splitObjective)
+ resetForLoadBasedSubtest(t)
+
+ manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
+ lessThanHalfStat := mergeByLoadStat/2 - 1
+ rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat)
+ lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat)
+
+ clearRange(t, lhsStartKey, rhsEndKey)
+ setSplitObjective(secondSplitObjective)
+ verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
+ })
+ }
+ }
})
t.Run("sticky-bit", func(t *testing.T) {
diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go
index 4ab4cc0f196b..f7b72bbc832f 100644
--- a/pkg/kv/kvserver/merge_queue.go
+++ b/pkg/kv/kvserver/merge_queue.go
@@ -184,7 +184,7 @@ var _ PurgatoryError = rangeMergePurgatoryError{}
func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
-) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, qps float64, qpsOK bool, err error) {
+) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, ls loadSplitStat, err error) {
ba := &roachpb.BatchRequest{}
ba.Add(&roachpb.RangeStatsRequest{
@@ -193,15 +193,37 @@ func (mq *mergeQueue) requestRangeStats(
br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba)
if pErr != nil {
- return nil, enginepb.MVCCStats{}, 0, false, pErr.GoError()
+ return nil, enginepb.MVCCStats{}, loadSplitStat{}, pErr.GoError()
}
res := br.Responses[0].GetInner().(*roachpb.RangeStatsResponse)
desc = &res.RangeInfo.Desc
stats = res.MVCCStats
- qps = res.MaxQueriesPerSecond
- qpsOK = qps >= 0
- return desc, stats, qps, qpsOK, nil
+
+ // When constructing RangeStats only one of MaxQueriesPerSecond or
+ // MaxCPUPerSecond will be set. See RangeStats() in cmd_range_stats.go.
+ // The load based splitter will only track the max of at most one statistic
+ // at a time for load based splitting. This is either CPU or QPS.
+ if res.MaxCPUPerSecond >= 0 && res.MaxQueriesPerSecond >= 0 {
+ err = errors.AssertionFailedf(
+ "unexpected both max qps %.2f and max cpu %.2f set in range stats response",
+ res.MaxQueriesPerSecond, res.MaxCPUPerSecond)
+ return nil, enginepb.MVCCStats{}, loadSplitStat{}, err
+ }
+
+ if res.MaxQueriesPerSecond >= 0 {
+ ls.max = res.MaxQueriesPerSecond
+ ls.ok = true
+ ls.typ = SplitQPS
+ }
+
+ if res.MaxCPUPerSecond >= 0 {
+ ls.max = res.MaxCPUPerSecond
+ ls.ok = true
+ ls.typ = SplitCPU
+ }
+
+ return desc, stats, ls, nil
}
func (mq *mergeQueue) process(
@@ -214,7 +236,7 @@ func (mq *mergeQueue) process(
lhsDesc := lhsRepl.Desc()
lhsStats := lhsRepl.GetMVCCStats()
- lhsQPS, lhsQPSOK := lhsRepl.GetMaxSplitQPS(ctx)
+ lhsLoadSplitStat := lhsRepl.loadSplitStat(ctx)
minBytes := lhsRepl.GetMinBytes()
if lhsStats.Total() >= minBytes {
log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes",
@@ -222,7 +244,7 @@ func (mq *mergeQueue) process(
return false, nil
}
- rhsDesc, rhsStats, rhsQPS, rhsQPSOK, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
+ rhsDesc, rhsStats, rhsLoadSplitStat, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
if err != nil {
return false, err
}
@@ -248,41 +270,25 @@ func (mq *mergeQueue) process(
mergedStats := lhsStats
mergedStats.Add(rhsStats)
- var mergedQPS float64
+ var loadMergeReason string
if lhsRepl.SplitByLoadEnabled() {
- // When load is a consideration for splits and, by extension, merges, the
- // mergeQueue is fairly conservative. In an effort to avoid thrashing and to
- // avoid overreacting to temporary fluctuations in load, the mergeQueue will
- // only consider a merge when the combined load across the RHS and LHS
- // ranges is below half the threshold required to split a range due to load.
- // Furthermore, to ensure that transient drops in load do not trigger range
- // merges, the mergeQueue will only consider a merge when it deems the
- // maximum qps measurement from both sides to be sufficiently stable and
- // reliable, meaning that it was a maximum measurement over some extended
- // period of time.
- if !lhsQPSOK {
- log.VEventf(ctx, 2, "skipping merge: LHS QPS measurement not yet reliable")
+ var canMergeLoad bool
+ if canMergeLoad, loadMergeReason = canMergeRangeLoad(
+ ctx, lhsLoadSplitStat, rhsLoadSplitStat, mq.store.splitConfig,
+ ); !canMergeLoad {
+ log.VEventf(ctx, 2, "skipping merge to avoid thrashing: merged range %s may split %s",
+ mergedDesc, loadMergeReason)
return false, nil
}
- if !rhsQPSOK {
- log.VEventf(ctx, 2, "skipping merge: RHS QPS measurement not yet reliable")
- return false, nil
- }
- mergedQPS = lhsQPS + rhsQPS
}
- // Check if the merged range would need to be split, if so, skip merge.
- // Use a lower threshold for load based splitting so we don't find ourselves
- // in a situation where we keep merging ranges that would be split soon after
- // by a small increase in load.
- conservativeLoadBasedSplitThreshold := 0.5 * lhsRepl.SplitByLoadQPSThreshold()
shouldSplit, _ := shouldSplitRange(ctx, mergedDesc, mergedStats,
lhsRepl.GetMaxBytes(), lhsRepl.shouldBackpressureWrites(), confReader)
- if shouldSplit || mergedQPS >= conservativeLoadBasedSplitThreshold {
+ if shouldSplit {
log.VEventf(ctx, 2,
"skipping merge to avoid thrashing: merged range %s may split "+
- "(estimated size, estimated QPS: %d, %v)",
- mergedDesc, mergedStats.Total(), mergedQPS)
+ "(estimated size: %d)",
+ mergedDesc, mergedStats.Total())
return false, nil
}
@@ -360,7 +366,7 @@ func (mq *mergeQueue) process(
}
// Refresh RHS descriptor.
- rhsDesc, _, _, _, err = mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
+ rhsDesc, _, _, err = mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
if err != nil {
return false, err
}
@@ -378,15 +384,12 @@ func (mq *mergeQueue) process(
}
log.VEventf(ctx, 2, "merging to produce range: %s-%s", mergedDesc.StartKey, mergedDesc.EndKey)
- reason := fmt.Sprintf("lhs+rhs has (size=%s+%s=%s qps=%.2f+%.2f=%.2fqps) below threshold (size=%s, qps=%.2f)",
+ reason := fmt.Sprintf("lhs+rhs size (%s+%s=%s) below threshold (%s) %s",
humanizeutil.IBytes(lhsStats.Total()),
humanizeutil.IBytes(rhsStats.Total()),
humanizeutil.IBytes(mergedStats.Total()),
- lhsQPS,
- rhsQPS,
- mergedQPS,
humanizeutil.IBytes(minBytes),
- conservativeLoadBasedSplitThreshold,
+ loadMergeReason,
)
_, pErr := lhsRepl.AdminMerge(ctx, roachpb.AdminMergeRequest{
RequestHeader: roachpb.RequestHeader{Key: lhsRepl.Desc().StartKey.AsRawKey()},
@@ -417,8 +420,8 @@ func (mq *mergeQueue) process(
// Adjust the splitter to account for the additional load from the RHS. We
// could just Reset the splitter, but then we'd need to wait out a full
// measurement period (default of 5m) before merging this range again.
- if mergedQPS != 0 {
- lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedQPS)
+ if mergedLoadSplitStat := lhsLoadSplitStat.max + rhsLoadSplitStat.max; mergedLoadSplitStat != 0 {
+ lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStat)
}
return true, nil
}
@@ -439,3 +442,61 @@ func (mq *mergeQueue) purgatoryChan() <-chan time.Time {
func (mq *mergeQueue) updateChan() <-chan time.Time {
return nil
}
+
+// canMergeRangeLoad reports whether the LHS and RHS ranges may be merged
+// given their recorded load, and returns a human-readable reason for the
+// decision that callers can include in log messages.
+func canMergeRangeLoad(
+	ctx context.Context, lhs, rhs loadSplitStat, rsc *replicaSplitConfig,
+) (can bool, reason string) {
+	// When the lhs and rhs split stats are of different types, or do not match
+	// the current split objective they cannot merge together. This could occur
+	// just after changing the split objective to a different value, where
+	// there is a mismatch.
+	splitObjective := rsc.SplitObjective()
+	if lhs.typ != splitObjective {
+		return false, fmt.Sprintf(
+			"LHS load measurement is a different type (%s) than current split objective (%s)",
+			lhs.typ, splitObjective)
+	}
+	if rhs.typ != splitObjective {
+		return false, fmt.Sprintf(
+			"RHS load measurement is a different type (%s) than current split objective (%s)",
+			rhs.typ, splitObjective)
+	}
+
+	// When load is a consideration for splits and, by extension, merges, the
+	// mergeQueue is fairly conservative. To avoid thrashing and overreacting to
+	// temporary fluctuations in load, a merge is only considered when the
+	// combined load across the RHS and LHS ranges is below half the threshold
+	// required to split a range due to load. Furthermore, so that transient
+	// drops in load do not trigger merges, both sides' maximum load
+	// measurements must be reliable, i.e. taken over an extended period of time.
+	if !lhs.ok {
+		return false, "LHS load measurement not yet reliable"
+	}
+	if !rhs.ok {
+		return false, "RHS load measurement not yet reliable"
+	}
+
+	// Use a lower threshold than the one used for load based splitting, so that
+	// a merged range is not split again soon after by a small increase in load.
+	merged := lhs.max + rhs.max
+	conservativeLoadBasedSplitThreshold := 0.5 * rsc.StatThreshold()
+	// Describe the combined load relative to the threshold, for either outcome.
+	describe := func(relation string) string {
+		return fmt.Sprintf("lhs+rhs %s (%s+%s=%s) %s threshold (%s)",
+			splitObjective,
+			splitObjective.Format(lhs.max),
+			splitObjective.Format(rhs.max),
+			splitObjective.Format(merged),
+			relation,
+			splitObjective.Format(conservativeLoadBasedSplitThreshold),
+		)
+	}
+
+	if merged >= conservativeLoadBasedSplitThreshold {
+		return false, describe("above")
+	}
+	return true, describe("below")
+}
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index b51f0b1fd0d8..ec55514e875d 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -1293,9 +1293,28 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) {
// NOTE: This should only be used for load based splitting, only
// works when the load based splitting cluster setting is enabled.
//
-// Use QueriesPerSecond() for current QPS stats for all other purposes.
+// Use LoadStats.QueriesPerSecond for all other purposes.
func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) {
- return r.loadBasedSplitter.MaxQPS(ctx, r.Clock().PhysicalTime())
+ if r.store.splitConfig.SplitObjective() != SplitQPS {
+ return 0, false
+ }
+ return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
+}
+
+// GetMaxSplitCPU returns the Replica's maximum CPU/s rate over a configured
+// measurement period. If the Replica has not been recording CPU for at least
+// an entire measurement period, the method will return false.
+//
+// NOTE: This should only be used for load based splitting, only
+// works when the load based splitting cluster setting is enabled.
+//
+// Use LoadStats.RaftCPUNanosPerSecond and RequestCPUNanosPerSecond for current
+// CPU stats for all other purposes.
+func (r *Replica) GetMaxSplitCPU(ctx context.Context) (float64, bool) {
+ if r.store.splitConfig.SplitObjective() != SplitCPU {
+ return 0, false
+ }
+ return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
}
// ContainsKey returns whether this range contains the specified key.
diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go
index 7551a07c402b..f626b10e5f9c 100644
--- a/pkg/kv/kvserver/replica_eval_context_span.go
+++ b/pkg/kv/kvserver/replica_eval_context_span.go
@@ -135,6 +135,12 @@ func (rec SpanSetReplicaEvalContext) GetMaxSplitQPS(ctx context.Context) (float6
return rec.i.GetMaxSplitQPS(ctx)
}
+// GetMaxSplitCPU returns the Replica's maximum CPU/s rate for splitting and
+// merging purposes.
+func (rec SpanSetReplicaEvalContext) GetMaxSplitCPU(ctx context.Context) (float64, bool) {
+ return rec.i.GetMaxSplitCPU(ctx)
+}
+
// CanCreateTxnRecord determines whether a transaction record can be created
// for the provided transaction information. See Replica.CanCreateTxnRecord
// for details about its arguments, return values, and preconditions.
diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index 4b8caf4aa77c..b12756688f75 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -107,11 +107,12 @@ func newUninitializedReplica(
r.mu.stateLoader = stateloader.Make(rangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
- split.Init(&r.loadBasedSplitter, store.cfg.Settings, split.GlobalRandSource(), func() float64 {
- return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
- }, func() time.Duration {
- return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
- }, store.metrics.LoadSplitterMetrics)
+ split.Init(
+ &r.loadBasedSplitter,
+ store.splitConfig,
+ store.metrics.LoadSplitterMetrics,
+ )
+
r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]*replicaChecksum{}
r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 6879b6ce15f5..96be9a7f7242 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -134,8 +134,8 @@ func (r *Replica) SendWithWriteBytes(
// Record the CPU time processing the request for this replica. This is
// recorded regardless of errors that are encountered.
- defer r.MeasureReqCPUNanos(grunning.Time())
-
+ startCPU := grunning.Time()
+ defer r.MeasureReqCPUNanos(startCPU)
// Record summary throughput information about the batch request for
// accounting.
r.recordBatchRequestLoad(ctx, ba)
@@ -201,12 +201,14 @@ func (r *Replica) SendWithWriteBytes(
}
}
- // Return range information if it was requested. Note that we don't return it
- // on errors because the code doesn't currently support returning both a br
- // and a pErr here. Also, some errors (e.g. NotLeaseholderError) have custom
- // ways of returning range info.
if pErr == nil {
+ // Return range information if it was requested. Note that we don't return it
+ // on errors because the code doesn't currently support returning both a br
+ // and a pErr here. Also, some errors (e.g. NotLeaseholderError) have custom
+ // ways of returning range info.
r.maybeAddRangeInfoToResponse(ctx, ba, br)
+ // Handle load-based splitting, if necessary.
+ r.recordBatchForLoadBasedSplitting(ctx, ba, br, int(grunning.Difference(startCPU, grunning.Time())))
}
r.recordRequestWriteBytes(writeBytes)
@@ -405,17 +407,6 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
var requestEvalKind concurrency.RequestEvalKind
var g *concurrency.Guard
defer func() {
- // Handle load-based splitting, if necessary.
- if pErr == nil && br != nil {
- if len(ba.Requests) != len(br.Responses) {
- log.KvDistribution.Errorf(ctx,
- "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d",
- len(ba.Requests), len(br.Responses))
- } else {
- r.recordBatchForLoadBasedSplitting(ctx, ba, br)
- }
- }
-
// NB: wrapped to delay g evaluation to its value when returning.
if g != nil {
r.concMgr.FinishReq(g)
@@ -1007,8 +998,6 @@ func (r *Replica) executeAdminBatch(
return br, nil
}
-// recordBatchRequestLoad records the load information about a batch request issued
-// against this replica.
func (r *Replica) recordBatchRequestLoad(ctx context.Context, ba *roachpb.BatchRequest) {
if r.loadStats == nil {
log.VEventf(
diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go
index 3cc43ae28e39..fe73ed7605aa 100644
--- a/pkg/kv/kvserver/replica_split_load.go
+++ b/pkg/kv/kvserver/replica_split_load.go
@@ -12,10 +12,18 @@ package kvserver
import (
"context"
+ "fmt"
+ "time"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/errors"
)
// SplitByLoadEnabled wraps "kv.range_split.by_load_enabled".
@@ -34,9 +42,128 @@ var SplitByLoadQPSThreshold = settings.RegisterIntSetting(
2500, // 2500 req/s
).WithPublic()
-// SplitByLoadQPSThreshold returns the QPS request rate for a given replica.
-func (r *Replica) SplitByLoadQPSThreshold() float64 {
- return float64(SplitByLoadQPSThreshold.Get(&r.store.cfg.Settings.SV))
+// SplitByLoadCPUThreshold wraps "kv.range_split.load_cpu_threshold". The
+// default threshold of 250ms translates to a replica utilizing 25% of a CPU
+// core processing requests. In practice, the "real" CPU usage of a replica all
+// things considered (sql, compactions, gc) tends to be around 3x the attributed
+// usage which this threshold is checked against. This means that in a static
+// state we would expect no more than (number of cores) / 0.75 load based
+// splits. In practice however, workload patterns change.
+// TODO(kvoli): Benchmark ycsb, kv0, kv95 on three nodes and bisect a value
+// that achieves the highest throughput. The current value was selected by
+// observing the performance of the cluster from a rebalancing perspective. The
+// specific criterion was to constrain the occurrences of a store being overfull
+// relative to the mean but not having any actions available to resolve being
+// overfull. When running TPCE (50k), CPU splitting with a 250ms threshold
+// performed 1 load based split whilst QPS splitting (2500) performed 12.5.
+// When running the allocbench/*/kv roachtest suite, CPU splitting (250ms)
+// tended to make between 33-100% more load based splits than QPS splitting
+// (2500) on workloads involving reads (usually large scans), whilst on the
+// write heavy workloads the number of load based splits was identically low.
+var SplitByLoadCPUThreshold = settings.RegisterDurationSetting(
+ settings.TenantWritable,
+ "kv.range_split.load_cpu_threshold",
+ "the CPU use per second over which, the range becomes a candidate for load based splitting",
+ 250*time.Millisecond,
+).WithPublic()
+
+// SplitObjective is a type that specifies a load based splitting objective.
+type SplitObjective int
+
+const (
+ // SplitQPS will track and split QPS (queries-per-second) over a range.
+ SplitQPS SplitObjective = iota
+ // SplitCPU will track and split CPU (cpu-per-second) over a range.
+ SplitCPU
+)
+
+// String returns a human readable string representation of the objective.
+func (d SplitObjective) String() string {
+ switch d {
+ case SplitQPS:
+ return "qps"
+ case SplitCPU:
+ return "cpu"
+ default:
+ panic(fmt.Sprintf("cannot name: unknown objective with ordinal %d", d))
+ }
+}
+
+// Format returns a formatted string for a value.
+func (d SplitObjective) Format(value float64) string {
+ switch d {
+ case SplitQPS:
+ return fmt.Sprintf("%.1f", value)
+ case SplitCPU:
+ return string(humanizeutil.Duration(time.Duration(int64(value))))
+ default:
+ panic(fmt.Sprintf("cannot format value: unknown objective with ordinal %d", d))
+ }
+}
+
+// replicaSplitConfig implements the split.LoadSplitConfig interface.
+type replicaSplitConfig struct {
+ randSource split.RandSource
+ rebalanceObjectiveProvider RebalanceObjectiveProvider
+ st *cluster.Settings
+}
+
+func newReplicaSplitConfig(
+ st *cluster.Settings, rebalanceObjectiveProvider RebalanceObjectiveProvider,
+) *replicaSplitConfig {
+ return &replicaSplitConfig{
+ randSource: split.GlobalRandSource(),
+ rebalanceObjectiveProvider: rebalanceObjectiveProvider,
+ st: st,
+ }
+}
+
+// SplitObjective returns the current split objective. Currently this tracks
+// 1:1 to the rebalance objective e.g. balancing QPS means also load based
+// splitting on QPS.
+func (c *replicaSplitConfig) SplitObjective() SplitObjective {
+ obj := c.rebalanceObjectiveProvider.Objective()
+ switch obj {
+ case LBRebalancingQueries:
+ return SplitQPS
+ case LBRebalancingCPU:
+ return SplitCPU
+ default:
+ panic(errors.AssertionFailedf("Unkown split objective %d", obj))
+ }
+}
+
+// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to
+// find the midpoint based on recorded load.
+func (c *replicaSplitConfig) NewLoadBasedSplitter(startTime time.Time) split.LoadBasedSplitter {
+ obj := c.SplitObjective()
+ switch obj {
+ case SplitQPS:
+ return split.NewUnweightedFinder(startTime, c.randSource)
+ case SplitCPU:
+ return split.NewWeightedFinder(startTime, c.randSource)
+ default:
+ panic(errors.AssertionFailedf("Unkown rebalance objective %d", obj))
+ }
+}
+
+// StatRetention returns the duration that recorded load is to be retained.
+func (c *replicaSplitConfig) StatRetention() time.Duration {
+ return kvserverbase.SplitByLoadMergeDelay.Get(&c.st.SV)
+}
+
+// StatThreshold returns the threshold for load above which the range should be
+// considered for splitting.
+func (c *replicaSplitConfig) StatThreshold() float64 {
+ obj := c.SplitObjective()
+ switch obj {
+ case SplitQPS:
+ return float64(SplitByLoadQPSThreshold.Get(&c.st.SV))
+ case SplitCPU:
+ return float64(SplitByLoadCPUThreshold.Get(&c.st.SV))
+ default:
+ panic(errors.AssertionFailedf("Unkown rebalance objective %d", obj))
+ }
}
// SplitByLoadEnabled returns whether load based splitting is enabled.
@@ -47,6 +174,22 @@ func (r *Replica) SplitByLoadEnabled() bool {
!r.store.TestingKnobs().DisableLoadBasedSplitting
}
+type loadSplitStat struct {
+ max float64
+ ok bool
+ typ SplitObjective
+}
+
+func (r *Replica) loadSplitStat(ctx context.Context) loadSplitStat {
+ max, ok := r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime())
+ lss := loadSplitStat{
+ max: max,
+ ok: ok,
+ typ: r.store.splitConfig.SplitObjective(),
+ }
+ return lss
+}
+
// getResponseBoundarySpan computes the union span of the true spans that were
// iterated over using the request span and the response's resumeSpan.
//
@@ -123,12 +266,31 @@ func getResponseBoundarySpan(
// recordBatchForLoadBasedSplitting records the batch's spans to be considered
// for load based splitting.
func (r *Replica) recordBatchForLoadBasedSplitting(
- ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse,
+ ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, stat int,
) {
if !r.SplitByLoadEnabled() {
return
}
- shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), len(ba.Requests), func() roachpb.Span {
+
+ // There is nothing to do when either the batch request or batch response
+ // are nil as we cannot record the load to a keyspan.
+ if ba == nil || br == nil {
+ return
+ }
+
+ if len(ba.Requests) != len(br.Responses) {
+ log.KvDistribution.Errorf(ctx,
+ "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d",
+ len(ba.Requests), len(br.Responses))
+ }
+
+ // When QPS splitting is enabled, use the number of requests rather than
+ // the given stat for recording load.
+ if r.store.splitConfig.SplitObjective() == SplitQPS {
+ stat = len(ba.Requests)
+ }
+
+ shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), stat, func() roachpb.Span {
return getResponseBoundarySpan(ba, br)
})
if shouldInitSplit {
diff --git a/pkg/kv/kvserver/split/BUILD.bazel b/pkg/kv/kvserver/split/BUILD.bazel
index 031270e152c7..514c4112e635 100644
--- a/pkg/kv/kvserver/split/BUILD.bazel
+++ b/pkg/kv/kvserver/split/BUILD.bazel
@@ -13,8 +13,6 @@ go_library(
deps = [
"//pkg/keys",
"//pkg/roachpb",
- "//pkg/settings",
- "//pkg/settings/cluster",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/syncutil",
diff --git a/pkg/kv/kvserver/split/decider.go b/pkg/kv/kvserver/split/decider.go
index e34937bb4641..e13e89c302be 100644
--- a/pkg/kv/kvserver/split/decider.go
+++ b/pkg/kv/kvserver/split/decider.go
@@ -20,8 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
- "github.com/cockroachdb/cockroach/pkg/settings"
- "github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -29,7 +27,7 @@ import (
const minSplitSuggestionInterval = time.Minute
const minNoSplitKeyLoggingMetricsInterval = time.Minute
-const minQueriesPerSecondSampleDuration = time.Second
+const minPerSecondSampleDuration = time.Second
type LoadBasedSplitter interface {
// Record informs the LoadBasedSplitter about where the span lies with regard
@@ -55,6 +53,17 @@ type LoadBasedSplitter interface {
PopularKeyFrequency() float64
}
+type LoadSplitConfig interface {
+ // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to
+ // find the midpoint based on recorded load.
+ NewLoadBasedSplitter(time.Time) LoadBasedSplitter
+ // StatRetention returns the duration that recorded load is to be retained.
+ StatRetention() time.Duration
+ // StatThreshold returns the threshold for load above which the range
+ // should be considered for splitting.
+ StatThreshold() float64
+}
+
type RandSource interface {
// Float64 returns, as a float64, a pseudo-random number in the half-open
// interval [0.0,1.0) from the RandSource.
@@ -86,16 +95,6 @@ func GlobalRandSource() RandSource {
return globalRandSource{}
}
-var enableUnweightedLBSplitFinder = settings.RegisterBoolSetting(
- settings.SystemOnly,
- "kv.unweighted_lb_split_finder.enabled",
- "if enabled, use the un-weighted finder for load-based splitting; "+
- "the unweighted finder will attempt to find a key during when splitting "+
- "a range based on load that evenly divides the QPS among the resulting "+
- "left and right hand side ranges",
- true,
-)
-
// A Decider collects measurements about the activity (measured in qps) on a
// Replica and, assuming that qps thresholds are exceeded, tries to determine a
// split key that would approximately result in halving the load on each of the
@@ -132,22 +131,19 @@ type LoadSplitterMetrics struct {
// incoming requests to find potential split keys and checks if sampled
// candidate split keys satisfy certain requirements.
type Decider struct {
- st *cluster.Settings // supplied to Init
- randSource RandSource // supplied to Init
- qpsThreshold func() float64 // supplied to Init
- qpsRetention func() time.Duration // supplied to Init
+ config LoadSplitConfig // supplied to Init
loadSplitterMetrics *LoadSplitterMetrics // supplied to Init
mu struct {
syncutil.Mutex
// Fields tracking the current qps sample.
- lastQPSRollover time.Time // most recent time recorded by requests.
- lastQPS float64 // last reqs/s rate as of lastQPSRollover
- count int64 // number of requests recorded since last rollover
+ lastStatRollover time.Time // most recent time recorded by requests.
+ lastStatVal float64 // last reqs/s rate as of lastStatRollover
+ count int64 // number of requests recorded since last rollover
// Fields tracking historical qps samples.
- maxQPS maxQPSTracker
+ maxStat maxStatTracker
// Fields tracking split key suggestions.
splitFinder LoadBasedSplitter // populated when engaged or decided
@@ -162,19 +158,9 @@ type Decider struct {
// embedding the Decider into a larger struct outside of the scope of this package
// without incurring a pointer reference. This is relevant since many Deciders
// may exist in the system at any given point in time.
-func Init(
- lbs *Decider,
- st *cluster.Settings,
- randSource RandSource,
- qpsThreshold func() float64,
- qpsRetention func() time.Duration,
- loadSplitterMetrics *LoadSplitterMetrics,
-) {
- lbs.st = st
- lbs.randSource = randSource
- lbs.qpsThreshold = qpsThreshold
- lbs.qpsRetention = qpsRetention
+func Init(lbs *Decider, config LoadSplitConfig, loadSplitterMetrics *LoadSplitterMetrics) {
lbs.loadSplitterMetrics = loadSplitterMetrics
+ lbs.config = config
}
// Record notifies the Decider that 'n' operations are being carried out which
@@ -198,32 +184,28 @@ func (d *Decider) recordLocked(
d.mu.count += int64(n)
// First compute requests per second since the last check.
- if d.mu.lastQPSRollover.IsZero() {
- d.mu.lastQPSRollover = now
+ if d.mu.lastStatRollover.IsZero() {
+ d.mu.lastStatRollover = now
}
- elapsedSinceLastQPS := now.Sub(d.mu.lastQPSRollover)
- if elapsedSinceLastQPS >= minQueriesPerSecondSampleDuration {
- // Update the latest QPS and reset the time and request counter.
- d.mu.lastQPS = (float64(d.mu.count) / float64(elapsedSinceLastQPS)) * 1e9
- d.mu.lastQPSRollover = now
+ elapsedSinceLastSample := now.Sub(d.mu.lastStatRollover)
+ if elapsedSinceLastSample >= minPerSecondSampleDuration {
+ // Update the latest stat value and reset the time and request counter.
+ d.mu.lastStatVal = (float64(d.mu.count) / float64(elapsedSinceLastSample)) * 1e9
+ d.mu.lastStatRollover = now
d.mu.count = 0
- // Record the latest QPS sample in the historical tracker.
- d.mu.maxQPS.record(now, d.qpsRetention(), d.mu.lastQPS)
+ // Record the latest stat sample in the historical tracker.
+ d.mu.maxStat.record(now, d.config.StatRetention(), d.mu.lastStatVal)
- // If the QPS for the range exceeds the threshold, start actively
+ // If the stat for the range exceeds the threshold, start actively
// tracking potential for splitting this range based on load.
// This tracking will begin by initiating a splitFinder so it can
// begin to Record requests so it can find a split point. If a
// splitFinder already exists, we check if a split point is ready
// to be used.
- if d.mu.lastQPS >= d.qpsThreshold() {
+ if d.mu.lastStatVal >= d.config.StatThreshold() {
if d.mu.splitFinder == nil {
- if d.st == nil || enableUnweightedLBSplitFinder.Get(&d.st.SV) {
- d.mu.splitFinder = NewUnweightedFinder(now, d.randSource)
- } else {
- d.mu.splitFinder = NewWeightedFinder(now, d.randSource)
- }
+ d.mu.splitFinder = d.config.NewLoadBasedSplitter(now)
}
} else {
d.mu.splitFinder = nil
@@ -261,34 +243,34 @@ func (d *Decider) recordLocked(
return false
}
-// RecordMax adds a QPS measurement directly into the Decider's historical QPS
-// tracker. The QPS sample is considered to have been captured at the provided
-// time.
+// RecordMax adds a stat measurement directly into the Decider's historical
+// stat value tracker. The stat sample is considered to have been captured at
+// the provided time.
func (d *Decider) RecordMax(now time.Time, qps float64) {
d.mu.Lock()
defer d.mu.Unlock()
- d.mu.maxQPS.record(now, d.qpsRetention(), qps)
+ d.mu.maxStat.record(now, d.config.StatRetention(), qps)
}
-// LastQPS returns the most recent QPS measurement.
-func (d *Decider) LastQPS(ctx context.Context, now time.Time) float64 {
+// LastStat returns the most recent stat measurement.
+func (d *Decider) LastStat(ctx context.Context, now time.Time) float64 {
d.mu.Lock()
defer d.mu.Unlock()
- d.recordLocked(ctx, now, 0, nil) // force QPS computation
- return d.mu.lastQPS
+ d.recordLocked(ctx, now, 0, nil) // force stat computation
+ return d.mu.lastStatVal
}
-// MaxQPS returns the maximum QPS measurement recorded over the retention
+// MaxStat returns the maximum stat measurement recorded over the retention
// period. If the Decider has not been recording for a full retention period,
// the method returns false.
-func (d *Decider) MaxQPS(ctx context.Context, now time.Time) (float64, bool) {
+func (d *Decider) MaxStat(ctx context.Context, now time.Time) (float64, bool) {
d.mu.Lock()
defer d.mu.Unlock()
- d.recordLocked(ctx, now, 0, nil) // force QPS computation
- return d.mu.maxQPS.maxQPS(now, d.qpsRetention())
+ d.recordLocked(ctx, now, 0, nil) // force stat computation
+ return d.mu.maxStat.max(now, d.config.StatRetention())
}
// MaybeSplitKey returns a key to perform a split at. The return value will be
@@ -345,21 +327,21 @@ func (d *Decider) MaybeSplitKey(ctx context.Context, now time.Time) roachpb.Key
}
// Reset deactivates any current attempt at determining a split key. The method
-// also discards any historical QPS tracking information.
+// also discards any historical stat tracking information.
func (d *Decider) Reset(now time.Time) {
d.mu.Lock()
defer d.mu.Unlock()
- d.mu.lastQPSRollover = time.Time{}
- d.mu.lastQPS = 0
+ d.mu.lastStatRollover = time.Time{}
+ d.mu.lastStatVal = 0
d.mu.count = 0
- d.mu.maxQPS.reset(now, d.qpsRetention())
+ d.mu.maxStat.reset(now, d.config.StatRetention())
d.mu.splitFinder = nil
d.mu.lastSplitSuggestion = time.Time{}
d.mu.lastNoSplitKeyLoggingMetrics = time.Time{}
}
-// maxQPSTracker collects a series of queries-per-second measurement samples and
+// maxStatTracker collects a series of stat per-second measurement samples and
// tracks the maximum observed over a period of time.
//
// The tracker internally uses a set of time windows in order to age out old
@@ -367,13 +349,13 @@ func (d *Decider) Reset(now time.Time) {
// a circular buffer of the last N windows of stats. We rotate through the
// circular buffer every so often as determined by `minRetention`.
//
-// The tracker can be queried through its `maxQPS` method, which returns the
+// The tracker can be queried through its `max` method, which returns the
// maximum of all queries-per-second samples recorded over the retention period.
// If the tracker has not been recording for a full retention period, then the
// method returns false.
//
-// The zero-value of a maxQPSTracker can be used immediately.
-type maxQPSTracker struct {
+// The zero-value of a maxStatTracker can be used immediately.
+type maxStatTracker struct {
windows [6]float64
curIdx int
curStart time.Time
@@ -382,15 +364,15 @@ type maxQPSTracker struct {
}
// record adds the qps sample to the tracker.
-func (t *maxQPSTracker) record(now time.Time, minRetention time.Duration, qps float64) {
+func (t *maxStatTracker) record(now time.Time, minRetention time.Duration, qps float64) {
t.maybeReset(now, minRetention)
t.maybeRotate(now)
t.windows[t.curIdx] = max(t.windows[t.curIdx], qps)
}
-// reset clears the tracker. maxQPS will begin returning false until a full
+// reset clears the tracker. max will begin returning false until a full
// minRetention period has elapsed.
-func (t *maxQPSTracker) reset(now time.Time, minRetention time.Duration) {
+func (t *maxStatTracker) reset(now time.Time, minRetention time.Duration) {
if minRetention <= 0 {
panic("minRetention must be positive")
}
@@ -401,18 +383,18 @@ func (t *maxQPSTracker) reset(now time.Time, minRetention time.Duration) {
t.minRetention = minRetention
}
-func (t *maxQPSTracker) maybeReset(now time.Time, minRetention time.Duration) {
+func (t *maxStatTracker) maybeReset(now time.Time, minRetention time.Duration) {
// If the retention period changes, simply reset the entire tracker. Merging
// or splitting windows would be a difficult task and could lead to samples
// either not being retained for long-enough, or being retained for too long.
- // Resetting indicates to maxQPS that a new retention period needs to be
+ // Resetting indicates to max that a new retention period needs to be
// measured before accurate results can be returned.
if minRetention != t.minRetention {
t.reset(now, minRetention)
}
}
-func (t *maxQPSTracker) maybeRotate(now time.Time) {
+func (t *maxStatTracker) maybeRotate(now time.Time) {
sinceLastRotate := now.Sub(t.curStart)
windowWidth := t.windowWidth()
if sinceLastRotate < windowWidth {
@@ -435,10 +417,10 @@ func (t *maxQPSTracker) maybeRotate(now time.Time) {
}
}
-// maxQPS returns the maximum queries-per-second samples recorded over the last
+// max returns the maximum per-second stat sample recorded over the last
// retention period. If the tracker has not been recording for a full retention
// period, then the method returns false.
-func (t *maxQPSTracker) maxQPS(now time.Time, minRetention time.Duration) (float64, bool) {
+func (t *maxStatTracker) max(now time.Time, minRetention time.Duration) (float64, bool) {
t.record(now, minRetention, 0) // expire samples, if necessary
if now.Sub(t.lastReset) < t.minRetention {
@@ -453,7 +435,7 @@ func (t *maxQPSTracker) maxQPS(now time.Time, minRetention time.Duration) (float
return qps, true
}
-func (t *maxQPSTracker) windowWidth() time.Duration {
+func (t *maxStatTracker) windowWidth() time.Duration {
// NB: -1 because during a rotation, only len(t.windows)-1 windows survive.
return t.minRetention / time.Duration(len(t.windows)-1)
}
diff --git a/pkg/kv/kvserver/split/decider_test.go b/pkg/kv/kvserver/split/decider_test.go
index 062854e4125f..8b362d6748b4 100644
--- a/pkg/kv/kvserver/split/decider_test.go
+++ b/pkg/kv/kvserver/split/decider_test.go
@@ -26,6 +26,35 @@ import (
"github.com/stretchr/testify/require"
)
+// testLoadSplitConfig implements the LoadSplitConfig interface and may be used
+// in testing.
+type testLoadSplitConfig struct {
+ randSource RandSource
+ useWeighted bool
+ statRetention time.Duration
+ statThreshold float64
+}
+
+// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to
+// find the midpoint based on recorded load.
+func (t *testLoadSplitConfig) NewLoadBasedSplitter(startTime time.Time) LoadBasedSplitter {
+ if t.useWeighted {
+ return NewWeightedFinder(startTime, t.randSource)
+ }
+ return NewUnweightedFinder(startTime, t.randSource)
+}
+
+// StatRetention returns the duration that recorded load is to be retained.
+func (t *testLoadSplitConfig) StatRetention() time.Duration {
+ return t.statRetention
+}
+
+// StatThreshold returns the threshold for load above which the range
+// should be considered split.
+func (t *testLoadSplitConfig) StatThreshold() float64 {
+ return t.statThreshold
+}
+
func ms(i int) time.Time {
ts, err := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
if err != nil {
@@ -38,9 +67,15 @@ func TestDecider(t *testing.T) {
defer leaktest.AfterTest(t)()
rand := rand.New(rand.NewSource(12))
+ loadSplitConfig := testLoadSplitConfig{
+ randSource: rand,
+ useWeighted: false,
+ statRetention: 2 * time.Second,
+ statThreshold: 10,
+ }
var d Decider
- Init(&d, nil, rand, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{
+ Init(&d, &loadSplitConfig, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -49,42 +84,42 @@ func TestDecider(t *testing.T) {
return func() roachpb.Span { return roachpb.Span{Key: roachpb.Key(s)} }
}
- assertQPS := func(i int, expQPS float64) {
+ assertStat := func(i int, expStat float64) {
t.Helper()
- qps := d.LastQPS(context.Background(), ms(i))
- assert.Equal(t, expQPS, qps)
+ stat := d.LastStat(context.Background(), ms(i))
+ assert.Equal(t, expStat, stat)
}
- assertMaxQPS := func(i int, expMaxQPS float64, expOK bool) {
+ assertMaxStat := func(i int, expMaxStat float64, expOK bool) {
t.Helper()
- maxQPS, ok := d.MaxQPS(context.Background(), ms(i))
- assert.Equal(t, expMaxQPS, maxQPS)
+ maxStat, ok := d.MaxStat(context.Background(), ms(i))
+ assert.Equal(t, expMaxStat, maxStat)
assert.Equal(t, expOK, ok)
}
assert.Equal(t, false, d.Record(context.Background(), ms(100), 1, nil))
- assertQPS(100, 0)
- assertMaxQPS(100, 0, false)
+ assertStat(100, 0)
+ assertMaxStat(100, 0, false)
- assert.Equal(t, ms(100), d.mu.lastQPSRollover)
+ assert.Equal(t, ms(100), d.mu.lastStatRollover)
assert.EqualValues(t, 1, d.mu.count)
assert.Equal(t, false, d.Record(context.Background(), ms(400), 3, nil))
- assertQPS(100, 0)
- assertQPS(700, 0)
- assertMaxQPS(400, 0, false)
+ assertStat(100, 0)
+ assertStat(700, 0)
+ assertMaxStat(400, 0, false)
assert.Equal(t, false, d.Record(context.Background(), ms(300), 3, nil))
- assertQPS(100, 0)
- assertMaxQPS(300, 0, false)
+ assertStat(100, 0)
+ assertMaxStat(300, 0, false)
assert.Equal(t, false, d.Record(context.Background(), ms(900), 1, nil))
- assertQPS(0, 0)
- assertMaxQPS(900, 0, false)
+ assertStat(0, 0)
+ assertMaxStat(900, 0, false)
assert.Equal(t, false, d.Record(context.Background(), ms(1099), 1, nil))
- assertQPS(0, 0)
- assertMaxQPS(1099, 0, false)
+ assertStat(0, 0)
+ assertMaxStat(1099, 0, false)
// Now 9 operations happened in the interval [100, 1099]. The next higher
// timestamp will decide whether to engage the split finder.
@@ -92,20 +127,20 @@ func TestDecider(t *testing.T) {
// It won't engage because the duration between the rollovers is 1.1s, and
// we had 10 events over that interval.
assert.Equal(t, false, d.Record(context.Background(), ms(1200), 1, nil))
- assertQPS(0, float64(10)/float64(1.1))
- assert.Equal(t, ms(1200), d.mu.lastQPSRollover)
- assertMaxQPS(1099, 0, false)
+ assertStat(0, float64(10)/float64(1.1))
+ assert.Equal(t, ms(1200), d.mu.lastStatRollover)
+ assertMaxStat(1099, 0, false)
assert.Equal(t, nil, d.mu.splitFinder)
assert.Equal(t, false, d.Record(context.Background(), ms(2199), 12, nil))
assert.Equal(t, nil, d.mu.splitFinder)
- // 2200 is the next rollover point, and 12+1=13 qps should be computed.
+ // 2200 is the next rollover point, and 12+1=13 stat should be computed.
assert.Equal(t, false, d.Record(context.Background(), ms(2200), 1, op("a")))
- assert.Equal(t, ms(2200), d.mu.lastQPSRollover)
- assertQPS(0, float64(13))
- assertMaxQPS(2200, 13, true)
+ assert.Equal(t, ms(2200), d.mu.lastStatRollover)
+ assertStat(0, float64(13))
+ assertMaxStat(2200, 13, true)
assert.NotNil(t, d.mu.splitFinder)
assert.False(t, d.mu.splitFinder.Ready(ms(10)))
@@ -132,17 +167,17 @@ func TestDecider(t *testing.T) {
o = op("a")
}
assert.False(t, d.Record(context.Background(), ms(tick), 11, o))
- assert.True(t, d.LastQPS(context.Background(), ms(tick)) > 1.0)
+ assert.True(t, d.LastStat(context.Background(), ms(tick)) > 1.0)
// Even though the split key remains.
assert.Equal(t, roachpb.Key("z"), d.MaybeSplitKey(context.Background(), ms(tick+999)))
tick += 1000
}
// But after minSplitSuggestionInterval of ticks, we get another one.
assert.True(t, d.Record(context.Background(), ms(tick), 11, op("a")))
- assertQPS(tick, float64(11))
- assertMaxQPS(tick, 11, true)
+ assertStat(tick, float64(11))
+ assertMaxStat(tick, 11, true)
- // Split key suggestion vanishes once qps drops.
+ // Split key suggestion vanishes once stat drops.
tick += 1000
assert.False(t, d.Record(context.Background(), ms(tick), 9, op("a")))
assert.Equal(t, roachpb.Key(nil), d.MaybeSplitKey(context.Background(), ms(tick)))
@@ -192,24 +227,31 @@ func TestDecider(t *testing.T) {
assert.Nil(t, d.mu.splitFinder)
}
-func TestDecider_MaxQPS(t *testing.T) {
+func TestDecider_MaxStat(t *testing.T) {
defer leaktest.AfterTest(t)()
rand := rand.New(rand.NewSource(11))
+ loadSplitConfig := testLoadSplitConfig{
+ randSource: rand,
+ useWeighted: false,
+ statRetention: 10 * time.Second,
+ statThreshold: 100,
+ }
+
var d Decider
- Init(&d, nil, rand, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{
+ Init(&d, &loadSplitConfig, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
- assertMaxQPS := func(i int, expMaxQPS float64, expOK bool) {
+ assertMaxStat := func(i int, expMaxStat float64, expOK bool) {
t.Helper()
- maxQPS, ok := d.MaxQPS(context.Background(), ms(i))
- assert.Equal(t, expMaxQPS, maxQPS)
+ maxStat, ok := d.MaxStat(context.Background(), ms(i))
+ assert.Equal(t, expMaxStat, maxStat)
assert.Equal(t, expOK, ok)
}
- assertMaxQPS(1000, 0, false)
+ assertMaxStat(1000, 0, false)
// Record a large number of samples.
d.Record(context.Background(), ms(1500), 5, nil)
@@ -220,22 +262,22 @@ func TestDecider_MaxQPS(t *testing.T) {
d.Record(context.Background(), ms(8000), 5, nil)
d.Record(context.Background(), ms(10000), 9, nil)
- assertMaxQPS(10000, 0, false)
- assertMaxQPS(11000, 17, true)
+ assertMaxStat(10000, 0, false)
+ assertMaxStat(11000, 17, true)
- // Record more samples with a lower QPS.
+ // Record more samples with a lower stat.
d.Record(context.Background(), ms(12000), 1, nil)
d.Record(context.Background(), ms(13000), 4, nil)
d.Record(context.Background(), ms(15000), 2, nil)
d.Record(context.Background(), ms(19000), 3, nil)
- assertMaxQPS(20000, 4.5, true)
- assertMaxQPS(21000, 4, true)
+ assertMaxStat(20000, 4.5, true)
+ assertMaxStat(21000, 4, true)
- // Add in a few QPS reading directly.
+ // Add in a few stat readings directly.
d.RecordMax(ms(24000), 6)
- assertMaxQPS(25000, 6, true)
+ assertMaxStat(25000, 6, true)
}
func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {
@@ -243,7 +285,14 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {
rand := rand.New(rand.NewSource(11))
var d Decider
- Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
+ loadSplitConfig := testLoadSplitConfig{
+ randSource: rand,
+ useWeighted: false,
+ statRetention: time.Second,
+ statThreshold: 1,
+ }
+
+ Init(&d, &loadSplitConfig, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -277,9 +326,16 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {
func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) {
defer leaktest.AfterTest(t)()
rand := rand.New(rand.NewSource(11))
-
var d Decider
- Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
+
+ loadSplitConfig := testLoadSplitConfig{
+ randSource: rand,
+ useWeighted: false,
+ statRetention: time.Second,
+ statThreshold: 1,
+ }
+
+ Init(&d, &loadSplitConfig, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -314,19 +370,19 @@ func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) {
require.Equal(t, c1().Key, k)
}
-func TestMaxQPSTracker(t *testing.T) {
+func TestMaxStatTracker(t *testing.T) {
defer leaktest.AfterTest(t)()
tick := 100
minRetention := time.Second
- var mt maxQPSTracker
+ var mt maxStatTracker
mt.reset(ms(tick), minRetention)
require.Equal(t, 200*time.Millisecond, mt.windowWidth())
- // Check the maxQPS returns false before any samples are recorded.
- qps, ok := mt.maxQPS(ms(tick), minRetention)
- require.Equal(t, 0.0, qps)
+ // Check that max returns ok=false before any samples are recorded.
+ stat, ok := mt.max(ms(tick), minRetention)
+ require.Equal(t, 0.0, stat)
require.Equal(t, false, ok)
require.Equal(t, [6]float64{0, 0, 0, 0, 0, 0}, mt.windows)
require.Equal(t, 0, mt.curIdx)
@@ -338,9 +394,9 @@ func TestMaxQPSTracker(t *testing.T) {
mt.record(ms(tick), minRetention, float64(10+i))
}
- // maxQPS should still return false, but some windows should have samples.
- qps, ok = mt.maxQPS(ms(tick), minRetention)
- require.Equal(t, 0.0, qps)
+ // max should still return ok=false, but some windows should have samples.
+ stat, ok = mt.max(ms(tick), minRetention)
+ require.Equal(t, 0.0, stat)
require.Equal(t, false, ok)
require.Equal(t, [6]float64{12, 16, 20, 24, 0, 0}, mt.windows)
require.Equal(t, 3, mt.curIdx)
@@ -351,10 +407,10 @@ func TestMaxQPSTracker(t *testing.T) {
mt.record(ms(tick), minRetention, float64(24+i))
}
- // maxQPS should now return the maximum qps observed during the measurement
+ // max should now return the maximum stat observed during the measurement
// period.
- qps, ok = mt.maxQPS(ms(tick), minRetention)
- require.Equal(t, 38.0, qps)
+ stat, ok = mt.max(ms(tick), minRetention)
+ require.Equal(t, 38.0, stat)
require.Equal(t, true, ok)
require.Equal(t, [6]float64{35, 38, 20, 24, 27, 31}, mt.windows)
require.Equal(t, 1, mt.curIdx)
@@ -364,8 +420,8 @@ func TestMaxQPSTracker(t *testing.T) {
tick += 500
mt.record(ms(tick), minRetention, float64(17))
- qps, ok = mt.maxQPS(ms(tick), minRetention)
- require.Equal(t, 38.0, qps)
+ stat, ok = mt.max(ms(tick), minRetention)
+ require.Equal(t, 38.0, stat)
require.Equal(t, true, ok)
require.Equal(t, [6]float64{35, 38, 0, 0, 17, 31}, mt.windows)
require.Equal(t, 4, mt.curIdx)
@@ -373,8 +429,8 @@ func TestMaxQPSTracker(t *testing.T) {
// A query far in the future should return 0, because this indicates no
// recent activity.
tick += 1900
- qps, ok = mt.maxQPS(ms(tick), minRetention)
- require.Equal(t, 0.0, qps)
+ stat, ok = mt.max(ms(tick), minRetention)
+ require.Equal(t, 0.0, stat)
require.Equal(t, true, ok)
require.Equal(t, [6]float64{0, 0, 0, 0, 0, 0}, mt.windows)
require.Equal(t, 0, mt.curIdx)
@@ -386,8 +442,8 @@ func TestMaxQPSTracker(t *testing.T) {
mt.record(ms(tick), minRetention, float64(33+i))
}
- qps, ok = mt.maxQPS(ms(tick), minRetention)
- require.Equal(t, 47.0, qps)
+ stat, ok = mt.max(ms(tick), minRetention)
+ require.Equal(t, 47.0, stat)
require.Equal(t, true, ok)
require.Equal(t, [6]float64{35, 39, 43, 47, 0, 0}, mt.windows)
require.Equal(t, 3, mt.curIdx)
@@ -398,8 +454,8 @@ func TestMaxQPSTracker(t *testing.T) {
mt.record(ms(tick), minRetention, float64(13+i))
}
- qps, ok = mt.maxQPS(ms(tick), minRetention)
- require.Equal(t, 0.0, qps)
+ stat, ok = mt.max(ms(tick), minRetention)
+ require.Equal(t, 0.0, stat)
require.Equal(t, false, ok)
require.Equal(t, [6]float64{20, 27, 0, 0, 0, 0}, mt.windows)
require.Equal(t, 1, mt.curIdx)
@@ -411,7 +467,14 @@ func TestDeciderMetrics(t *testing.T) {
timeStart := 1000
var dPopular Decider
- Init(&dPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
+ loadSplitConfig := testLoadSplitConfig{
+ randSource: rand,
+ useWeighted: false,
+ statRetention: time.Second,
+ statThreshold: 1,
+ }
+
+ Init(&dPopular, &loadSplitConfig, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -433,10 +496,11 @@ func TestDeciderMetrics(t *testing.T) {
// No split key, not popular key
var dNotPopular Decider
- Init(&dNotPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
+ Init(&dNotPopular, &loadSplitConfig, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
+
for i := 0; i < 20; i++ {
dNotPopular.Record(context.Background(), ms(timeStart), 1, func() roachpb.Span {
return roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(uint32(0))}
@@ -453,7 +517,7 @@ func TestDeciderMetrics(t *testing.T) {
// No split key, all insufficient counters
var dAllInsufficientCounters Decider
- Init(&dAllInsufficientCounters, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
+ Init(&dAllInsufficientCounters, &loadSplitConfig, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go
index 56adf14df73b..bb16e18fa8aa 100644
--- a/pkg/kv/kvserver/split_queue.go
+++ b/pkg/kv/kvserver/split_queue.go
@@ -266,12 +266,14 @@ func (sq *splitQueue) processAttempt(
loadStats := r.loadStats.Stats()
batchHandledQPS := loadStats.QueriesPerSecond
raftAppliedQPS := loadStats.WriteKeysPerSecond
- splitQPS := r.loadBasedSplitter.LastQPS(ctx, now)
+ lastSplitStat := r.loadBasedSplitter.LastStat(ctx, now)
+ splitObj := sq.store.splitConfig.SplitObjective()
reason := fmt.Sprintf(
- "load at key %s (%.2f splitQPS, %.2f batches/sec, %.2f raft mutations/sec)",
+ "load at key %s (%s %s, %.2f batches/sec, %.2f raft mutations/sec)",
splitByLoadKey,
- splitQPS,
+ splitObj,
+ splitObj.Format(lastSplitStat),
batchHandledQPS,
raftAppliedQPS,
)
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 246016ba023d..8aa606fd8f06 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -758,6 +758,7 @@ type Store struct {
ctSender *sidetransport.Sender
storeGossip *StoreGossip
rebalanceObjManager *RebalanceObjectiveManager
+ splitConfig *replicaSplitConfig
coalescedMu struct {
syncutil.Mutex
@@ -1214,7 +1215,7 @@ func NewStore(
allocatorStorePool, /* storeDescProvider */
allocatorStorePool, /* capacityChangeNotifier */
)
-
+ s.splitConfig = newReplicaSplitConfig(s.cfg.Settings, s.rebalanceObjManager)
}
if cfg.RPCContext != nil {
s.allocator = allocatorimpl.MakeAllocator(
diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto
index 1e3ae8260480..c50553d27712 100644
--- a/pkg/roachpb/api.proto
+++ b/pkg/roachpb/api.proto
@@ -2121,6 +2121,14 @@ message RangeStatsResponse {
bool max_queries_per_second_set = 6;
+ // MaxCPUPerSecond is the maximum CPU usage per second that the range has
+ // seen at the leaseholder over a configured measurement period. It is set to
+ // -1 if the replica serving the RangeStats request has not been the
+ // leaseholder long enough to have recorded CPU rates for at least a full
+ // measurement period. In such cases, the recipient should not consider the
+ // CPU value reliable enough to base important decisions on.
+ double max_cpu_per_second = 7 [(gogoproto.customname) = "MaxCPUPerSecond"];
+
// range_info contains descriptor and lease information.
RangeInfo range_info = 4 [(gogoproto.nullable) = false];