Skip to content

Commit

Permalink
kvserver: introduce cpu load based splits
Browse files Browse the repository at this point in the history
This commit adds the ability to perform load based splitting with replica
cpu usage rather than queries per second. Load based splitting now will
use either cpu or qps for deciding split points, depending on the
cluster setting `kv.allocator.load_based_rebalancing.objective`.

When set to `qps`, qps is used in deciding split points and when
splitting should occur; similarly, `cpu` means that request cpu against
the leaseholder replica is to decide split points.

The split threshold when using `cpu` is the cluster setting
`kv.range_split.load_cpu_threshold` which defaults to `250ms` of cpu
time per second, i.e. a replica using 1/4 processor of a machine
consistently.

The merge queue uses the load based splitter to make decisions on
whether to merge two adjacent ranges due to low load. This commit also
updates the merge queue to be consistent with the load based splitter
signal. When switching between `qps` and `cpu`, the load based splitter
for every replica is reset to avoid spurious results.

resolves: #95377

Release note (ops change): Load based splitter now supports using request
cpu usage to split ranges. This is introduced with the previous cluster
setting `kv.allocator.load_based_rebalancing.objective`, which when set
to `cpu`, will use request cpu usage. The threshold above which
CPU usage of a range is considered for splitting is defined in the
cluster setting `kv.range_split.load_cpu_threshold`, which has a default
value of `250ms`.
  • Loading branch information
kvoli committed Feb 10, 2023
1 parent cc9ecdf commit 408e227
Show file tree
Hide file tree
Showing 21 changed files with 689 additions and 300 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to
kv.log_range_and_node_events.enabled boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
kv.range_split.load_cpu_threshold duration 250ms the CPU use per second over which, the range becomes a candidate for load based splitting
kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence)
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<tr><td><div id="setting-kv-log-range-and-node-events-enabled" class="anchored"><code>kv.log_range_and_node_events.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog</td></tr>
<tr><td><div id="setting-kv-protectedts-reconciliation-interval" class="anchored"><code>kv.protectedts.reconciliation.interval</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the frequency for reconciling jobs with protected timestamp records</td></tr>
<tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load_enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td></tr>
<tr><td><div id="setting-kv-range-split-load-cpu-threshold" class="anchored"><code>kv.range_split.load_cpu_threshold</code></div></td><td>duration</td><td><code>250ms</code></td><td>the CPU use per second over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><div id="setting-kv-range-split-load-qps-threshold" class="anchored"><code>kv.range_split.load_qps_threshold</code></div></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><div id="setting-kv-rangefeed-range-stuck-threshold" class="anchored"><code>kv.rangefeed.range_stuck_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>restart rangefeeds if they don&#39;t emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence)</td></tr>
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/asim/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
defaultStateExchangeInterval = 10 * time.Second
defaultStateExchangeDelay = 500 * time.Millisecond
defaultSplitQPSThreshold = 2500
defaultSplitQPSRetention = 10 * time.Minute
defaultSplitStatRetention = 10 * time.Minute
defaultSeed = 42
defaultLBRebalancingMode = 2 // Leases and replicas.
defaultLBRebalancingInterval = time.Minute
Expand Down Expand Up @@ -89,9 +89,9 @@ type SimulationSettings struct {
// SplitQPSThreshold is the threshold above which a range will be a
// candidate for load based splitting.
SplitQPSThreshold float64
// SplitQPSRetention is the duration which recorded load will be retained
// SplitStatRetention is the duration which recorded load will be retained
// and factored into load based splitting decisions.
SplitQPSRetention time.Duration
SplitStatRetention time.Duration
// LBRebalancingMode controls if and when we do store-level rebalancing
// based on load. It maps to kvserver.LBRebalancingMode.
LBRebalancingMode int64
Expand Down Expand Up @@ -125,7 +125,7 @@ func DefaultSimulationSettings() *SimulationSettings {
StateExchangeInterval: defaultStateExchangeInterval,
StateExchangeDelay: defaultStateExchangeDelay,
SplitQPSThreshold: defaultSplitQPSThreshold,
SplitQPSRetention: defaultSplitQPSRetention,
SplitStatRetention: defaultSplitStatRetention,
LBRebalancingMode: defaultLBRebalancingMode,
LBRebalancingObjective: defaultLBRebalancingObjective,
LBRebalancingInterval: defaultLBRebalancingInterval,
Expand Down Expand Up @@ -167,6 +167,6 @@ func (s *SimulationSettings) SplitQPSThresholdFn() func() float64 {
// split decisions.
func (s *SimulationSettings) SplitQPSRetentionFn() func() time.Duration {
return func() time.Duration {
return s.SplitQPSRetention
return s.SplitStatRetention
}
}
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/asim/state/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,7 @@ func (s *state) AddStore(nodeID NodeID) (Store, bool) {
s.stores[storeID] = store

// Add a range load splitter for this store.
s.loadsplits[storeID] = NewSplitDecider(s.settings.Seed,
s.settings.SplitQPSThresholdFn(),
s.settings.SplitQPSRetentionFn(),
)
s.loadsplits[storeID] = NewSplitDecider(s.settings)

return store, true
}
Expand Down
49 changes: 34 additions & 15 deletions pkg/kv/kvserver/asim/state/split_decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -38,32 +39,50 @@ type LoadSplitter interface {
ResetRange(rangeID RangeID)
}

type loadSplitConfig struct {
randSource split.RandSource
settings *config.SimulationSettings
}

// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to
// find the midpoint based on recorded load.
func (lsc loadSplitConfig) NewLoadBasedSplitter(startTime time.Time) split.LoadBasedSplitter {
return split.NewUnweightedFinder(startTime, lsc.randSource)
}

// StatRetention returns the duration that recorded load is to be retained.
func (lsc loadSplitConfig) StatRetention() time.Duration {
return lsc.settings.SplitStatRetention
}

// StatThreshold returns the threshold for load above which the range
// should be considered split.
func (lsc loadSplitConfig) StatThreshold() float64 {
return lsc.settings.SplitQPSThreshold
}

// SplitDecider implements the LoadSplitter interface.
type SplitDecider struct {
deciders map[RangeID]*split.Decider
suggestions []RangeID
qpsThreshold func() float64
qpsRetention func() time.Duration
seed int64
deciders map[RangeID]*split.Decider
suggestions []RangeID
splitConfig split.LoadSplitConfig
}

// NewSplitDecider returns a new SplitDecider.
func NewSplitDecider(
seed int64, qpsThresholdFn func() float64, qpsRetentionFn func() time.Duration,
) *SplitDecider {
func NewSplitDecider(settings *config.SimulationSettings) *SplitDecider {
return &SplitDecider{
deciders: make(map[RangeID]*split.Decider),
suggestions: []RangeID{},
seed: seed,
qpsThreshold: qpsThresholdFn,
qpsRetention: qpsRetentionFn,
deciders: make(map[RangeID]*split.Decider),
suggestions: []RangeID{},
splitConfig: loadSplitConfig{
randSource: rand.New(rand.NewSource(settings.Seed)),
settings: settings,
},
}
}

func (s *SplitDecider) newDecider() *split.Decider {
rand := rand.New(rand.NewSource(s.seed))
decider := &split.Decider{}
split.Init(decider, nil, rand, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
split.Init(decider, s.splitConfig, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
Expand Down
25 changes: 12 additions & 13 deletions pkg/kv/kvserver/asim/state/split_decider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/stretchr/testify/require"
)

var testingSequence = []Key{10, 1, 9, 2, 8, 3, 4, 7, 5, 6}

func TestSplitDecider(t *testing.T) {
testingSeed := 42
testingThreshold := func() float64 { return 2500 }
testingRetention := func() time.Duration { return 60 * time.Second }
testSettings := config.DefaultSimulationSettings()
testSettings.SplitQPSThreshold = 2500
testSettings.SplitStatRetention = 60 * time.Second
startTime := TestingStartTime()
decider := NewSplitDecider(int64(testingSeed), testingThreshold, testingRetention)
decider := NewSplitDecider(testSettings)

// A decider should be created for a range when a load event is first
// recorded against it.
Expand All @@ -45,8 +46,8 @@ func TestSplitDecider(t *testing.T) {
sequence := testingSequence

// Register load greater than the threshold.
for i := 0; int64(i) < int64(testingRetention()/time.Second); i++ {
for j := 0; j < int(testingThreshold())+100; j++ {
for i := 0; int64(i) < int64(testSettings.SplitStatRetention/time.Second); i++ {
for j := 0; j < int(testSettings.SplitQPSThreshold)+100; j++ {
decider.Record(
OffsetTick(startTime, int64(i)),
1,
Expand All @@ -58,7 +59,7 @@ func TestSplitDecider(t *testing.T) {
// There should now be 1 suggested range for splitting which corresponds to
// the midpoint of the testing sequence.
require.Equal(t, []RangeID{1}, decider.ClearSplitKeys())
splitKey, found = decider.SplitKey(startTime.Add(testingRetention()), 1)
splitKey, found = decider.SplitKey(startTime.Add(testSettings.SplitStatRetention), 1)
require.True(t, found)
require.Equal(t, Key(6), splitKey)

Expand All @@ -67,7 +68,6 @@ func TestSplitDecider(t *testing.T) {
}

func TestSplitDeciderWorkload(t *testing.T) {
testingSeed := 42
testingRangeID := FirstRangeID
startTime := TestingStartTime()

Expand Down Expand Up @@ -105,11 +105,10 @@ func TestSplitDeciderWorkload(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
splitDecider := NewSplitDecider(
int64(testingSeed),
func() float64 { return tc.threshold },
func() time.Duration { return tc.retention },
)
testSettings := config.DefaultSimulationSettings()
testSettings.SplitQPSThreshold = tc.threshold
testSettings.SplitStatRetention = tc.retention
splitDecider := NewSplitDecider(testSettings)
lastTick := int64(0)

for _, tick := range tc.ticks {
Expand Down
18 changes: 13 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_range_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,20 @@ func RangeStats(
) (result.Result, error) {
reply := resp.(*roachpb.RangeStatsResponse)
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(ctx); ok {
reply.MaxQueriesPerSecond = qps
} else {
// See comment on MaxQueriesPerSecond. -1 means !ok.
reply.MaxQueriesPerSecond = -1

maxQPS, qpsOK := cArgs.EvalCtx.GetMaxSplitQPS(ctx)
maxCPU, cpuOK := cArgs.EvalCtx.GetMaxSplitCPU(ctx)
// See comment on MaxQueriesPerSecond and MaxCPUPerSecond. -1 means !ok.
reply.MaxCPUPerSecond, reply.MaxQueriesPerSecond = -1, -1
// NB: We don't expect both cpuOk and qpsOK to be true, however we don't
// prevent both being set.
if qpsOK {
reply.MaxQueriesPerSecond = maxQPS
}
if cpuOK {
reply.MaxCPUPerSecond = maxCPU
}

reply.MaxQueriesPerSecondSet = true
reply.RangeInfo = cArgs.EvalCtx.GetRangeInfo(ctx)
return result.Result{}, nil
Expand Down
15 changes: 13 additions & 2 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,20 @@ type EvalContext interface {
// results due to concurrent writes.
GetMVCCStats() enginepb.MVCCStats

// GetMaxSplitQPS returns the Replicas maximum queries/s request rate over a
// configured retention period.
// GetMaxSplitQPS returns the Replica's maximum queries/s request rate over
// a configured retention period.
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
GetMaxSplitQPS(context.Context) (float64, bool)

// GetMaxSplitCPU returns the Replica's maximum request cpu/s rate over a
// configured retention period.
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
GetMaxSplitCPU(context.Context) (float64, bool)

GetGCThreshold() hlc.Timestamp
ExcludeDataFromBackup() bool
GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error)
Expand Down Expand Up @@ -161,6 +168,7 @@ type MockEvalCtx struct {
Clock *hlc.Clock
Stats enginepb.MVCCStats
QPS float64
CPU float64
AbortSpan *abortspan.AbortSpan
GCThreshold hlc.Timestamp
Term, FirstIndex uint64
Expand Down Expand Up @@ -240,6 +248,9 @@ func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats {
func (m *mockEvalCtxImpl) GetMaxSplitQPS(context.Context) (float64, bool) {
return m.QPS, true
}
func (m *mockEvalCtxImpl) GetMaxSplitCPU(context.Context) (float64, bool) {
return m.CPU, true
}
func (m *mockEvalCtxImpl) CanCreateTxnRecord(
context.Context, uuid.UUID, []byte, hlc.Timestamp,
) (bool, roachpb.TransactionAbortedReason) {
Expand Down
Loading

0 comments on commit 408e227

Please sign in to comment.