Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: introduce cpu load based splits #96128

Merged
merged 2 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ kv.closed_timestamp.follower_reads_enabled boolean true allow (all) replicas to
kv.log_range_and_node_events.enabled boolean true set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog
kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconciling jobs with protected timestamp records
kv.range_split.by_load_enabled boolean true allow automatic splits of ranges based on where load is concentrated
kv.range_split.load_cpu_threshold duration 250ms the CPU use per second over which, the range becomes a candidate for load based splitting
kv.range_split.load_qps_threshold integer 2500 the QPS over which, the range becomes a candidate for load based splitting
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled
kv.rangefeed.range_stuck_threshold duration 1m0s restart rangefeeds if they don't emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence)
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<tr><td><div id="setting-kv-log-range-and-node-events-enabled" class="anchored"><code>kv.log_range_and_node_events.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog</td></tr>
<tr><td><div id="setting-kv-protectedts-reconciliation-interval" class="anchored"><code>kv.protectedts.reconciliation.interval</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the frequency for reconciling jobs with protected timestamp records</td></tr>
<tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load_enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td></tr>
<tr><td><div id="setting-kv-range-split-load-cpu-threshold" class="anchored"><code>kv.range_split.load_cpu_threshold</code></div></td><td>duration</td><td><code>250ms</code></td><td>the CPU use per second over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><div id="setting-kv-range-split-load-qps-threshold" class="anchored"><code>kv.range_split.load_qps_threshold</code></div></td><td>integer</td><td><code>2500</code></td><td>the QPS over which, the range becomes a candidate for load based splitting</td></tr>
<tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td></tr>
<tr><td><div id="setting-kv-rangefeed-range-stuck-threshold" class="anchored"><code>kv.rangefeed.range_stuck_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>restart rangefeeds if they don&#39;t emit anything for the specified threshold; 0 disables (kv.closed_timestamp.side_transport_interval takes precedence)</td></tr>
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/asim/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
defaultStateExchangeInterval = 10 * time.Second
defaultStateExchangeDelay = 500 * time.Millisecond
defaultSplitQPSThreshold = 2500
defaultSplitQPSRetention = 10 * time.Minute
defaultSplitStatRetention = 10 * time.Minute
defaultSeed = 42
defaultLBRebalancingMode = 2 // Leases and replicas.
defaultLBRebalancingInterval = time.Minute
Expand Down Expand Up @@ -89,9 +89,9 @@ type SimulationSettings struct {
// SplitQPSThreshold is the threshold above which a range will be a
// candidate for load based splitting.
SplitQPSThreshold float64
// SplitQPSRetention is the duration which recorded load will be retained
// SplitStatRetention is the duration which recorded load will be retained
// and factored into load based splitting decisions.
SplitQPSRetention time.Duration
SplitStatRetention time.Duration
// LBRebalancingMode controls if and when we do store-level rebalancing
// based on load. It maps to kvserver.LBRebalancingMode.
LBRebalancingMode int64
Expand Down Expand Up @@ -125,7 +125,7 @@ func DefaultSimulationSettings() *SimulationSettings {
StateExchangeInterval: defaultStateExchangeInterval,
StateExchangeDelay: defaultStateExchangeDelay,
SplitQPSThreshold: defaultSplitQPSThreshold,
SplitQPSRetention: defaultSplitQPSRetention,
SplitStatRetention: defaultSplitStatRetention,
LBRebalancingMode: defaultLBRebalancingMode,
LBRebalancingObjective: defaultLBRebalancingObjective,
LBRebalancingInterval: defaultLBRebalancingInterval,
Expand Down Expand Up @@ -167,6 +167,6 @@ func (s *SimulationSettings) SplitQPSThresholdFn() func() float64 {
// split decisions.
func (s *SimulationSettings) SplitQPSRetentionFn() func() time.Duration {
return func() time.Duration {
return s.SplitQPSRetention
return s.SplitStatRetention
}
}
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/asim/state/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,7 @@ func (s *state) AddStore(nodeID NodeID) (Store, bool) {
s.stores[storeID] = store

// Add a range load splitter for this store.
s.loadsplits[storeID] = NewSplitDecider(s.settings.Seed,
s.settings.SplitQPSThresholdFn(),
s.settings.SplitQPSRetentionFn(),
)
s.loadsplits[storeID] = NewSplitDecider(s.settings)

return store, true
}
Expand Down
49 changes: 34 additions & 15 deletions pkg/kv/kvserver/asim/state/split_decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -38,32 +39,50 @@ type LoadSplitter interface {
ResetRange(rangeID RangeID)
}

type loadSplitConfig struct {
randSource split.RandSource
settings *config.SimulationSettings
}

// NewLoadBasedSplitter returns a new LoadBasedSplitter that may be used to
// find the midpoint based on recorded load.
func (lsc loadSplitConfig) NewLoadBasedSplitter(startTime time.Time) split.LoadBasedSplitter {
return split.NewUnweightedFinder(startTime, lsc.randSource)
}

// StatRetention returns the duration that recorded load is to be retained.
func (lsc loadSplitConfig) StatRetention() time.Duration {
return lsc.settings.SplitStatRetention
}

// StatThreshold returns the threshold for load above which the range
// should be considered split.
func (lsc loadSplitConfig) StatThreshold() float64 {
return lsc.settings.SplitQPSThreshold
}

// SplitDecider implements the LoadSplitter interface.
type SplitDecider struct {
deciders map[RangeID]*split.Decider
suggestions []RangeID
qpsThreshold func() float64
qpsRetention func() time.Duration
seed int64
deciders map[RangeID]*split.Decider
suggestions []RangeID
splitConfig split.LoadSplitConfig
}

// NewSplitDecider returns a new SplitDecider.
func NewSplitDecider(
seed int64, qpsThresholdFn func() float64, qpsRetentionFn func() time.Duration,
) *SplitDecider {
func NewSplitDecider(settings *config.SimulationSettings) *SplitDecider {
return &SplitDecider{
deciders: make(map[RangeID]*split.Decider),
suggestions: []RangeID{},
seed: seed,
qpsThreshold: qpsThresholdFn,
qpsRetention: qpsRetentionFn,
deciders: make(map[RangeID]*split.Decider),
suggestions: []RangeID{},
splitConfig: loadSplitConfig{
randSource: rand.New(rand.NewSource(settings.Seed)),
settings: settings,
},
}
}

func (s *SplitDecider) newDecider() *split.Decider {
rand := rand.New(rand.NewSource(s.seed))
decider := &split.Decider{}
split.Init(decider, nil, rand, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
split.Init(decider, s.splitConfig, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
Expand Down
25 changes: 12 additions & 13 deletions pkg/kv/kvserver/asim/state/split_decider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/stretchr/testify/require"
)

var testingSequence = []Key{10, 1, 9, 2, 8, 3, 4, 7, 5, 6}

func TestSplitDecider(t *testing.T) {
testingSeed := 42
testingThreshold := func() float64 { return 2500 }
testingRetention := func() time.Duration { return 60 * time.Second }
testSettings := config.DefaultSimulationSettings()
testSettings.SplitQPSThreshold = 2500
testSettings.SplitStatRetention = 60 * time.Second
startTime := TestingStartTime()
decider := NewSplitDecider(int64(testingSeed), testingThreshold, testingRetention)
decider := NewSplitDecider(testSettings)

// A decider should be created for a range when a load event is first
// recorded against it.
Expand All @@ -45,8 +46,8 @@ func TestSplitDecider(t *testing.T) {
sequence := testingSequence

// Register load greater than the threshold.
for i := 0; int64(i) < int64(testingRetention()/time.Second); i++ {
for j := 0; j < int(testingThreshold())+100; j++ {
for i := 0; int64(i) < int64(testSettings.SplitStatRetention/time.Second); i++ {
for j := 0; j < int(testSettings.SplitQPSThreshold)+100; j++ {
decider.Record(
OffsetTick(startTime, int64(i)),
1,
Expand All @@ -58,7 +59,7 @@ func TestSplitDecider(t *testing.T) {
// There should now be 1 suggested range for splitting which corresponds to
// the midpoint of the testing sequence.
require.Equal(t, []RangeID{1}, decider.ClearSplitKeys())
splitKey, found = decider.SplitKey(startTime.Add(testingRetention()), 1)
splitKey, found = decider.SplitKey(startTime.Add(testSettings.SplitStatRetention), 1)
require.True(t, found)
require.Equal(t, Key(6), splitKey)

Expand All @@ -67,7 +68,6 @@ func TestSplitDecider(t *testing.T) {
}

func TestSplitDeciderWorkload(t *testing.T) {
testingSeed := 42
testingRangeID := FirstRangeID
startTime := TestingStartTime()

Expand Down Expand Up @@ -105,11 +105,10 @@ func TestSplitDeciderWorkload(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
splitDecider := NewSplitDecider(
int64(testingSeed),
func() float64 { return tc.threshold },
func() time.Duration { return tc.retention },
)
testSettings := config.DefaultSimulationSettings()
testSettings.SplitQPSThreshold = tc.threshold
testSettings.SplitStatRetention = tc.retention
splitDecider := NewSplitDecider(testSettings)
lastTick := int64(0)

for _, tick := range tc.ticks {
Expand Down
19 changes: 13 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_range_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,20 @@ func RangeStats(
) (result.Result, error) {
reply := resp.(*roachpb.RangeStatsResponse)
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.DeprecatedLastQueriesPerSecond = cArgs.EvalCtx.GetLastSplitQPS(ctx)
if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(ctx); ok {
reply.MaxQueriesPerSecond = qps
} else {
// See comment on MaxQueriesPerSecond. -1 means !ok.
reply.MaxQueriesPerSecond = -1

maxQPS, qpsOK := cArgs.EvalCtx.GetMaxSplitQPS(ctx)
maxCPU, cpuOK := cArgs.EvalCtx.GetMaxSplitCPU(ctx)
// See comment on MaxQueriesPerSecond and MaxCPUPerSecond. -1 means !ok.
reply.MaxCPUPerSecond, reply.MaxQueriesPerSecond = -1, -1
// NB: We don't expect both cpuOk and qpsOK to be true, however we don't
// prevent both being set.
if qpsOK {
reply.MaxQueriesPerSecond = maxQPS
}
if cpuOK {
reply.MaxCPUPerSecond = maxCPU
}

reply.MaxQueriesPerSecondSet = true
reply.RangeInfo = cArgs.EvalCtx.GetRangeInfo(ctx)
return result.Result{}, nil
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,19 @@ type EvalContext interface {
// results due to concurrent writes.
GetMVCCStats() enginepb.MVCCStats

// GetMaxSplitQPS returns the Replicas maximum queries/s request rate over a
// configured retention period.
// GetMaxSplitQPS returns the Replica's maximum queries/s request rate over
// a configured retention period.
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
GetMaxSplitQPS(context.Context) (float64, bool)

// GetLastSplitQPS returns the Replica's most recent queries/s request rate.
// GetMaxSplitCPU returns the Replica's maximum request cpu/s rate over a
// configured retention period.
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
//
// TODO(nvanbenschoten): remove this method in v22.1.
GetLastSplitQPS(context.Context) float64
GetMaxSplitCPU(context.Context) (float64, bool)

GetGCThreshold() hlc.Timestamp
ExcludeDataFromBackup() bool
Expand Down Expand Up @@ -169,6 +168,7 @@ type MockEvalCtx struct {
Clock *hlc.Clock
Stats enginepb.MVCCStats
QPS float64
CPU float64
AbortSpan *abortspan.AbortSpan
GCThreshold hlc.Timestamp
Term, FirstIndex uint64
Expand Down Expand Up @@ -248,8 +248,8 @@ func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats {
func (m *mockEvalCtxImpl) GetMaxSplitQPS(context.Context) (float64, bool) {
return m.QPS, true
}
func (m *mockEvalCtxImpl) GetLastSplitQPS(context.Context) float64 {
return m.QPS
func (m *mockEvalCtxImpl) GetMaxSplitCPU(context.Context) (float64, bool) {
return m.CPU, true
}
func (m *mockEvalCtxImpl) CanCreateTxnRecord(
context.Context, uuid.UUID, []byte, hlc.Timestamp,
Expand Down
Loading