From 3c26d1a72e5aa41aee42b65fc5b389590c1e1192 Mon Sep 17 00:00:00 2001 From: Austen McClernon <austen@cockroachlabs.com> Date: Thu, 12 Jan 2023 00:06:38 +0000 Subject: [PATCH] kvserver: introduce cpu load based splits This commit adds the ability to perform load based splitting with replica cpu usage rather than queries per second. Load based splitting now will use either cpu or qps for deciding split points, depending on the cluster setting `kv.allocator.load_based_rebalancing.objective`. When set to `qps`, qps is used in deciding split points and when splitting should occur; similarly, `cpu` means that request cpu against the leaseholder replica is to decide split points. The split threshold when using `cpu` is the cluster setting `kv.range_split.load_cpu_threshold` which defaults to `250ms` of cpu time per second, i.e. a replica using 1/4 processor of a machine consistently. The merge queue uses the load based splitter to make decisions on whether to merge two adjacent ranges due to low load. This commit also updates the merge queue to be consistent with the load based splitter signal. When switching between `qps` and `cpu`, the load based splitter for every replica is reset to avoid spurious results. resolves: #95377 Release note (ops change): Load based splitter now supports using request cpu usage to split ranges. This is introduced with the previous cluster setting `kv.allocator.load_based_rebalancing.objective`, which when set to `cpu`, will use request cpu usage. The threshold above which CPU usage of a range is considered for splitting is defined in the cluster setting `kv.range_split.load_cpu_threshold`, which has a default value of `250ms`. --- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/kv/kvserver/asim/config/settings.go | 10 +- pkg/kv/kvserver/asim/state/impl.go | 5 +- pkg/kv/kvserver/asim/state/split_decider.go | 49 +++-- .../kvserver/asim/state/split_decider_test.go | 25 ++- pkg/kv/kvserver/batcheval/cmd_range_stats.go | 20 +- pkg/kv/kvserver/batcheval/eval_context.go | 15 +- pkg/kv/kvserver/client_merge_test.go | 129 ++++++++--- pkg/kv/kvserver/merge_queue.go | 143 +++++++++---- pkg/kv/kvserver/replica.go | 23 +- pkg/kv/kvserver/replica_eval_context_span.go | 6 + pkg/kv/kvserver/replica_init.go | 11 +- pkg/kv/kvserver/replica_send.go | 27 +-- pkg/kv/kvserver/replica_split_load.go | 172 ++++++++++++++- pkg/kv/kvserver/split/BUILD.bazel | 2 - pkg/kv/kvserver/split/decider.go | 138 ++++++------ pkg/kv/kvserver/split/decider_test.go | 200 ++++++++++++------ pkg/kv/kvserver/split_queue.go | 8 +- pkg/kv/kvserver/store.go | 3 +- pkg/roachpb/api.proto | 8 + 21 files changed, 696 insertions(+), 300 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 1f05e57e5828..175eed01c966 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -43,6 +43,7 @@ kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to kv.log_range_and_node_events.enabled boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated +kv.range_split.load_cpu_threshold duration 250ms the CPU use per second over which, the range becomes a candidate for load based splitting kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 4dee1dc4ece0..2b0228398f30 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -57,6 +57,7 @@ <tr><td><div id="setting-kv-log-range-and-node-events-enabled" class="anchored"><code>kv.log_range_and_node_events.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog</td></tr> <tr><td><div id="setting-kv-protectedts-reconciliation-interval" class="anchored"><code>kv.protectedts.reconciliation.interval</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the frequency for reconciling jobs with protected timestamp records</td></tr> <tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load_enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td></tr> +<tr><td><div id="setting-kv-range-split-load-cpu-threshold" class="anchored"><code>kv.range_split.load_cpu_threshold</code></div></td><td>duration</td><td><code>250ms</code></td><td>the CPU use per second over which, the range becomes a candidate for load based splitting</td></tr> <tr><td><div id="setting-kv-range-split-load-qps-threshold" class="anchored"><code>kv.range_split.load_qps_threshold</code></div></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td></tr> <tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr> <tr><td><div id="setting-kv-rangefeed-range-stuck-threshold" class="anchored"><code>kv.rangefeed.range_stuck_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence)</td></tr> diff --git a/pkg/kv/kvserver/asim/config/settings.go b/pkg/kv/kvserver/asim/config/settings.go index 1206c4ed6d2a..69cac7b7edc1 100644 --- a/pkg/kv/kvserver/asim/config/settings.go +++ b/pkg/kv/kvserver/asim/config/settings.go @@ -24,7 +24,7 @@ const ( defaultStateExchangeInterval = 10 * time.Second defaultStateExchangeDelay = 500 * time.Millisecond defaultSplitQPSThreshold = 2500 - defaultSplitQPSRetention = 10 * time.Minute + defaultSplitStatRetention = 10 * time.Minute defaultSeed = 42 defaultLBRebalancingMode = 2 // Leases and replicas. defaultLBRebalancingInterval = time.Minute @@ -89,9 +89,9 @@ type SimulationSettings struct { // SplitQPSThreshold is the threshold above which a range will be a // candidate for load based splitting. SplitQPSThreshold float64 - // SplitQPSRetention is the duration which recorded load will be retained + // SplitStatRetention is the duration which recorded load will be retained // and factored into load based splitting decisions. - SplitQPSRetention time.Duration + SplitStatRetention time.Duration // LBRebalancingMode controls if and when we do store-level rebalancing // based on load. It maps to kvserver.LBRebalancingMode. LBRebalancingMode int64 @@ -125,7 +125,7 @@ func DefaultSimulationSettings() *SimulationSettings { StateExchangeInterval: defaultStateExchangeInterval, StateExchangeDelay: defaultStateExchangeDelay, SplitQPSThreshold: defaultSplitQPSThreshold, - SplitQPSRetention: defaultSplitQPSRetention, + SplitStatRetention: defaultSplitStatRetention, LBRebalancingMode: defaultLBRebalancingMode, LBRebalancingObjective: defaultLBRebalancingObjective, LBRebalancingInterval: defaultLBRebalancingInterval, @@ -167,6 +167,6 @@ func (s *SimulationSettings) SplitQPSThresholdFn() func() float64 { // split decisions. func (s *SimulationSettings) SplitQPSRetentionFn() func() time.Duration { return func() time.Duration { - return s.SplitQPSRetention + return s.SplitStatRetention } } diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index 5304d2314600..c172c2b42136 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -333,10 +333,7 @@ func (s *state) AddStore(nodeID NodeID) (Store, bool) { s.stores[storeID] = store // Add a range load splitter for this store. - s.loadsplits[storeID] = NewSplitDecider(s.settings.Seed, - s.settings.SplitQPSThresholdFn(), - s.settings.SplitQPSRetentionFn(), - ) + s.loadsplits[storeID] = NewSplitDecider(s.settings) return store, true } diff --git a/pkg/kv/kvserver/asim/state/split_decider.go b/pkg/kv/kvserver/asim/state/split_decider.go index 3c62003b3b1f..822ea36c1b5d 100644 --- a/pkg/kv/kvserver/asim/state/split_decider.go +++ b/pkg/kv/kvserver/asim/state/split_decider.go @@ -15,6 +15,7 @@ import ( "math/rand" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -38,32 +39,50 @@ type LoadSplitter interface { ResetRange(rangeID RangeID) } +type loadSplitConfig struct { + randSource split.RandSource + settings *config.SimulationSettings +} + +// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to +// find the midpoint based on recorded load. +func (lsc loadSplitConfig) NewLoadBasedSplitter(startTime time.Time) split.LoadBasedSplitter { + return split.NewUnweightedFinder(startTime, lsc.randSource) +} + +// StatRetention returns the duration that recorded load is to be retained. +func (lsc loadSplitConfig) StatRetention() time.Duration { + return lsc.settings.SplitStatRetention +} + +// StatThreshold returns the threshold for load above which the range +// should be considered split. +func (lsc loadSplitConfig) StatThreshold() float64 { + return lsc.settings.SplitQPSThreshold +} + // SplitDecider implements the LoadSplitter interface. type SplitDecider struct { - deciders map[RangeID]*split.Decider - suggestions []RangeID - qpsThreshold func() float64 - qpsRetention func() time.Duration - seed int64 + deciders map[RangeID]*split.Decider + suggestions []RangeID + splitConfig split.LoadSplitConfig } // NewSplitDecider returns a new SplitDecider. -func NewSplitDecider( - seed int64, qpsThresholdFn func() float64, qpsRetentionFn func() time.Duration, -) *SplitDecider { +func NewSplitDecider(settings *config.SimulationSettings) *SplitDecider { return &SplitDecider{ - deciders: make(map[RangeID]*split.Decider), - suggestions: []RangeID{}, - seed: seed, - qpsThreshold: qpsThresholdFn, - qpsRetention: qpsRetentionFn, + deciders: make(map[RangeID]*split.Decider), + suggestions: []RangeID{}, + splitConfig: loadSplitConfig{ + randSource: rand.New(rand.NewSource(settings.Seed)), + settings: settings, + }, } } func (s *SplitDecider) newDecider() *split.Decider { - rand := rand.New(rand.NewSource(s.seed)) decider := &split.Decider{} - split.Init(decider, nil, rand, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{ + split.Init(decider, s.splitConfig, &split.LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) diff --git a/pkg/kv/kvserver/asim/state/split_decider_test.go b/pkg/kv/kvserver/asim/state/split_decider_test.go index 8bcc7c9d3b2a..bcec5a9e8e6d 100644 --- a/pkg/kv/kvserver/asim/state/split_decider_test.go +++ b/pkg/kv/kvserver/asim/state/split_decider_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/stretchr/testify/require" ) @@ -21,11 +22,11 @@ import ( var testingSequence = []Key{10, 1, 9, 2, 8, 3, 4, 7, 5, 6} func TestSplitDecider(t *testing.T) { - testingSeed := 42 - testingThreshold := func() float64 { return 2500 } - testingRetention := func() time.Duration { return 60 * time.Second } + testSettings := config.DefaultSimulationSettings() + testSettings.SplitQPSThreshold = 2500 + testSettings.SplitStatRetention = 60 * time.Second startTime := TestingStartTime() - decider := NewSplitDecider(int64(testingSeed), testingThreshold, testingRetention) + decider := NewSplitDecider(testSettings) // A decider should be created for a range when a load event is first // recorded against it. @@ -45,8 +46,8 @@ func TestSplitDecider(t *testing.T) { sequence := testingSequence // Register load greater than the threshold. - for i := 0; int64(i) < int64(testingRetention()/time.Second); i++ { - for j := 0; j < int(testingThreshold())+100; j++ { + for i := 0; int64(i) < int64(testSettings.SplitStatRetention/time.Second); i++ { + for j := 0; j < int(testSettings.SplitQPSThreshold)+100; j++ { decider.Record( OffsetTick(startTime, int64(i)), 1, @@ -58,7 +59,7 @@ func TestSplitDecider(t *testing.T) { // There should now be 1 suggested range for splitting which corresponds to // the midpoint of the testing sequence. require.Equal(t, []RangeID{1}, decider.ClearSplitKeys()) - splitKey, found = decider.SplitKey(startTime.Add(testingRetention()), 1) + splitKey, found = decider.SplitKey(startTime.Add(testSettings.SplitStatRetention), 1) require.True(t, found) require.Equal(t, Key(6), splitKey) @@ -67,7 +68,6 @@ func TestSplitDecider(t *testing.T) { } func TestSplitDeciderWorkload(t *testing.T) { - testingSeed := 42 testingRangeID := FirstRangeID startTime := TestingStartTime() @@ -105,11 +105,10 @@ func TestSplitDeciderWorkload(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - splitDecider := NewSplitDecider( - int64(testingSeed), - func() float64 { return tc.threshold }, - func() time.Duration { return tc.retention }, - ) + testSettings := config.DefaultSimulationSettings() + testSettings.SplitQPSThreshold = tc.threshold + testSettings.SplitStatRetention = tc.retention + splitDecider := NewSplitDecider(testSettings) lastTick := int64(0) for _, tick := range tc.ticks { diff --git a/pkg/kv/kvserver/batcheval/cmd_range_stats.go b/pkg/kv/kvserver/batcheval/cmd_range_stats.go index 9456c59dc81e..0496fde939dd 100644 --- a/pkg/kv/kvserver/batcheval/cmd_range_stats.go +++ b/pkg/kv/kvserver/batcheval/cmd_range_stats.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/errors" ) func init() { @@ -44,12 +45,21 @@ func RangeStats( ) (result.Result, error) { reply := resp.(*roachpb.RangeStatsResponse) reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats() - if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(ctx); ok { - reply.MaxQueriesPerSecond = qps - } else { - // See comment on MaxQueriesPerSecond. -1 means !ok. - reply.MaxQueriesPerSecond = -1 + + maxQPS, qpsOK := cArgs.EvalCtx.GetMaxSplitQPS(ctx) + maxCPU, cpuOK := cArgs.EvalCtx.GetMaxSplitCPU(ctx) + // See comment on MaxQueriesPerSecond and MaxCPUPerSecond. -1 means !ok. + reply.MaxCPUPerSecond, reply.MaxQueriesPerSecond = -1, -1 + if qpsOK && cpuOK { + return result.Result{}, errors.AssertionFailedf("unexpected both QPS and CPU range statistics set") + } + if qpsOK { + reply.MaxQueriesPerSecond = maxQPS } + if cpuOK { + reply.MaxCPUPerSecond = maxCPU + } + reply.MaxQueriesPerSecondSet = true reply.RangeInfo = cArgs.EvalCtx.GetRangeInfo(ctx) return result.Result{}, nil diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 6e5c9fa48b39..5c9aabacd3cb 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -87,13 +87,20 @@ type EvalContext interface { // results due to concurrent writes. GetMVCCStats() enginepb.MVCCStats - // GetMaxSplitQPS returns the Replicas maximum queries/s request rate over a - // configured retention period. + // GetMaxSplitQPS returns the Replica's maximum queries/s request rate over + // a configured retention period. // // NOTE: This should not be used when the load based splitting cluster setting // is disabled. GetMaxSplitQPS(context.Context) (float64, bool) + // GetMaxSplitCPU returns the Replica's maximum request cpu/s rate over a + // configured retention period. + // + // NOTE: This should not be used when the load based splitting cluster setting + // is disabled. + GetMaxSplitCPU(context.Context) (float64, bool) + GetGCThreshold() hlc.Timestamp ExcludeDataFromBackup() bool GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error) @@ -161,6 +168,7 @@ type MockEvalCtx struct { Clock *hlc.Clock Stats enginepb.MVCCStats QPS float64 + CPU float64 AbortSpan *abortspan.AbortSpan GCThreshold hlc.Timestamp Term, FirstIndex uint64 @@ -240,6 +248,9 @@ func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats { func (m *mockEvalCtxImpl) GetMaxSplitQPS(context.Context) (float64, bool) { return m.QPS, true } +func (m *mockEvalCtxImpl) GetMaxSplitCPU(context.Context) (float64, bool) { + return m.CPU, true +} func (m *mockEvalCtxImpl) CanCreateTxnRecord( context.Context, uuid.UUID, []byte, hlc.Timestamp, ) (bool, roachpb.TransactionAbortedReason) { diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index b6e8ed9f9741..645d1700aae7 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -4364,10 +4364,17 @@ func TestMergeQueue(t *testing.T) { }) t.Run("load-based-merging", func(t *testing.T) { - const splitByLoadQPS = 10 - const mergeByLoadQPS = splitByLoadQPS / 2 // see conservativeLoadBasedSplitThreshold + // NB: It is possible for the ranges being checked to record load + // during the test. To avoid flakiness, we set the splitByLoadStat high + // enough that any recorded load from testing won't exceed it. + const splitByLoadStat = 10e9 + const mergeByLoadStat = splitByLoadStat / 2 // see conservativeLoadBasedSplitThreshold const splitByLoadMergeDelay = 500 * time.Millisecond + setSplitObjective := func(dim kvserver.LBRebalancingObjective) { + kvserver.LoadBasedRebalancingObjective.Override(ctx, sv, int64(dim)) + } + resetForLoadBasedSubtest := func(t *testing.T) { reset(t) @@ -4384,7 +4391,8 @@ func TestMergeQueue(t *testing.T) { // meaning that it was a maximum measurement over some extended period of // time. kvserver.SplitByLoadEnabled.Override(ctx, sv, true) - kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadQPS) + kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, splitByLoadStat) + kvserver.SplitByLoadCPUThreshold.Override(ctx, sv, splitByLoadStat) // Drop the load-based splitting merge delay setting, which also dictates // the duration that a leaseholder must measure QPS before considering its @@ -4399,47 +4407,104 @@ func TestMergeQueue(t *testing.T) { rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) } + for _, splitObjective := range []kvserver.LBRebalancingObjective{ + kvserver.LBRebalancingQueries, + kvserver.LBRebalancingCPU, + } { + setSplitObjective(splitObjective) + t.Run(fmt.Sprintf("unreliable-lhs-%s", splitObjective.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) - t.Run("unreliable-lhs-qps", func(t *testing.T) { - resetForLoadBasedSubtest(t) + lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime()) - lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime()) + clearRange(t, lhsStartKey, rhsEndKey) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - clearRange(t, lhsStartKey, rhsEndKey) - verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + t.Run(fmt.Sprintf("unreliable-rhs-%s", splitObjective.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) - t.Run("unreliable-rhs-qps", func(t *testing.T) { - resetForLoadBasedSubtest(t) + rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) - rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime()) + clearRange(t, lhsStartKey, rhsEndKey) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - clearRange(t, lhsStartKey, rhsEndKey) - verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + t.Run(fmt.Sprintf("combined-%s-above-threshold", splitObjective.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) + + moreThanHalfStat := mergeByLoadStat/2 + 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat) - t.Run("combined-qps-above-threshold", func(t *testing.T) { - resetForLoadBasedSubtest(t) + clearRange(t, lhsStartKey, rhsEndKey) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - moreThanHalfQPS := mergeByLoadQPS/2 + 1 - rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(moreThanHalfQPS)) - lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(moreThanHalfQPS)) + t.Run(fmt.Sprintf("combined-%s-below-threshold", splitObjective.ToDimension().String()), func(t *testing.T) { + resetForLoadBasedSubtest(t) - clearRange(t, lhsStartKey, rhsEndKey) - verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) + lessThanHalfStat := mergeByLoadStat/2 - 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat) - t.Run("combined-qps-below-threshold", func(t *testing.T) { - resetForLoadBasedSubtest(t) + clearRange(t, lhsStartKey, rhsEndKey) + verifyMergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) - lessThanHalfQPS := mergeByLoadQPS/2 - 1 - rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(lessThanHalfQPS)) - lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(lessThanHalfQPS)) + // These nested tests assert that after changing the split + // dimension, any previous load is discarded and the range will not + // merge, even if the previous load was above or below the + // threshold. + for _, secondSplitObjective := range []kvserver.LBRebalancingObjective{ + kvserver.LBRebalancingQueries, + kvserver.LBRebalancingCPU, + } { + if splitObjective == secondSplitObjective { + // Nothing to do when there is no change. We expect the + // same outcome as the above tests. + continue + } + t.Run(fmt.Sprintf("switch-%s-to-%s-prev-combined-above-threshold", + splitObjective.ToDimension().String(), + secondSplitObjective.ToDimension().String(), + ), func(t *testing.T) { + // Set the split dimension again, since we have modified it + // at the bottom of this loop. + setSplitObjective(splitObjective) + resetForLoadBasedSubtest(t) + + moreThanHalfStat := mergeByLoadStat/2 + 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), moreThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), moreThanHalfStat) + + clearRange(t, lhsStartKey, rhsEndKey) + // Switch the dimension, so that any recorded load should + // be discarded and despite being above the threshold (for + // both dimensions), it shouldn't merge. + setSplitObjective(secondSplitObjective) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) - clearRange(t, lhsStartKey, rhsEndKey) - verifyMergedSoon(t, store, lhsStartKey, rhsStartKey) - }) + t.Run(fmt.Sprintf("switch-%s-to-%s-prev-combined-below-threshold", + splitObjective.ToDimension().String(), + secondSplitObjective.ToDimension().String(), + ), func(t *testing.T) { + setSplitObjective(splitObjective) + resetForLoadBasedSubtest(t) + + manualClock.Increment(splitByLoadMergeDelay.Nanoseconds()) + lessThanHalfStat := mergeByLoadStat/2 - 1 + rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), lessThanHalfStat) + lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), lessThanHalfStat) + + clearRange(t, lhsStartKey, rhsEndKey) + setSplitObjective(secondSplitObjective) + verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey) + }) + } + } }) t.Run("sticky-bit", func(t *testing.T) { diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index 4ab4cc0f196b..f7b72bbc832f 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -184,7 +184,7 @@ var _ PurgatoryError = rangeMergePurgatoryError{} func (mq *mergeQueue) requestRangeStats( ctx context.Context, key roachpb.Key, -) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, qps float64, qpsOK bool, err error) { +) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, ls loadSplitStat, err error) { ba := &roachpb.BatchRequest{} ba.Add(&roachpb.RangeStatsRequest{ @@ -193,15 +193,37 @@ func (mq *mergeQueue) requestRangeStats( br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba) if pErr != nil { - return nil, enginepb.MVCCStats{}, 0, false, pErr.GoError() + return nil, enginepb.MVCCStats{}, loadSplitStat{}, pErr.GoError() } res := br.Responses[0].GetInner().(*roachpb.RangeStatsResponse) desc = &res.RangeInfo.Desc stats = res.MVCCStats - qps = res.MaxQueriesPerSecond - qpsOK = qps >= 0 - return desc, stats, qps, qpsOK, nil + + // When constructing RangeStats only one of MaxQueriesPerSecond or + // MaxCPUPerSecond will be set. See RangeStats() in cmd_range_stats.go. + // The load based splitter will only track the max of at most statistic at + // a time for load based splitting. This is either CPU or QPS. + if res.MaxCPUPerSecond >= 0 && res.MaxQueriesPerSecond >= 0 { + err = errors.AssertionFailedf( + "unexpected both max qps %.2f and max cpu %.2f set in range stats response", + res.MaxQueriesPerSecond, res.MaxCPUPerSecond) + return nil, enginepb.MVCCStats{}, loadSplitStat{}, err + } + + if res.MaxQueriesPerSecond >= 0 { + ls.max = res.MaxQueriesPerSecond + ls.ok = true + ls.typ = SplitQPS + } + + if res.MaxCPUPerSecond >= 0 { + ls.max = res.MaxCPUPerSecond + ls.ok = true + ls.typ = SplitCPU + } + + return desc, stats, ls, nil } func (mq *mergeQueue) process( @@ -214,7 +236,7 @@ func (mq *mergeQueue) process( lhsDesc := lhsRepl.Desc() lhsStats := lhsRepl.GetMVCCStats() - lhsQPS, lhsQPSOK := lhsRepl.GetMaxSplitQPS(ctx) + lhsLoadSplitStat := lhsRepl.loadSplitStat(ctx) minBytes := lhsRepl.GetMinBytes() if lhsStats.Total() >= minBytes { log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes", @@ -222,7 +244,7 @@ func (mq *mergeQueue) process( return false, nil } - rhsDesc, rhsStats, rhsQPS, rhsQPSOK, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) + rhsDesc, rhsStats, rhsLoadSplitStat, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) if err != nil { return false, err } @@ -248,41 +270,25 @@ func (mq *mergeQueue) process( mergedStats := lhsStats mergedStats.Add(rhsStats) - var mergedQPS float64 + var loadMergeReason string if lhsRepl.SplitByLoadEnabled() { - // When load is a consideration for splits and, by extension, merges, the - // mergeQueue is fairly conservative. In an effort to avoid thrashing and to - // avoid overreacting to temporary fluctuations in load, the mergeQueue will - // only consider a merge when the combined load across the RHS and LHS - // ranges is below half the threshold required to split a range due to load. - // Furthermore, to ensure that transient drops in load do not trigger range - // merges, the mergeQueue will only consider a merge when it deems the - // maximum qps measurement from both sides to be sufficiently stable and - // reliable, meaning that it was a maximum measurement over some extended - // period of time. - if !lhsQPSOK { - log.VEventf(ctx, 2, "skipping merge: LHS QPS measurement not yet reliable") + var canMergeLoad bool + if canMergeLoad, loadMergeReason = canMergeRangeLoad( + ctx, lhsLoadSplitStat, rhsLoadSplitStat, mq.store.splitConfig, + ); !canMergeLoad { + log.VEventf(ctx, 2, "skipping merge to avoid thrashing: merged range %s may split %s", + mergedDesc, loadMergeReason) return false, nil } - if !rhsQPSOK { - log.VEventf(ctx, 2, "skipping merge: RHS QPS measurement not yet reliable") - return false, nil - } - mergedQPS = lhsQPS + rhsQPS } - // Check if the merged range would need to be split, if so, skip merge. - // Use a lower threshold for load based splitting so we don't find ourselves - // in a situation where we keep merging ranges that would be split soon after - // by a small increase in load. - conservativeLoadBasedSplitThreshold := 0.5 * lhsRepl.SplitByLoadQPSThreshold() shouldSplit, _ := shouldSplitRange(ctx, mergedDesc, mergedStats, lhsRepl.GetMaxBytes(), lhsRepl.shouldBackpressureWrites(), confReader) - if shouldSplit || mergedQPS >= conservativeLoadBasedSplitThreshold { + if shouldSplit { log.VEventf(ctx, 2, "skipping merge to avoid thrashing: merged range %s may split "+ - "(estimated size, estimated QPS: %d, %v)", - mergedDesc, mergedStats.Total(), mergedQPS) + "(estimated size: %d)", + mergedDesc, mergedStats.Total()) return false, nil } @@ -360,7 +366,7 @@ func (mq *mergeQueue) process( } // Refresh RHS descriptor. - rhsDesc, _, _, _, err = mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) + rhsDesc, _, _, err = mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey()) if err != nil { return false, err } @@ -378,15 +384,12 @@ func (mq *mergeQueue) process( } log.VEventf(ctx, 2, "merging to produce range: %s-%s", mergedDesc.StartKey, mergedDesc.EndKey) - reason := fmt.Sprintf("lhs+rhs has (size=%s+%s=%s qps=%.2f+%.2f=%.2fqps) below threshold (size=%s, qps=%.2f)", + reason := fmt.Sprintf("lhs+rhs size (%s+%s=%s) below threshold (%s) %s", humanizeutil.IBytes(lhsStats.Total()), humanizeutil.IBytes(rhsStats.Total()), humanizeutil.IBytes(mergedStats.Total()), - lhsQPS, - rhsQPS, - mergedQPS, humanizeutil.IBytes(minBytes), - conservativeLoadBasedSplitThreshold, + loadMergeReason, ) _, pErr := lhsRepl.AdminMerge(ctx, roachpb.AdminMergeRequest{ RequestHeader: roachpb.RequestHeader{Key: lhsRepl.Desc().StartKey.AsRawKey()}, @@ -417,8 +420,8 @@ func (mq *mergeQueue) process( // Adjust the splitter to account for the additional load from the RHS. We // could just Reset the splitter, but then we'd need to wait out a full // measurement period (default of 5m) before merging this range again. - if mergedQPS != 0 { - lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedQPS) + if mergedLoadSplitStat := lhsLoadSplitStat.max + rhsLoadSplitStat.max; mergedLoadSplitStat != 0 { + lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedLoadSplitStat) } return true, nil } @@ -439,3 +442,61 @@ func (mq *mergeQueue) purgatoryChan() <-chan time.Time { func (mq *mergeQueue) updateChan() <-chan time.Time { return nil } + +func canMergeRangeLoad( + ctx context.Context, lhs, rhs loadSplitStat, rsc *replicaSplitConfig, +) (can bool, reason string) { + // When the lhs and rhs split stats are of different types, or do not match + // the current split objective they cannot merge together. This could occur + // just after changing the split objective to a different value, where + // there is a mismatch. + splitObjective := rsc.SplitObjective() + if lhs.typ != splitObjective { + return false, "LHS load measurement is a different type (%s) than current split objective (%s)" + } + if rhs.typ != splitObjective { + return false, "RHS load measurement is a different type (%s) than current split objective (%s)" + } + + // When load is a consideration for splits and, by extension, merges, the + // mergeQueue is fairly conservative. In an effort to avoid thrashing and to + // avoid overreacting to temporary fluctuations in load, the mergeQueue will + // only consider a merge when the combined load across the RHS and LHS + // ranges is below half the threshold required to split a range due to load. + // Furthermore, to ensure that transient drops in load do not trigger range + // merges, the mergeQueue will only consider a merge when it deems the + // maximum qps measurement from both sides to be sufficiently stable and + // reliable, meaning that it was a maximum measurement over some extended + // period of time. + if !lhs.ok { + return false, "LHS load measurement not yet reliable" + } + if !rhs.ok { + return false, "RHS load measurement not yet reliable" + } + + // Check if the merged range would need to be split, if so, skip merge. + // Use a lower threshold for load based splitting so we don't find ourselves + // in a situation where we keep merging ranges that would be split soon after + // by a small increase in load. + merged := lhs.max + rhs.max + conservativeLoadBasedSplitThreshold := 0.5 * rsc.StatThreshold() + + if merged >= conservativeLoadBasedSplitThreshold { + return false, fmt.Sprintf("lhs+rhs %s (%s+%s=%s) above threshold (%s)", + splitObjective, + splitObjective.Format(lhs.max), + splitObjective.Format(lhs.max), + splitObjective.Format(merged), + splitObjective.Format(conservativeLoadBasedSplitThreshold), + ) + } + + return true, fmt.Sprintf("lhs+rhs %s (%s+%s=%s) below threshold (%s)", + splitObjective, + splitObjective.Format(lhs.max), + splitObjective.Format(lhs.max), + splitObjective.Format(merged), + splitObjective.Format(conservativeLoadBasedSplitThreshold), + ) +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index b51f0b1fd0d8..ec55514e875d 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1293,9 +1293,28 @@ func (r *Replica) SetMVCCStatsForTesting(stats *enginepb.MVCCStats) { // NOTE: This should only be used for load based splitting, only // works when the load based splitting cluster setting is enabled. // -// Use QueriesPerSecond() for current QPS stats for all other purposes. +// Use LoadStats.QueriesPerSecond for all other purposes. func (r *Replica) GetMaxSplitQPS(ctx context.Context) (float64, bool) { - return r.loadBasedSplitter.MaxQPS(ctx, r.Clock().PhysicalTime()) + if r.store.splitConfig.SplitObjective() != SplitQPS { + return 0, false + } + return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) +} + +// GetMaxSplitCPU returns the Replica's maximum CPU/s rate over a configured +// measurement period. If the Replica has not been recording CPU for at least +// an entire measurement period, the method will return false. +// +// NOTE: This should only be used for load based splitting, only +// works when the load based splitting cluster setting is enabled. +// +// Use LoadStats.RaftCPUNanosPerSecond and RequestCPUNanosPerSecond for current +// CPU stats for all other purposes. +func (r *Replica) GetMaxSplitCPU(ctx context.Context) (float64, bool) { + if r.store.splitConfig.SplitObjective() != SplitCPU { + return 0, false + } + return r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) } // ContainsKey returns whether this range contains the specified key. diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index 7551a07c402b..f626b10e5f9c 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -135,6 +135,12 @@ func (rec SpanSetReplicaEvalContext) GetMaxSplitQPS(ctx context.Context) (float6 return rec.i.GetMaxSplitQPS(ctx) } +// GetMaxSplitCPU returns the Replica's maximum CPU/s rate for splitting and +// merging purposes. +func (rec SpanSetReplicaEvalContext) GetMaxSplitCPU(ctx context.Context) (float64, bool) { + return rec.i.GetMaxSplitCPU(ctx) +} + // CanCreateTxnRecord determines whether a transaction record can be created // for the provided transaction information. See Replica.CanCreateTxnRecord // for details about its arguments, return values, and preconditions. diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 4b8caf4aa77c..b12756688f75 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -107,11 +107,12 @@ func newUninitializedReplica( r.mu.stateLoader = stateloader.Make(rangeID) r.mu.quiescent = true r.mu.conf = store.cfg.DefaultSpanConfig - split.Init(&r.loadBasedSplitter, store.cfg.Settings, split.GlobalRandSource(), func() float64 { - return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV)) - }, func() time.Duration { - return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV) - }, store.metrics.LoadSplitterMetrics) + split.Init( + &r.loadBasedSplitter, + store.splitConfig, + store.metrics.LoadSplitterMetrics, + ) + r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{} r.mu.checksums = map[uuid.UUID]*replicaChecksum{} r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings()) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 6879b6ce15f5..96be9a7f7242 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -134,8 +134,8 @@ func (r *Replica) SendWithWriteBytes( // Record the CPU time processing the request for this replica. This is // recorded regardless of errors that are encountered. - defer r.MeasureReqCPUNanos(grunning.Time()) - + startCPU := grunning.Time() + defer r.MeasureReqCPUNanos(startCPU) // Record summary throughput information about the batch request for // accounting. r.recordBatchRequestLoad(ctx, ba) @@ -201,12 +201,14 @@ func (r *Replica) SendWithWriteBytes( } } - // Return range information if it was requested. Note that we don't return it - // on errors because the code doesn't currently support returning both a br - // and a pErr here. Also, some errors (e.g. NotLeaseholderError) have custom - // ways of returning range info. if pErr == nil { + // Return range information if it was requested. Note that we don't return it + // on errors because the code doesn't currently support returning both a br + // and a pErr here. Also, some errors (e.g. NotLeaseholderError) have custom + // ways of returning range info. r.maybeAddRangeInfoToResponse(ctx, ba, br) + // Handle load-based splitting, if necessary. + r.recordBatchForLoadBasedSplitting(ctx, ba, br, int(grunning.Difference(startCPU, grunning.Time()))) } r.recordRequestWriteBytes(writeBytes) @@ -405,17 +407,6 @@ func (r *Replica) executeBatchWithConcurrencyRetries( var requestEvalKind concurrency.RequestEvalKind var g *concurrency.Guard defer func() { - // Handle load-based splitting, if necessary. - if pErr == nil && br != nil { - if len(ba.Requests) != len(br.Responses) { - log.KvDistribution.Errorf(ctx, - "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d", - len(ba.Requests), len(br.Responses)) - } else { - r.recordBatchForLoadBasedSplitting(ctx, ba, br) - } - } - // NB: wrapped to delay g evaluation to its value when returning. if g != nil { r.concMgr.FinishReq(g) @@ -1007,8 +998,6 @@ func (r *Replica) executeAdminBatch( return br, nil } -// recordBatchRequestLoad records the load information about a batch request issued -// against this replica. func (r *Replica) recordBatchRequestLoad(ctx context.Context, ba *roachpb.BatchRequest) { if r.loadStats == nil { log.VEventf( diff --git a/pkg/kv/kvserver/replica_split_load.go b/pkg/kv/kvserver/replica_split_load.go index 3cc43ae28e39..fe73ed7605aa 100644 --- a/pkg/kv/kvserver/replica_split_load.go +++ b/pkg/kv/kvserver/replica_split_load.go @@ -12,10 +12,18 @@ package kvserver import ( "context" + "fmt" + "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" ) // SplitByLoadEnabled wraps "kv.range_split.by_load_enabled". @@ -34,9 +42,128 @@ var SplitByLoadQPSThreshold = settings.RegisterIntSetting( 2500, // 2500 req/s ).WithPublic() -// SplitByLoadQPSThreshold returns the QPS request rate for a given replica. -func (r *Replica) SplitByLoadQPSThreshold() float64 { - return float64(SplitByLoadQPSThreshold.Get(&r.store.cfg.Settings.SV)) +// SplitByLoadCPUThreshold wraps "kv.range_split.load_cpu_threshold". The +// default threshold of 250ms translates to a replica utilizing 25% of a CPU +// core processing requests. In practice, the "real" CPU usage of a replica all +// things considered (sql,compactions, gc) tends to be around 3x the attributed +// usage which this threshold is checked against. This means that in a static +// state we would expect no more than (number of cores) / 0.75 load based +// splits. In practice however, workload patterns change. +// TODO(kvoli): Benchmark ycsb, kv0, kv95 on three nodes and bisect a value +// that achieves the highest throughput. The current value was selected by +// observing the performance of the cluster from a rebalancing perspective. The +// specific criteria was to constrain the occurences of a store being overfull +// relative to the mean but not having any actions available to resolve being +// overfull. When running TPCE (50k), CPU splitting with a 250ms threshold +// performed 1 load based split whilst QPS splitting (2500) performed 12.5. +// When running the allocbench/*/kv roachtest suite, CPU splitting (250ms) +// tended to make between 33-100% more load based splits than QPS splitting +// (2500) on workloads involving reads (usually large scans), whilst on the +// write heavy workloads the number of load based splits was identically low. +var SplitByLoadCPUThreshold = settings.RegisterDurationSetting( + settings.TenantWritable, + "kv.range_split.load_cpu_threshold", + "the CPU use per second over which, the range becomes a candidate for load based splitting", + 250*time.Millisecond, +).WithPublic() + +// SplitObjective is a type that specifies a load based splitting objective. +type SplitObjective int + +const ( + // SplitQPS will track and split QPS (queries-per-second) over a range. + SplitQPS SplitObjective = iota + // SplitCPU will track and split CPU (cpu-per-second) over a range. + SplitCPU +) + +// String returns a human readable string representation of the dimension. +func (d SplitObjective) String() string { + switch d { + case SplitQPS: + return "qps" + case SplitCPU: + return "cpu" + default: + panic(fmt.Sprintf("cannot name: unknown objective with ordinal %d", d)) + } +} + +// Format returns a formatted string for a value. +func (d SplitObjective) Format(value float64) string { + switch d { + case SplitQPS: + return fmt.Sprintf("%.1f", value) + case SplitCPU: + return string(humanizeutil.Duration(time.Duration(int64(value)))) + default: + panic(fmt.Sprintf("cannot format value: unknown objective with ordinal %d", d)) + } +} + +// replicaSplitConfig implements the split.SplitConfig interface. +type replicaSplitConfig struct { + randSource split.RandSource + rebalanceObjectiveProvider RebalanceObjectiveProvider + st *cluster.Settings +} + +func newReplicaSplitConfig( + st *cluster.Settings, rebalanceObjectiveProvider RebalanceObjectiveProvider, +) *replicaSplitConfig { + return &replicaSplitConfig{ + randSource: split.GlobalRandSource(), + rebalanceObjectiveProvider: rebalanceObjectiveProvider, + st: st, + } +} + +// SplitObjective returns the current split objective. Currently this tracks +// 1:1 to the rebalance objective e.g. balancing QPS means also load based +// splitting on QPS. +func (c *replicaSplitConfig) SplitObjective() SplitObjective { + obj := c.rebalanceObjectiveProvider.Objective() + switch obj { + case LBRebalancingQueries: + return SplitQPS + case LBRebalancingCPU: + return SplitCPU + default: + panic(errors.AssertionFailedf("Unkown split objective %d", obj)) + } +} + +// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to +// find the midpoint based on recorded load. +func (c *replicaSplitConfig) NewLoadBasedSplitter(startTime time.Time) split.LoadBasedSplitter { + obj := c.SplitObjective() + switch obj { + case SplitQPS: + return split.NewUnweightedFinder(startTime, c.randSource) + case SplitCPU: + return split.NewWeightedFinder(startTime, c.randSource) + default: + panic(errors.AssertionFailedf("Unkown rebalance objective %d", obj)) + } +} + +// StatRetention returns the duration that recorded load is to be retained. +func (c *replicaSplitConfig) StatRetention() time.Duration { + return kvserverbase.SplitByLoadMergeDelay.Get(&c.st.SV) +} + +// StatThreshold returns the threshold for load above which the range should be +// considered split. +func (c *replicaSplitConfig) StatThreshold() float64 { + obj := c.SplitObjective() + switch obj { + case SplitQPS: + return float64(SplitByLoadQPSThreshold.Get(&c.st.SV)) + case SplitCPU: + return float64(SplitByLoadCPUThreshold.Get(&c.st.SV)) + default: + panic(errors.AssertionFailedf("Unkown rebalance objective %d", obj)) + } } // SplitByLoadEnabled returns whether load based splitting is enabled. @@ -47,6 +174,22 @@ func (r *Replica) SplitByLoadEnabled() bool { !r.store.TestingKnobs().DisableLoadBasedSplitting } +type loadSplitStat struct { + max float64 + ok bool + typ SplitObjective +} + +func (r *Replica) loadSplitStat(ctx context.Context) loadSplitStat { + max, ok := r.loadBasedSplitter.MaxStat(ctx, r.Clock().PhysicalTime()) + lss := loadSplitStat{ + max: max, + ok: ok, + typ: r.store.splitConfig.SplitObjective(), + } + return lss +} + // getResponseBoundarySpan computes the union span of the true spans that were // iterated over using the request span and the response's resumeSpan. // @@ -123,12 +266,31 @@ func getResponseBoundarySpan( // recordBatchForLoadBasedSplitting records the batch's spans to be considered // for load based splitting. func (r *Replica) recordBatchForLoadBasedSplitting( - ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, + ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, stat int, ) { if !r.SplitByLoadEnabled() { return } - shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), len(ba.Requests), func() roachpb.Span { + + // There is nothing to do when either the batch request or batch response + // are nil as we cannot record the load to a keyspan. + if ba == nil || br == nil { + return + } + + if len(ba.Requests) != len(br.Responses) { + log.KvDistribution.Errorf(ctx, + "Requests and responses should be equal lengths: # of requests = %d, # of responses = %d", + len(ba.Requests), len(br.Responses)) + } + + // When QPS splitting is enabled, use the number of requests rather than + // the given stat for recording load. + if r.store.splitConfig.SplitObjective() == SplitQPS { + stat = len(ba.Requests) + } + + shouldInitSplit := r.loadBasedSplitter.Record(ctx, timeutil.Now(), stat, func() roachpb.Span { return getResponseBoundarySpan(ba, br) }) if shouldInitSplit { diff --git a/pkg/kv/kvserver/split/BUILD.bazel b/pkg/kv/kvserver/split/BUILD.bazel index 031270e152c7..514c4112e635 100644 --- a/pkg/kv/kvserver/split/BUILD.bazel +++ b/pkg/kv/kvserver/split/BUILD.bazel @@ -13,8 +13,6 @@ go_library( deps = [ "//pkg/keys", "//pkg/roachpb", - "//pkg/settings", - "//pkg/settings/cluster", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/syncutil", diff --git a/pkg/kv/kvserver/split/decider.go b/pkg/kv/kvserver/split/decider.go index e34937bb4641..e13e89c302be 100644 --- a/pkg/kv/kvserver/split/decider.go +++ b/pkg/kv/kvserver/split/decider.go @@ -20,8 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -29,7 +27,7 @@ import ( const minSplitSuggestionInterval = time.Minute const minNoSplitKeyLoggingMetricsInterval = time.Minute -const minQueriesPerSecondSampleDuration = time.Second +const minPerSecondSampleDuration = time.Second type LoadBasedSplitter interface { // Record informs the LoadBasedSplitter about where the span lies with regard @@ -55,6 +53,17 @@ type LoadBasedSplitter interface { PopularKeyFrequency() float64 } +type LoadSplitConfig interface { + // NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to + // find the midpoint based on recorded load. + NewLoadBasedSplitter(time.Time) LoadBasedSplitter + // StatRetention returns the duration that recorded load is to be retained. + StatRetention() time.Duration + // StatThreshold returns the threshold for load above which the range + // should be considered split. + StatThreshold() float64 +} + type RandSource interface { // Float64 returns, as a float64, a pseudo-random number in the half-open // interval [0.0,1.0) from the RandSource. @@ -86,16 +95,6 @@ func GlobalRandSource() RandSource { return globalRandSource{} } -var enableUnweightedLBSplitFinder = settings.RegisterBoolSetting( - settings.SystemOnly, - "kv.unweighted_lb_split_finder.enabled", - "if enabled, use the un-weighted finder for load-based splitting; "+ - "the unweighted finder will attempt to find a key during when splitting "+ - "a range based on load that evenly divides the QPS among the resulting "+ - "left and right hand side ranges", - true, -) - // A Decider collects measurements about the activity (measured in qps) on a // Replica and, assuming that qps thresholds are exceeded, tries to determine a // split key that would approximately result in halving the load on each of the @@ -132,22 +131,19 @@ type LoadSplitterMetrics struct { // incoming requests to find potential split keys and checks if sampled // candidate split keys satisfy certain requirements. type Decider struct { - st *cluster.Settings // supplied to Init - randSource RandSource // supplied to Init - qpsThreshold func() float64 // supplied to Init - qpsRetention func() time.Duration // supplied to Init + config LoadSplitConfig // supplied to Init loadSplitterMetrics *LoadSplitterMetrics // supplied to Init mu struct { syncutil.Mutex // Fields tracking the current qps sample. - lastQPSRollover time.Time // most recent time recorded by requests. - lastQPS float64 // last reqs/s rate as of lastQPSRollover - count int64 // number of requests recorded since last rollover + lastStatRollover time.Time // most recent time recorded by requests. + lastStatVal float64 // last reqs/s rate as of lastStatRollover + count int64 // number of requests recorded since last rollover // Fields tracking historical qps samples. - maxQPS maxQPSTracker + maxStat maxStatTracker // Fields tracking split key suggestions. splitFinder LoadBasedSplitter // populated when engaged or decided @@ -162,19 +158,9 @@ type Decider struct { // embedding the Decider into a larger struct outside of the scope of this package // without incurring a pointer reference. This is relevant since many Deciders // may exist in the system at any given point in time. -func Init( - lbs *Decider, - st *cluster.Settings, - randSource RandSource, - qpsThreshold func() float64, - qpsRetention func() time.Duration, - loadSplitterMetrics *LoadSplitterMetrics, -) { - lbs.st = st - lbs.randSource = randSource - lbs.qpsThreshold = qpsThreshold - lbs.qpsRetention = qpsRetention +func Init(lbs *Decider, config LoadSplitConfig, loadSplitterMetrics *LoadSplitterMetrics) { lbs.loadSplitterMetrics = loadSplitterMetrics + lbs.config = config } // Record notifies the Decider that 'n' operations are being carried out which @@ -198,32 +184,28 @@ func (d *Decider) recordLocked( d.mu.count += int64(n) // First compute requests per second since the last check. - if d.mu.lastQPSRollover.IsZero() { - d.mu.lastQPSRollover = now + if d.mu.lastStatRollover.IsZero() { + d.mu.lastStatRollover = now } - elapsedSinceLastQPS := now.Sub(d.mu.lastQPSRollover) - if elapsedSinceLastQPS >= minQueriesPerSecondSampleDuration { - // Update the latest QPS and reset the time and request counter. - d.mu.lastQPS = (float64(d.mu.count) / float64(elapsedSinceLastQPS)) * 1e9 - d.mu.lastQPSRollover = now + elapsedSinceLastSample := now.Sub(d.mu.lastStatRollover) + if elapsedSinceLastSample >= minPerSecondSampleDuration { + // Update the latest stat value and reset the time and request counter. + d.mu.lastStatVal = (float64(d.mu.count) / float64(elapsedSinceLastSample)) * 1e9 + d.mu.lastStatRollover = now d.mu.count = 0 - // Record the latest QPS sample in the historical tracker. - d.mu.maxQPS.record(now, d.qpsRetention(), d.mu.lastQPS) + // Record the latest stat sample in the historical tracker. + d.mu.maxStat.record(now, d.config.StatRetention(), d.mu.lastStatVal) - // If the QPS for the range exceeds the threshold, start actively + // If the stat for the range exceeds the threshold, start actively // tracking potential for splitting this range based on load. // This tracking will begin by initiating a splitFinder so it can // begin to Record requests so it can find a split point. If a // splitFinder already exists, we check if a split point is ready // to be used. - if d.mu.lastQPS >= d.qpsThreshold() { + if d.mu.lastStatVal >= d.config.StatThreshold() { if d.mu.splitFinder == nil { - if d.st == nil || enableUnweightedLBSplitFinder.Get(&d.st.SV) { - d.mu.splitFinder = NewUnweightedFinder(now, d.randSource) - } else { - d.mu.splitFinder = NewWeightedFinder(now, d.randSource) - } + d.mu.splitFinder = d.config.NewLoadBasedSplitter(now) } } else { d.mu.splitFinder = nil @@ -261,34 +243,34 @@ func (d *Decider) recordLocked( return false } -// RecordMax adds a QPS measurement directly into the Decider's historical QPS -// tracker. The QPS sample is considered to have been captured at the provided -// time. +// RecordMax adds a stat measurement directly into the Decider's historical +// stat value tracker. The stat sample is considered to have been captured at +// the provided time. func (d *Decider) RecordMax(now time.Time, qps float64) { d.mu.Lock() defer d.mu.Unlock() - d.mu.maxQPS.record(now, d.qpsRetention(), qps) + d.mu.maxStat.record(now, d.config.StatRetention(), qps) } -// LastQPS returns the most recent QPS measurement. -func (d *Decider) LastQPS(ctx context.Context, now time.Time) float64 { +// LastStat returns the most recent stat measurement. +func (d *Decider) LastStat(ctx context.Context, now time.Time) float64 { d.mu.Lock() defer d.mu.Unlock() - d.recordLocked(ctx, now, 0, nil) // force QPS computation - return d.mu.lastQPS + d.recordLocked(ctx, now, 0, nil) // force stat computation + return d.mu.lastStatVal } -// MaxQPS returns the maximum QPS measurement recorded over the retention +// MaxStat returns the maximum stat measurement recorded over the retention // period. If the Decider has not been recording for a full retention period, // the method returns false. -func (d *Decider) MaxQPS(ctx context.Context, now time.Time) (float64, bool) { +func (d *Decider) MaxStat(ctx context.Context, now time.Time) (float64, bool) { d.mu.Lock() defer d.mu.Unlock() - d.recordLocked(ctx, now, 0, nil) // force QPS computation - return d.mu.maxQPS.maxQPS(now, d.qpsRetention()) + d.recordLocked(ctx, now, 0, nil) // force stat computation + return d.mu.maxStat.max(now, d.config.StatRetention()) } // MaybeSplitKey returns a key to perform a split at. The return value will be @@ -345,21 +327,21 @@ func (d *Decider) MaybeSplitKey(ctx context.Context, now time.Time) roachpb.Key } // Reset deactivates any current attempt at determining a split key. The method -// also discards any historical QPS tracking information. +// also discards any historical stat tracking information. func (d *Decider) Reset(now time.Time) { d.mu.Lock() defer d.mu.Unlock() - d.mu.lastQPSRollover = time.Time{} - d.mu.lastQPS = 0 + d.mu.lastStatRollover = time.Time{} + d.mu.lastStatVal = 0 d.mu.count = 0 - d.mu.maxQPS.reset(now, d.qpsRetention()) + d.mu.maxStat.reset(now, d.config.StatRetention()) d.mu.splitFinder = nil d.mu.lastSplitSuggestion = time.Time{} d.mu.lastNoSplitKeyLoggingMetrics = time.Time{} } -// maxQPSTracker collects a series of queries-per-second measurement samples and +// maxStatTracker collects a series of stat per-second measurement samples and // tracks the maximum observed over a period of time. // // The tracker internally uses a set of time windows in order to age out old @@ -367,13 +349,13 @@ func (d *Decider) Reset(now time.Time) { // a circular buffer of the last N windows of stats. We rotate through the // circular buffer every so often as determined by `minRetention`. // -// The tracker can be queried through its `maxQPS` method, which returns the +// The tracker can be queried through its `max` method, which returns the // maximum of all queries-per-second samples recorded over the retention period. // If the tracker has not been recording for a full retention period, then the // method returns false. // -// The zero-value of a maxQPSTracker can be used immediately. -type maxQPSTracker struct { +// The zero-value of a maxStatTracker can be used immediately. +type maxStatTracker struct { windows [6]float64 curIdx int curStart time.Time @@ -382,15 +364,15 @@ type maxQPSTracker struct { } // record adds the qps sample to the tracker. -func (t *maxQPSTracker) record(now time.Time, minRetention time.Duration, qps float64) { +func (t *maxStatTracker) record(now time.Time, minRetention time.Duration, qps float64) { t.maybeReset(now, minRetention) t.maybeRotate(now) t.windows[t.curIdx] = max(t.windows[t.curIdx], qps) } -// reset clears the tracker. maxQPS will begin returning false until a full +// reset clears the tracker. maxStatTracker will begin returning false until a full // minRetention period has elapsed. -func (t *maxQPSTracker) reset(now time.Time, minRetention time.Duration) { +func (t *maxStatTracker) reset(now time.Time, minRetention time.Duration) { if minRetention <= 0 { panic("minRetention must be positive") } @@ -401,18 +383,18 @@ func (t *maxQPSTracker) reset(now time.Time, minRetention time.Duration) { t.minRetention = minRetention } -func (t *maxQPSTracker) maybeReset(now time.Time, minRetention time.Duration) { +func (t *maxStatTracker) maybeReset(now time.Time, minRetention time.Duration) { // If the retention period changes, simply reset the entire tracker. Merging // or splitting windows would be a difficult task and could lead to samples // either not being retained for long-enough, or being retained for too long. - // Resetting indicates to maxQPS that a new retention period needs to be + // Resetting indicates to max that a new retention period needs to be // measured before accurate results can be returned. if minRetention != t.minRetention { t.reset(now, minRetention) } } -func (t *maxQPSTracker) maybeRotate(now time.Time) { +func (t *maxStatTracker) maybeRotate(now time.Time) { sinceLastRotate := now.Sub(t.curStart) windowWidth := t.windowWidth() if sinceLastRotate < windowWidth { @@ -435,10 +417,10 @@ func (t *maxQPSTracker) maybeRotate(now time.Time) { } } -// maxQPS returns the maximum queries-per-second samples recorded over the last +// max returns the maximum queries-per-second samples recorded over the last // retention period. If the tracker has not been recording for a full retention // period, then the method returns false. -func (t *maxQPSTracker) maxQPS(now time.Time, minRetention time.Duration) (float64, bool) { +func (t *maxStatTracker) max(now time.Time, minRetention time.Duration) (float64, bool) { t.record(now, minRetention, 0) // expire samples, if necessary if now.Sub(t.lastReset) < t.minRetention { @@ -453,7 +435,7 @@ func (t *maxQPSTracker) maxQPS(now time.Time, minRetention time.Duration) (float return qps, true } -func (t *maxQPSTracker) windowWidth() time.Duration { +func (t *maxStatTracker) windowWidth() time.Duration { // NB: -1 because during a rotation, only len(t.windows)-1 windows survive. return t.minRetention / time.Duration(len(t.windows)-1) } diff --git a/pkg/kv/kvserver/split/decider_test.go b/pkg/kv/kvserver/split/decider_test.go index 062854e4125f..8b362d6748b4 100644 --- a/pkg/kv/kvserver/split/decider_test.go +++ b/pkg/kv/kvserver/split/decider_test.go @@ -26,6 +26,35 @@ import ( "github.com/stretchr/testify/require" ) +// testLoadSplitConfig implements the LoadSplitConfig interface and may be used +// in testing. +type testLoadSplitConfig struct { + randSource RandSource + useWeighted bool + statRetention time.Duration + statThreshold float64 +} + +// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to +// find the midpoint based on recorded load. +func (t *testLoadSplitConfig) NewLoadBasedSplitter(startTime time.Time) LoadBasedSplitter { + if t.useWeighted { + return NewWeightedFinder(startTime, t.randSource) + } + return NewUnweightedFinder(startTime, t.randSource) +} + +// StatRetention returns the duration that recorded load is to be retained. +func (t *testLoadSplitConfig) StatRetention() time.Duration { + return t.statRetention +} + +// StatThreshold returns the threshold for load above which the range +// should be considered split. +func (t *testLoadSplitConfig) StatThreshold() float64 { + return t.statThreshold +} + func ms(i int) time.Time { ts, err := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z") if err != nil { @@ -38,9 +67,15 @@ func TestDecider(t *testing.T) { defer leaktest.AfterTest(t)() rand := rand.New(rand.NewSource(12)) + loadSplitConfig := testLoadSplitConfig{ + randSource: rand, + useWeighted: false, + statRetention: 2 * time.Second, + statThreshold: 10, + } var d Decider - Init(&d, nil, rand, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{ + Init(&d, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -49,42 +84,42 @@ func TestDecider(t *testing.T) { return func() roachpb.Span { return roachpb.Span{Key: roachpb.Key(s)} } } - assertQPS := func(i int, expQPS float64) { + assertStat := func(i int, expStat float64) { t.Helper() - qps := d.LastQPS(context.Background(), ms(i)) - assert.Equal(t, expQPS, qps) + stat := d.LastStat(context.Background(), ms(i)) + assert.Equal(t, expStat, stat) } - assertMaxQPS := func(i int, expMaxQPS float64, expOK bool) { + assertMaxStat := func(i int, expMaxStat float64, expOK bool) { t.Helper() - maxQPS, ok := d.MaxQPS(context.Background(), ms(i)) - assert.Equal(t, expMaxQPS, maxQPS) + maxStat, ok := d.MaxStat(context.Background(), ms(i)) + assert.Equal(t, expMaxStat, maxStat) assert.Equal(t, expOK, ok) } assert.Equal(t, false, d.Record(context.Background(), ms(100), 1, nil)) - assertQPS(100, 0) - assertMaxQPS(100, 0, false) + assertStat(100, 0) + assertMaxStat(100, 0, false) - assert.Equal(t, ms(100), d.mu.lastQPSRollover) + assert.Equal(t, ms(100), d.mu.lastStatRollover) assert.EqualValues(t, 1, d.mu.count) assert.Equal(t, false, d.Record(context.Background(), ms(400), 3, nil)) - assertQPS(100, 0) - assertQPS(700, 0) - assertMaxQPS(400, 0, false) + assertStat(100, 0) + assertStat(700, 0) + assertMaxStat(400, 0, false) assert.Equal(t, false, d.Record(context.Background(), ms(300), 3, nil)) - assertQPS(100, 0) - assertMaxQPS(300, 0, false) + assertStat(100, 0) + assertMaxStat(300, 0, false) assert.Equal(t, false, d.Record(context.Background(), ms(900), 1, nil)) - assertQPS(0, 0) - assertMaxQPS(900, 0, false) + assertStat(0, 0) + assertMaxStat(900, 0, false) assert.Equal(t, false, d.Record(context.Background(), ms(1099), 1, nil)) - assertQPS(0, 0) - assertMaxQPS(1099, 0, false) + assertStat(0, 0) + assertMaxStat(1099, 0, false) // Now 9 operations happened in the interval [100, 1099]. The next higher // timestamp will decide whether to engage the split finder. @@ -92,20 +127,20 @@ func TestDecider(t *testing.T) { // It won't engage because the duration between the rollovers is 1.1s, and // we had 10 events over that interval. assert.Equal(t, false, d.Record(context.Background(), ms(1200), 1, nil)) - assertQPS(0, float64(10)/float64(1.1)) - assert.Equal(t, ms(1200), d.mu.lastQPSRollover) - assertMaxQPS(1099, 0, false) + assertStat(0, float64(10)/float64(1.1)) + assert.Equal(t, ms(1200), d.mu.lastStatRollover) + assertMaxStat(1099, 0, false) assert.Equal(t, nil, d.mu.splitFinder) assert.Equal(t, false, d.Record(context.Background(), ms(2199), 12, nil)) assert.Equal(t, nil, d.mu.splitFinder) - // 2200 is the next rollover point, and 12+1=13 qps should be computed. + // 2200 is the next rollover point, and 12+1=13 stat should be computed. assert.Equal(t, false, d.Record(context.Background(), ms(2200), 1, op("a"))) - assert.Equal(t, ms(2200), d.mu.lastQPSRollover) - assertQPS(0, float64(13)) - assertMaxQPS(2200, 13, true) + assert.Equal(t, ms(2200), d.mu.lastStatRollover) + assertStat(0, float64(13)) + assertMaxStat(2200, 13, true) assert.NotNil(t, d.mu.splitFinder) assert.False(t, d.mu.splitFinder.Ready(ms(10))) @@ -132,17 +167,17 @@ func TestDecider(t *testing.T) { o = op("a") } assert.False(t, d.Record(context.Background(), ms(tick), 11, o)) - assert.True(t, d.LastQPS(context.Background(), ms(tick)) > 1.0) + assert.True(t, d.LastStat(context.Background(), ms(tick)) > 1.0) // Even though the split key remains. assert.Equal(t, roachpb.Key("z"), d.MaybeSplitKey(context.Background(), ms(tick+999))) tick += 1000 } // But after minSplitSuggestionInterval of ticks, we get another one. assert.True(t, d.Record(context.Background(), ms(tick), 11, op("a"))) - assertQPS(tick, float64(11)) - assertMaxQPS(tick, 11, true) + assertStat(tick, float64(11)) + assertMaxStat(tick, 11, true) - // Split key suggestion vanishes once qps drops. + // Split key suggestion vanishes once stat drops. tick += 1000 assert.False(t, d.Record(context.Background(), ms(tick), 9, op("a"))) assert.Equal(t, roachpb.Key(nil), d.MaybeSplitKey(context.Background(), ms(tick))) @@ -192,24 +227,31 @@ func TestDecider(t *testing.T) { assert.Nil(t, d.mu.splitFinder) } -func TestDecider_MaxQPS(t *testing.T) { +func TestDecider_MaxStat(t *testing.T) { defer leaktest.AfterTest(t)() rand := rand.New(rand.NewSource(11)) + loadSplitConfig := testLoadSplitConfig{ + randSource: rand, + useWeighted: false, + statRetention: 10 * time.Second, + statThreshold: 100, + } + var d Decider - Init(&d, nil, rand, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{ + Init(&d, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) - assertMaxQPS := func(i int, expMaxQPS float64, expOK bool) { + assertMaxStat := func(i int, expMaxStat float64, expOK bool) { t.Helper() - maxQPS, ok := d.MaxQPS(context.Background(), ms(i)) - assert.Equal(t, expMaxQPS, maxQPS) + maxStat, ok := d.MaxStat(context.Background(), ms(i)) + assert.Equal(t, expMaxStat, maxStat) assert.Equal(t, expOK, ok) } - assertMaxQPS(1000, 0, false) + assertMaxStat(1000, 0, false) // Record a large number of samples. d.Record(context.Background(), ms(1500), 5, nil) @@ -220,22 +262,22 @@ func TestDecider_MaxQPS(t *testing.T) { d.Record(context.Background(), ms(8000), 5, nil) d.Record(context.Background(), ms(10000), 9, nil) - assertMaxQPS(10000, 0, false) - assertMaxQPS(11000, 17, true) + assertMaxStat(10000, 0, false) + assertMaxStat(11000, 17, true) - // Record more samples with a lower QPS. + // Record more samples with a lower Stat. d.Record(context.Background(), ms(12000), 1, nil) d.Record(context.Background(), ms(13000), 4, nil) d.Record(context.Background(), ms(15000), 2, nil) d.Record(context.Background(), ms(19000), 3, nil) - assertMaxQPS(20000, 4.5, true) - assertMaxQPS(21000, 4, true) + assertMaxStat(20000, 4.5, true) + assertMaxStat(21000, 4, true) - // Add in a few QPS reading directly. + // Add in a few Stat reading directly. d.RecordMax(ms(24000), 6) - assertMaxQPS(25000, 6, true) + assertMaxStat(25000, 6, true) } func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { @@ -243,7 +285,14 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { rand := rand.New(rand.NewSource(11)) var d Decider - Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + loadSplitConfig := testLoadSplitConfig{ + randSource: rand, + useWeighted: false, + statRetention: time.Second, + statThreshold: 1, + } + + Init(&d, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -277,9 +326,16 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) { defer leaktest.AfterTest(t)() rand := rand.New(rand.NewSource(11)) - var d Decider - Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + + loadSplitConfig := testLoadSplitConfig{ + randSource: rand, + useWeighted: false, + statRetention: time.Second, + statThreshold: 1, + } + + Init(&d, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -314,19 +370,19 @@ func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) { require.Equal(t, c1().Key, k) } -func TestMaxQPSTracker(t *testing.T) { +func TestMaxStatTracker(t *testing.T) { defer leaktest.AfterTest(t)() tick := 100 minRetention := time.Second - var mt maxQPSTracker + var mt maxStatTracker mt.reset(ms(tick), minRetention) require.Equal(t, 200*time.Millisecond, mt.windowWidth()) - // Check the maxQPS returns false before any samples are recorded. - qps, ok := mt.maxQPS(ms(tick), minRetention) - require.Equal(t, 0.0, qps) + // Check the maxStat returns false before any samples are recorded. + stat, ok := mt.max(ms(tick), minRetention) + require.Equal(t, 0.0, stat) require.Equal(t, false, ok) require.Equal(t, [6]float64{0, 0, 0, 0, 0, 0}, mt.windows) require.Equal(t, 0, mt.curIdx) @@ -338,9 +394,9 @@ func TestMaxQPSTracker(t *testing.T) { mt.record(ms(tick), minRetention, float64(10+i)) } - // maxQPS should still return false, but some windows should have samples. - qps, ok = mt.maxQPS(ms(tick), minRetention) - require.Equal(t, 0.0, qps) + // maxStat should still return false, but some windows should have samples. + stat, ok = mt.max(ms(tick), minRetention) + require.Equal(t, 0.0, stat) require.Equal(t, false, ok) require.Equal(t, [6]float64{12, 16, 20, 24, 0, 0}, mt.windows) require.Equal(t, 3, mt.curIdx) @@ -351,10 +407,10 @@ func TestMaxQPSTracker(t *testing.T) { mt.record(ms(tick), minRetention, float64(24+i)) } - // maxQPS should now return the maximum qps observed during the measurement + // maxStat should now return the maximum stat observed during the measurement // period. - qps, ok = mt.maxQPS(ms(tick), minRetention) - require.Equal(t, 38.0, qps) + stat, ok = mt.max(ms(tick), minRetention) + require.Equal(t, 38.0, stat) require.Equal(t, true, ok) require.Equal(t, [6]float64{35, 38, 20, 24, 27, 31}, mt.windows) require.Equal(t, 1, mt.curIdx) @@ -364,8 +420,8 @@ func TestMaxQPSTracker(t *testing.T) { tick += 500 mt.record(ms(tick), minRetention, float64(17)) - qps, ok = mt.maxQPS(ms(tick), minRetention) - require.Equal(t, 38.0, qps) + stat, ok = mt.max(ms(tick), minRetention) + require.Equal(t, 38.0, stat) require.Equal(t, true, ok) require.Equal(t, [6]float64{35, 38, 0, 0, 17, 31}, mt.windows) require.Equal(t, 4, mt.curIdx) @@ -373,8 +429,8 @@ func TestMaxQPSTracker(t *testing.T) { // A query far in the future should return 0, because this indicates no // recent activity. tick += 1900 - qps, ok = mt.maxQPS(ms(tick), minRetention) - require.Equal(t, 0.0, qps) + stat, ok = mt.max(ms(tick), minRetention) + require.Equal(t, 0.0, stat) require.Equal(t, true, ok) require.Equal(t, [6]float64{0, 0, 0, 0, 0, 0}, mt.windows) require.Equal(t, 0, mt.curIdx) @@ -386,8 +442,8 @@ func TestMaxQPSTracker(t *testing.T) { mt.record(ms(tick), minRetention, float64(33+i)) } - qps, ok = mt.maxQPS(ms(tick), minRetention) - require.Equal(t, 47.0, qps) + stat, ok = mt.max(ms(tick), minRetention) + require.Equal(t, 47.0, stat) require.Equal(t, true, ok) require.Equal(t, [6]float64{35, 39, 43, 47, 0, 0}, mt.windows) require.Equal(t, 3, mt.curIdx) @@ -398,8 +454,8 @@ func TestMaxQPSTracker(t *testing.T) { mt.record(ms(tick), minRetention, float64(13+i)) } - qps, ok = mt.maxQPS(ms(tick), minRetention) - require.Equal(t, 0.0, qps) + stat, ok = mt.max(ms(tick), minRetention) + require.Equal(t, 0.0, stat) require.Equal(t, false, ok) require.Equal(t, [6]float64{20, 27, 0, 0, 0, 0}, mt.windows) require.Equal(t, 1, mt.curIdx) @@ -411,7 +467,14 @@ func TestDeciderMetrics(t *testing.T) { timeStart := 1000 var dPopular Decider - Init(&dPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + loadSplitConfig := testLoadSplitConfig{ + randSource: rand, + useWeighted: false, + statRetention: time.Second, + statThreshold: 1, + } + + Init(&dPopular, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -433,10 +496,11 @@ func TestDeciderMetrics(t *testing.T) { // No split key, not popular key var dNotPopular Decider - Init(&dNotPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&dNotPopular, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) + for i := 0; i < 20; i++ { dNotPopular.Record(context.Background(), ms(timeStart), 1, func() roachpb.Span { return roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(uint32(0))} @@ -453,7 +517,7 @@ func TestDeciderMetrics(t *testing.T) { // No split key, all insufficient counters var dAllInsufficientCounters Decider - Init(&dAllInsufficientCounters, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&dAllInsufficientCounters, &loadSplitConfig, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index 56adf14df73b..bb16e18fa8aa 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -266,12 +266,14 @@ func (sq *splitQueue) processAttempt( loadStats := r.loadStats.Stats() batchHandledQPS := loadStats.QueriesPerSecond raftAppliedQPS := loadStats.WriteKeysPerSecond - splitQPS := r.loadBasedSplitter.LastQPS(ctx, now) + lastSplitStat := r.loadBasedSplitter.LastStat(ctx, now) + splitObj := sq.store.splitConfig.SplitObjective() reason := fmt.Sprintf( - "load at key %s (%.2f splitQPS, %.2f batches/sec, %.2f raft mutations/sec)", + "load at key %s (%s %s, %.2f batches/sec, %.2f raft mutations/sec)", splitByLoadKey, - splitQPS, + splitObj, + splitObj.Format(lastSplitStat), batchHandledQPS, raftAppliedQPS, ) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 246016ba023d..8aa606fd8f06 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -758,6 +758,7 @@ type Store struct { ctSender *sidetransport.Sender storeGossip *StoreGossip rebalanceObjManager *RebalanceObjectiveManager + splitConfig *replicaSplitConfig coalescedMu struct { syncutil.Mutex @@ -1214,7 +1215,7 @@ func NewStore( allocatorStorePool, /* storeDescProvider */ allocatorStorePool, /* capacityChangeNotifier */ ) - + s.splitConfig = newReplicaSplitConfig(s.cfg.Settings, s.rebalanceObjManager) } if cfg.RPCContext != nil { s.allocator = allocatorimpl.MakeAllocator( diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 1e3ae8260480..c50553d27712 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2121,6 +2121,14 @@ message RangeStatsResponse { bool max_queries_per_second_set = 6; + // MaxCPUPerSecond is the maximum rate of cpu/s that the range has used at + // the leaseholder over a configured measurement period. Set to -1 if the + // replica serving the RangeStats request has not been the leaseholder long + // enough to have recorded CPU rates for at least a full measurement period. + // In such cases, the recipient should not consider the CPU value reliable + // enough to base important decisions off of. + double max_cpu_per_second = 7 [(gogoproto.customname) = "MaxCPUPerSecond"]; + // range_info contains descriptor and lease information. RangeInfo range_info = 4 [(gogoproto.nullable) = false];