split: Redesign the load-based splitter to be consistent with new rebalancing signals. #93838

Merged 1 commit on Jan 3, 2023
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/asim/state/split_decider.go
@@ -62,13 +62,8 @@ func NewSplitDecider(

func (s *SplitDecider) newDecider() *split.Decider {
rand := rand.New(rand.NewSource(s.seed))

intN := func(n int) int {
return rand.Intn(n)
}

decider := &split.Decider{}
split.Init(decider, intN, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
split.Init(decider, nil, rand, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_init.go
@@ -95,7 +95,8 @@ func newUnloadedReplica(
r.mu.stateLoader = stateloader.Make(desc.RangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 {
randSource := rand.New(rand.NewSource(timeutil.Now().UnixNano()))
split.Init(&r.loadBasedSplitter, store.cfg.Settings, randSource, func() float64 {
return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
}, func() time.Duration {
return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
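To see the full shape of the new Init signature in one place (the hunks above and below show it piecemeal), here is a simplified sketch of a call site. It is not a verbatim excerpt from this PR: the helper name, package, threshold, and retention values are illustrative placeholders.

package kvserver

import (
	"math/rand"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/metric"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// newLoadBasedSplitter is a hypothetical helper, not a function in this PR.
func newLoadBasedSplitter(st *cluster.Settings) *split.Decider {
	var d split.Decider
	// Each Decider gets its own seeded source; *rand.Rand satisfies the
	// RandSource interface (added in decider.go, later in this diff) because
	// it provides both Float64 and Intn.
	randSource := rand.New(rand.NewSource(timeutil.Now().UnixNano()))
	split.Init(&d, st, randSource,
		func() float64 { return 2500 },                   // QPS split threshold (placeholder value)
		func() time.Duration { return 10 * time.Minute }, // QPS retention (placeholder value)
		&split.LoadSplitterMetrics{
			PopularKeyCount: metric.NewCounter(metric.Metadata{}),
			NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
		})
	return &d
}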
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/split/BUILD.bazel
@@ -5,17 +5,19 @@ go_library(
name = "split",
srcs = [
"decider.go",
"finder.go",
"unweighted_finder.go",
"weighted_finder.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/syncutil",
"@org_golang_x_exp//rand",
],
)

@@ -24,8 +26,9 @@ go_test(
size = "small",
srcs = [
"decider_test.go",
"finder_test.go",
"load_based_splitter_test.go",
"unweighted_finder_test.go",
"weighted_finder_test.go",
],
args = ["-test.timeout=55s"],
embed = [":split"],
77 changes: 65 additions & 12 deletions pkg/kv/kvserver/split/decider.go
@@ -14,10 +14,13 @@ package split

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -27,6 +30,50 @@ const minSplitSuggestionInterval = time.Minute
const minNoSplitKeyLoggingMetricsInterval = time.Minute
const minQueriesPerSecondSampleDuration = time.Second

type LoadBasedSplitter interface {
// Record informs the LoadBasedSplitter about where the span lies with regard
// to the keys in the samples.
Record(span roachpb.Span, weight float64)

// Key finds an appropriate split point from the sampled candidate split
// keys. Returns a nil key if no appropriate key was found.
Key() roachpb.Key

// Ready checks if the LoadBasedSplitter has been initialized with a
// sufficient sample duration.
Ready(nowTime time.Time) bool

// NoSplitKeyCauseLogMsg returns a log message describing how many samples
// fail each split key requirement, provided that not all samples are invalid
// due to insufficient counters; otherwise it returns an empty string.
NoSplitKeyCauseLogMsg() string

// PopularKeyFrequency returns the fraction (in [0, 1]) of sampled candidate
// split keys that are the most popular key.
PopularKeyFrequency() float64
}

type RandSource interface {
// Float64 returns, as a float64, a pseudo-random number in the half-open
// interval [0.0,1.0) from the RandSource.
Float64() float64

// Intn returns, as an int, a non-negative pseudo-random number in the
// half-open interval [0,n).
Intn(n int) int
}

var enableUnweightedLBSplitFinder = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.unweighted_lb_split_finder.enabled",
"if enabled, use the un-weighted finder for load-based splitting; "+
"the unweighted finder will attempt to find a key during when splitting "+
"a range based on load that evenly divides the QPS among the resulting "+
"left and right hand side ranges",
true,
)

// A Decider collects measurements about the activity (measured in qps) on a
// Replica and, assuming that qps thresholds are exceeded, tries to determine a
// split key that would approximately result in halving the load on each of the
@@ -63,7 +110,8 @@ type LoadSplitterMetrics struct {
// incoming requests to find potential split keys and checks if sampled
// candidate split keys satisfy certain requirements.
type Decider struct {
intn func(n int) int // supplied to Init
st *cluster.Settings // supplied to Init
randSource RandSource // supplied to Init
qpsThreshold func() float64 // supplied to Init
qpsRetention func() time.Duration // supplied to Init
loadSplitterMetrics *LoadSplitterMetrics // supplied to Init
@@ -80,8 +128,8 @@ type Decider struct {
maxQPS maxQPSTracker

// Fields tracking split key suggestions.
splitFinder *Finder // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split
splitFinder LoadBasedSplitter // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split

// Fields tracking logging / metrics around load-based splitter split key.
lastNoSplitKeyLoggingMetrics time.Time
@@ -94,12 +142,14 @@
// may exist in the system at any given point in time.
func Init(
lbs *Decider,
intn func(n int) int,
st *cluster.Settings,
randSource RandSource,
qpsThreshold func() float64,
qpsRetention func() time.Duration,
loadSplitterMetrics *LoadSplitterMetrics,
) {
lbs.intn = intn
lbs.st = st
lbs.randSource = randSource
lbs.qpsThreshold = qpsThreshold
lbs.qpsRetention = qpsRetention
lbs.loadSplitterMetrics = loadSplitterMetrics
@@ -147,7 +197,11 @@ func (d *Decider) recordLocked(
// to be used.
if d.mu.lastQPS >= d.qpsThreshold() {
if d.mu.splitFinder == nil {
d.mu.splitFinder = NewFinder(now)
if d.st == nil || enableUnweightedLBSplitFinder.Get(&d.st.SV) {
d.mu.splitFinder = NewUnweightedFinder(now, d.randSource)
} else {
d.mu.splitFinder = NewWeightedFinder(now, d.randSource)
}
}
} else {
d.mu.splitFinder = nil
@@ -157,7 +211,7 @@
if d.mu.splitFinder != nil && n != 0 {
s := span()
if s.Key != nil {
d.mu.splitFinder.Record(span(), d.intn)
d.mu.splitFinder.Record(span(), 1)
}
if d.mu.splitFinder.Ready(now) {
if d.mu.splitFinder.Key() != nil {
@@ -168,16 +222,15 @@
} else {
if now.Sub(d.mu.lastNoSplitKeyLoggingMetrics) > minNoSplitKeyLoggingMetricsInterval {
d.mu.lastNoSplitKeyLoggingMetrics = now
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := d.mu.splitFinder.NoSplitKeyCause()
if insufficientCounters < splitKeySampleSize {
noSplitKeyCauseLogMsg := d.mu.splitFinder.NoSplitKeyCauseLogMsg()
if noSplitKeyCauseLogMsg != "" {
popularKeyFrequency := d.mu.splitFinder.PopularKeyFrequency()
noSplitKeyCauseLogMsg += fmt.Sprintf(", most popular key occurs in %d%% of samples", int(popularKeyFrequency*100))
log.KvDistribution.Infof(ctx, "%s", noSplitKeyCauseLogMsg)
if popularKeyFrequency >= splitKeyThreshold {
d.loadSplitterMetrics.PopularKeyCount.Inc(1)
}
d.loadSplitterMetrics.NoSplitKeyCount.Inc(1)
log.KvDistribution.Infof(ctx,
"No split key found: insufficient counters = %d, imbalance = %d, too many contained = %d, imbalance and too many contained = %d, most popular key occurs in %d%% of samples",
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained, int(popularKeyFrequency*100))
}
}
}
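To make the new interface concrete, below is a hypothetical, minimal LoadBasedSplitter implementation. It is not part of this PR (the real implementations are the unweighted and weighted finders); it exists only to illustrate the contract the Decider relies on, and is written as if it lived in the split package so it can reuse that package's constants.

package split

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// fixedKeySplitter is a hypothetical stub that always proposes one fixed key
// once enough time has passed.
type fixedKeySplitter struct {
	start time.Time
	key   roachpb.Key
}

// Record would normally sample candidate split keys in proportion to weight;
// this stub ignores its input.
func (f *fixedKeySplitter) Record(span roachpb.Span, weight float64) {}

// Key returns the proposed split key, or nil if none is known yet.
func (f *fixedKeySplitter) Key() roachpb.Key { return f.key }

// Ready reports whether a sufficient sample duration has elapsed.
func (f *fixedKeySplitter) Ready(now time.Time) bool {
	return now.Sub(f.start) >= minQueriesPerSecondSampleDuration
}

// NoSplitKeyCauseLogMsg returns "" because this stub always has a key.
func (f *fixedKeySplitter) NoSplitKeyCauseLogMsg() string { return "" }

// PopularKeyFrequency pretends the sampled keys are perfectly spread out.
func (f *fixedKeySplitter) PopularKeyFrequency() float64 { return 0 }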
34 changes: 16 additions & 18 deletions pkg/kv/kvserver/split/decider_test.go
@@ -37,10 +37,10 @@ func ms(i int) time.Time {
func TestDecider(t *testing.T) {
defer leaktest.AfterTest(t)()

intn := rand.New(rand.NewSource(12)).Intn
rand := rand.New(rand.NewSource(12))

var d Decider
Init(&d, intn, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -96,12 +96,10 @@ func TestDecider(t *testing.T) {
assert.Equal(t, ms(1200), d.mu.lastQPSRollover)
assertMaxQPS(1099, 0, false)

var nilFinder *Finder

assert.Equal(t, nilFinder, d.mu.splitFinder)
assert.Equal(t, nil, d.mu.splitFinder)

assert.Equal(t, false, d.Record(context.Background(), ms(2199), 12, nil))
assert.Equal(t, nilFinder, d.mu.splitFinder)
assert.Equal(t, nil, d.mu.splitFinder)

// 2200 is the next rollover point, and 12+1=13 qps should be computed.
assert.Equal(t, false, d.Record(context.Background(), ms(2200), 1, op("a")))
@@ -148,7 +146,7 @@ func TestDecider(t *testing.T) {
tick += 1000
assert.False(t, d.Record(context.Background(), ms(tick), 9, op("a")))
assert.Equal(t, roachpb.Key(nil), d.MaybeSplitKey(context.Background(), ms(tick)))
assert.Equal(t, nilFinder, d.mu.splitFinder)
assert.Equal(t, nil, d.mu.splitFinder)

// Hammer a key with writes above threshold. There shouldn't be a split
// since everyone is hitting the same key and load can't be balanced.
Expand All @@ -166,7 +164,7 @@ func TestDecider(t *testing.T) {
}

// ... which we verify by looking at its samples directly.
for _, sample := range d.mu.splitFinder.samples {
for _, sample := range d.mu.splitFinder.(*UnweightedFinder).samples {
assert.Equal(t, roachpb.Key("p"), sample.key)
}

@@ -196,10 +194,10 @@

func TestDecider_MaxQPS(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -242,10 +240,10 @@ func TestDecider_MaxQPS(t *testing.T) {

func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -278,10 +276,10 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {

func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -409,11 +407,11 @@ func TestMaxQPSTracker(t *testing.T) {

func TestDeciderMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))
timeStart := 1000

var dPopular Decider
Init(&dPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -435,7 +433,7 @@

// No split key, not popular key
var dNotPopular Decider
Init(&dNotPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dNotPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
@@ -455,7 +453,7 @@

// No split key, all insufficient counters
var dAllInsufficientCounters Decider
Init(&dAllInsufficientCounters, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dAllInsufficientCounters, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
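The tests above now seed a full *rand.Rand instead of passing a bare Intn function. A small, self-contained sketch of why that works, with hypothetical names not taken from the PR: anything exposing Float64 and Intn satisfies RandSource, so the same Init call can be driven by math/rand in production and by a fully deterministic stub where reproducibility matters.

package main

import (
	"fmt"
	"math/rand"
)

// randSource mirrors the RandSource interface added in decider.go.
type randSource interface {
	Float64() float64
	Intn(n int) int
}

// constSource is a hypothetical deterministic source: every draw returns the
// same value, which makes sampling decisions fully reproducible in a test.
type constSource struct{}

func (constSource) Float64() float64 { return 0.5 }
func (constSource) Intn(n int) int   { return n / 2 }

func main() {
	sources := []randSource{
		rand.New(rand.NewSource(11)), // what the updated tests hand to Init
		constSource{},                // deterministic stand-in
	}
	for _, s := range sources {
		fmt.Println(s.Intn(10), s.Float64())
	}
}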