split: Redesign the load-based splitter to be consistent with new rebalancing signals.

Fixes: #90574

In the current load splitter, we find the split key that best balances
the QPS of the left and right sides. Each request is therefore
unweighted: one request contributes one to the QPS. In particular, the
load splitter does not differentiate between the kind of request, how
heavy it is, or what resources it consumes, which can result in
scenarios where QPS is balanced but one side does far more work because
of a few heavy requests. Moreover, the current load splitter treats
requests that contain a split key as "contained". When optimizing for
QPS, contained requests are unhelpful, since splitting at a point
inside a contained request does not lower the QPS of either side. When
optimizing for other signals such as CPU, however, splitting inside a
contained request is beneficial, as each side then takes on part of the
work of processing that request. This motivates a redesign of the load
splitter: one that records weighted requests and accounts for contained
requests in the weight balancing used for splitting.

In this PR, we redesign the load-based splitter around the following
interface (a minimal sketch follows this list):
1. Record a point key "start" or a span "[start, end)" with a weight
"w" at a specific time "ts", where "w" is some measure of load recorded
for that key or span, e.g. Record(ts, start, w) or
Record(ts, [start, end), w)
2. Find a split key such that the load (i.e. total weight) on the
resulting split ranges would be as equal as possible according to the
loads recorded above, e.g. Key()
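
A minimal Go sketch of the interface described above. This is an
illustration only: the actual interface added in this commit is
LoadBasedSplitter in decider.go (shown in the diff below), which takes
a roachpb.Span and a float64 weight, with the Decider tracking
timestamps separately; the names and types here are stand-ins.

package sketch

import "time"

// span is an illustrative stand-in for roachpb.Span; a point key is a
// span with an empty EndKey.
type span struct {
	Key, EndKey string
}

// loadSplitter records weighted load and proposes a split key that
// would divide the recorded load as evenly as possible between the
// resulting left and right ranges.
type loadSplitter interface {
	// Record notes that weight w of load was observed over sp at time ts.
	Record(ts time.Time, sp span, w float64)
	// Key returns the best split key found so far, or "" if none.
	Key() string
}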

To make the current load-based splitter (Finder) weighted, we make the
following modifications (a sketch of the weighted sampling follows this
list):
1. Instead of plain reservoir sampling, use weighted reservoir sampling
(a simplified version of A-Chao)
2. Remove the contained counter
3. Increment the left and right counters by the weight of the request
rather than by 1
4. Treat a weighted range request ([start, end), w) as two weighted
point requests (start, w/2) and (end, w/2)
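
To make the sampling step concrete, here is a minimal sketch of
simplified A-Chao weighted reservoir sampling together with the
half-weight treatment of span requests from item 4. It is an
illustration under assumed names (weightedReservoir, addPoint, and
addSpan are hypothetical); the real implementation lives in
weighted_finder.go.

package sketch

import "math/rand"

const reservoirSize = 20 // illustrative; the real finder also keeps a fixed number of samples

type weightedSample struct {
	key    string
	weight float64
}

type weightedReservoir struct {
	rng         *rand.Rand
	samples     []weightedSample
	totalWeight float64
}

// addPoint records a weighted point request (key, w). Once the reservoir
// is full, the new key evicts a uniformly random existing sample with
// probability reservoirSize * w / totalWeight, so heavier requests are
// proportionally more likely to be retained.
func (r *weightedReservoir) addPoint(key string, w float64) {
	r.totalWeight += w
	if len(r.samples) < reservoirSize {
		r.samples = append(r.samples, weightedSample{key: key, weight: w})
		return
	}
	if r.rng.Float64() < float64(reservoirSize)*w/r.totalWeight {
		r.samples[r.rng.Intn(reservoirSize)] = weightedSample{key: key, weight: w}
	}
}

// addSpan treats a weighted range request ([start, end), w) as two
// weighted point requests (start, w/2) and (end, w/2), per item 4.
func (r *weightedReservoir) addSpan(start, end string, w float64) {
	r.addPoint(start, w/2)
	r.addPoint(end, w/2)
}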

For more details, see the design doc:
https://docs.google.com/document/d/1bdSxucz-xFzwnxL3fFXNZsRc9Vsht0oO0XuZrz5Iw84/edit#bookmark=id.xjc41tm3jx3x.

Release note (ops change): The load-based splitter has been redesigned
to improve range splits by making it consistent with CPU-based
rebalancing rather than QPS-based rebalancing. This redesign is
currently disabled by default.
KaiSun314 authored and kvoli committed Jan 3, 2023
1 parent ab6a865 commit 09adcc0
Showing 10 changed files with 963 additions and 161 deletions.
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/asim/state/split_decider.go
@@ -62,13 +62,8 @@ func NewSplitDecider(

func (s *SplitDecider) newDecider() *split.Decider {
rand := rand.New(rand.NewSource(s.seed))

intN := func(n int) int {
return rand.Intn(n)
}

decider := &split.Decider{}
split.Init(decider, intN, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
split.Init(decider, nil, rand, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_init.go
@@ -95,7 +95,8 @@ func newUnloadedReplica(
r.mu.stateLoader = stateloader.Make(desc.RangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 {
randSource := rand.New(rand.NewSource(timeutil.Now().UnixNano()))
split.Init(&r.loadBasedSplitter, store.cfg.Settings, randSource, func() float64 {
return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
}, func() time.Duration {
return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/split/BUILD.bazel
@@ -5,17 +5,19 @@ go_library(
name = "split",
srcs = [
"decider.go",
"finder.go",
"unweighted_finder.go",
"weighted_finder.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/syncutil",
"@org_golang_x_exp//rand",
],
)

@@ -24,8 +26,9 @@ go_test(
size = "small",
srcs = [
"decider_test.go",
"finder_test.go",
"load_based_splitter_test.go",
"unweighted_finder_test.go",
"weighted_finder_test.go",
],
args = ["-test.timeout=55s"],
embed = [":split"],
77 changes: 65 additions & 12 deletions pkg/kv/kvserver/split/decider.go
@@ -14,10 +14,13 @@ package split

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -27,6 +30,50 @@ const minSplitSuggestionInterval = time.Minute
const minNoSplitKeyLoggingMetricsInterval = time.Minute
const minQueriesPerSecondSampleDuration = time.Second

type LoadBasedSplitter interface {
// Record informs the LoadBasedSplitter about where the span lies with regard
// to the keys in the samples.
Record(span roachpb.Span, weight float64)

// Key finds an appropriate split point from the sampled candidate split
// keys. Returns a nil key if no appropriate key was found.
Key() roachpb.Key

// Ready checks if the LoadBasedSplitter has been initialized with a
// sufficient sample duration.
Ready(nowTime time.Time) bool

// NoSplitKeyCauseLogMsg returns a log message containing information on the
// number of samples that don't pass each split key requirement, provided not
// all samples are invalid due to insufficient counters; otherwise it returns
// an empty string.
NoSplitKeyCauseLogMsg() string

// PopularKeyFrequency returns the fraction of the sampled candidate split
// keys that are occupied by the most popular key.
PopularKeyFrequency() float64
}

type RandSource interface {
// Float64 returns, as a float64, a pseudo-random number in the half-open
// interval [0.0,1.0) from the RandSource.
Float64() float64

// Intn returns, as an int, a non-negative pseudo-random number in the
// half-open interval [0,n).
Intn(n int) int
}

var enableUnweightedLBSplitFinder = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.unweighted_lb_split_finder.enabled",
"if enabled, use the un-weighted finder for load-based splitting; "+
"the unweighted finder will attempt to find a key during when splitting "+
"a range based on load that evenly divides the QPS among the resulting "+
"left and right hand side ranges",
true,
)

// A Decider collects measurements about the activity (measured in qps) on a
// Replica and, assuming that qps thresholds are exceeded, tries to determine a
// split key that would approximately result in halving the load on each of the
@@ -63,7 +110,8 @@ type LoadSplitterMetrics struct {
// incoming requests to find potential split keys and checks if sampled
// candidate split keys satisfy certain requirements.
type Decider struct {
intn func(n int) int // supplied to Init
st *cluster.Settings // supplied to Init
randSource RandSource // supplied to Init
qpsThreshold func() float64 // supplied to Init
qpsRetention func() time.Duration // supplied to Init
loadSplitterMetrics *LoadSplitterMetrics // supplied to Init
@@ -80,8 +128,8 @@ type Decider struct {
maxQPS maxQPSTracker

// Fields tracking split key suggestions.
splitFinder *Finder // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split
splitFinder LoadBasedSplitter // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split

// Fields tracking logging / metrics around load-based splitter split key.
lastNoSplitKeyLoggingMetrics time.Time
@@ -94,12 +142,14 @@
// may exist in the system at any given point in time.
func Init(
lbs *Decider,
intn func(n int) int,
st *cluster.Settings,
randSource RandSource,
qpsThreshold func() float64,
qpsRetention func() time.Duration,
loadSplitterMetrics *LoadSplitterMetrics,
) {
lbs.intn = intn
lbs.st = st
lbs.randSource = randSource
lbs.qpsThreshold = qpsThreshold
lbs.qpsRetention = qpsRetention
lbs.loadSplitterMetrics = loadSplitterMetrics
@@ -147,7 +197,11 @@ func (d *Decider) recordLocked(
// to be used.
if d.mu.lastQPS >= d.qpsThreshold() {
if d.mu.splitFinder == nil {
d.mu.splitFinder = NewFinder(now)
if d.st == nil || enableUnweightedLBSplitFinder.Get(&d.st.SV) {
d.mu.splitFinder = NewUnweightedFinder(now, d.randSource)
} else {
d.mu.splitFinder = NewWeightedFinder(now, d.randSource)
}
}
} else {
d.mu.splitFinder = nil
@@ -157,7 +211,7 @@
if d.mu.splitFinder != nil && n != 0 {
s := span()
if s.Key != nil {
d.mu.splitFinder.Record(span(), d.intn)
d.mu.splitFinder.Record(span(), 1)
}
if d.mu.splitFinder.Ready(now) {
if d.mu.splitFinder.Key() != nil {
@@ -168,16 +222,15 @@
} else {
if now.Sub(d.mu.lastNoSplitKeyLoggingMetrics) > minNoSplitKeyLoggingMetricsInterval {
d.mu.lastNoSplitKeyLoggingMetrics = now
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := d.mu.splitFinder.NoSplitKeyCause()
if insufficientCounters < splitKeySampleSize {
noSplitKeyCauseLogMsg := d.mu.splitFinder.NoSplitKeyCauseLogMsg()
if noSplitKeyCauseLogMsg != "" {
popularKeyFrequency := d.mu.splitFinder.PopularKeyFrequency()
noSplitKeyCauseLogMsg += fmt.Sprintf(", most popular key occurs in %d%% of samples", int(popularKeyFrequency*100))
log.KvDistribution.Infof(ctx, "%s", noSplitKeyCauseLogMsg)
if popularKeyFrequency >= splitKeyThreshold {
d.loadSplitterMetrics.PopularKeyCount.Inc(1)
}
d.loadSplitterMetrics.NoSplitKeyCount.Inc(1)
log.KvDistribution.Infof(ctx,
"No split key found: insufficient counters = %d, imbalance = %d, too many contained = %d, imbalance and too many contained = %d, most popular key occurs in %d%% of samples",
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained, int(popularKeyFrequency*100))
}
}
}
34 changes: 16 additions & 18 deletions pkg/kv/kvserver/split/decider_test.go
@@ -37,10 +37,10 @@ func ms(i int) time.Time {
func TestDecider(t *testing.T) {
defer leaktest.AfterTest(t)()

intn := rand.New(rand.NewSource(12)).Intn
rand := rand.New(rand.NewSource(12))

var d Decider
Init(&d, intn, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -96,12 +96,10 @@ func TestDecider(t *testing.T) {
assert.Equal(t, ms(1200), d.mu.lastQPSRollover)
assertMaxQPS(1099, 0, false)

var nilFinder *Finder

assert.Equal(t, nilFinder, d.mu.splitFinder)
assert.Equal(t, nil, d.mu.splitFinder)

assert.Equal(t, false, d.Record(context.Background(), ms(2199), 12, nil))
assert.Equal(t, nilFinder, d.mu.splitFinder)
assert.Equal(t, nil, d.mu.splitFinder)

// 2200 is the next rollover point, and 12+1=13 qps should be computed.
assert.Equal(t, false, d.Record(context.Background(), ms(2200), 1, op("a")))
@@ -148,7 +146,7 @@ func TestDecider(t *testing.T) {
tick += 1000
assert.False(t, d.Record(context.Background(), ms(tick), 9, op("a")))
assert.Equal(t, roachpb.Key(nil), d.MaybeSplitKey(context.Background(), ms(tick)))
assert.Equal(t, nilFinder, d.mu.splitFinder)
assert.Equal(t, nil, d.mu.splitFinder)

// Hammer a key with writes above threshold. There shouldn't be a split
// since everyone is hitting the same key and load can't be balanced.
Expand All @@ -166,7 +164,7 @@ func TestDecider(t *testing.T) {
}

// ... which we verify by looking at its samples directly.
for _, sample := range d.mu.splitFinder.samples {
for _, sample := range d.mu.splitFinder.(*UnweightedFinder).samples {
assert.Equal(t, roachpb.Key("p"), sample.key)
}

@@ -196,10 +194,10 @@

func TestDecider_MaxQPS(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -242,10 +240,10 @@

func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -278,10 +276,10 @@

func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -409,11 +407,11 @@

func TestDeciderMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))
timeStart := 1000

var dPopular Decider
Init(&dPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -435,7 +433,7 @@

// No split key, not popular key
var dNotPopular Decider
Init(&dNotPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dNotPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -455,7 +453,7 @@

// No split key, all insufficient counters
var dAllInsufficientCounters Decider
Init(&dAllInsufficientCounters, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dAllInsufficientCounters, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})