diff --git a/pkg/kv/kvserver/asim/state/split_decider.go b/pkg/kv/kvserver/asim/state/split_decider.go index f03ec4b4c9b0..3c62003b3b1f 100644 --- a/pkg/kv/kvserver/asim/state/split_decider.go +++ b/pkg/kv/kvserver/asim/state/split_decider.go @@ -62,13 +62,8 @@ func NewSplitDecider( func (s *SplitDecider) newDecider() *split.Decider { rand := rand.New(rand.NewSource(s.seed)) - - intN := func(n int) int { - return rand.Intn(n) - } - decider := &split.Decider{} - split.Init(decider, intN, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{ + split.Init(decider, nil, rand, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 60b5e4ef6fa8..3414033afb70 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -95,7 +95,8 @@ func newUnloadedReplica( r.mu.stateLoader = stateloader.Make(desc.RangeID) r.mu.quiescent = true r.mu.conf = store.cfg.DefaultSpanConfig - split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 { + randSource := rand.New(rand.NewSource(timeutil.Now().UnixNano())) + split.Init(&r.loadBasedSplitter, store.cfg.Settings, randSource, func() float64 { return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV)) }, func() time.Duration { return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV) diff --git a/pkg/kv/kvserver/split/BUILD.bazel b/pkg/kv/kvserver/split/BUILD.bazel index 8ac9fb509f34..031270e152c7 100644 --- a/pkg/kv/kvserver/split/BUILD.bazel +++ b/pkg/kv/kvserver/split/BUILD.bazel @@ -5,17 +5,19 @@ go_library( name = "split", srcs = [ "decider.go", - "finder.go", + "unweighted_finder.go", + "weighted_finder.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split", visibility = ["//visibility:public"], deps = [ "//pkg/keys", "//pkg/roachpb", + "//pkg/settings", + "//pkg/settings/cluster", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/syncutil", - "@org_golang_x_exp//rand", ], ) @@ -24,8 +26,9 @@ go_test( size = "small", srcs = [ "decider_test.go", - "finder_test.go", "load_based_splitter_test.go", + "unweighted_finder_test.go", + "weighted_finder_test.go", ], args = ["-test.timeout=55s"], embed = [":split"], diff --git a/pkg/kv/kvserver/split/decider.go b/pkg/kv/kvserver/split/decider.go index ce7d54e6c3fd..0c5ea9165210 100644 --- a/pkg/kv/kvserver/split/decider.go +++ b/pkg/kv/kvserver/split/decider.go @@ -14,10 +14,13 @@ package split import ( "context" + "fmt" "time" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -27,6 +30,50 @@ const minSplitSuggestionInterval = time.Minute const minNoSplitKeyLoggingMetricsInterval = time.Minute const minQueriesPerSecondSampleDuration = time.Second +type LoadBasedSplitter interface { + // Record informs the LoadBasedSplitter about where the span lies with regard + // to the keys in the samples. + Record(span roachpb.Span, weight float64) + + // Key finds an appropriate split point from the sampled candidate split + // keys. Returns a nil key if no appropriate key was found. + Key() roachpb.Key + + // Ready checks if the LoadBasedSplitter has been initialized with a + // sufficient sample duration. + Ready(nowTime time.Time) bool + + // NoSplitKeyCauseLogMsg returns a log message containing information on the + // number of samples that don't pass each split key requirement if not all + // samples are invalid due to insufficient counters, otherwise returns an + // empty string. + NoSplitKeyCauseLogMsg() string + + // PopularKeyFrequency returns the percentage that the most popular key + // appears in the sampled candidate split keys. + PopularKeyFrequency() float64 +} + +type RandSource interface { + // Float64 returns, as a float64, a pseudo-random number in the half-open + // interval [0.0,1.0) from the RandSource. + Float64() float64 + + // Intn returns, as an int, a non-negative pseudo-random number in the + // half-open interval [0,n). + Intn(n int) int +} + +var enableUnweightedLBSplitFinder = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.unweighted_lb_split_finder.enabled", + "if enabled, use the un-weighted finder for load-based splitting; "+ + "the unweighted finder will attempt to find a key during when splitting "+ + "a range based on load that evenly divides the QPS among the resulting "+ + "left and right hand side ranges", + true, +) + // A Decider collects measurements about the activity (measured in qps) on a // Replica and, assuming that qps thresholds are exceeded, tries to determine a // split key that would approximately result in halving the load on each of the @@ -63,7 +110,8 @@ type LoadSplitterMetrics struct { // incoming requests to find potential split keys and checks if sampled // candidate split keys satisfy certain requirements. type Decider struct { - intn func(n int) int // supplied to Init + st *cluster.Settings // supplied to Init + randSource RandSource // supplied to Init qpsThreshold func() float64 // supplied to Init qpsRetention func() time.Duration // supplied to Init loadSplitterMetrics *LoadSplitterMetrics // supplied to Init @@ -80,8 +128,8 @@ type Decider struct { maxQPS maxQPSTracker // Fields tracking split key suggestions. - splitFinder *Finder // populated when engaged or decided - lastSplitSuggestion time.Time // last stipulation to client to carry out split + splitFinder LoadBasedSplitter // populated when engaged or decided + lastSplitSuggestion time.Time // last stipulation to client to carry out split // Fields tracking logging / metrics around load-based splitter split key. lastNoSplitKeyLoggingMetrics time.Time @@ -94,12 +142,14 @@ type Decider struct { // may exist in the system at any given point in time. func Init( lbs *Decider, - intn func(n int) int, + st *cluster.Settings, + randSource RandSource, qpsThreshold func() float64, qpsRetention func() time.Duration, loadSplitterMetrics *LoadSplitterMetrics, ) { - lbs.intn = intn + lbs.st = st + lbs.randSource = randSource lbs.qpsThreshold = qpsThreshold lbs.qpsRetention = qpsRetention lbs.loadSplitterMetrics = loadSplitterMetrics @@ -147,7 +197,11 @@ func (d *Decider) recordLocked( // to be used. if d.mu.lastQPS >= d.qpsThreshold() { if d.mu.splitFinder == nil { - d.mu.splitFinder = NewFinder(now) + if d.st == nil || enableUnweightedLBSplitFinder.Get(&d.st.SV) { + d.mu.splitFinder = NewUnweightedFinder(now, d.randSource) + } else { + d.mu.splitFinder = NewWeightedFinder(now, d.randSource) + } } } else { d.mu.splitFinder = nil @@ -157,7 +211,7 @@ func (d *Decider) recordLocked( if d.mu.splitFinder != nil && n != 0 { s := span() if s.Key != nil { - d.mu.splitFinder.Record(span(), d.intn) + d.mu.splitFinder.Record(span(), 1) } if d.mu.splitFinder.Ready(now) { if d.mu.splitFinder.Key() != nil { @@ -168,16 +222,15 @@ func (d *Decider) recordLocked( } else { if now.Sub(d.mu.lastNoSplitKeyLoggingMetrics) > minNoSplitKeyLoggingMetricsInterval { d.mu.lastNoSplitKeyLoggingMetrics = now - insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := d.mu.splitFinder.NoSplitKeyCause() - if insufficientCounters < splitKeySampleSize { + noSplitKeyCauseLogMsg := d.mu.splitFinder.NoSplitKeyCauseLogMsg() + if noSplitKeyCauseLogMsg != "" { popularKeyFrequency := d.mu.splitFinder.PopularKeyFrequency() + noSplitKeyCauseLogMsg += fmt.Sprintf(", most popular key occurs in %d%% of samples", int(popularKeyFrequency*100)) + log.KvDistribution.Infof(ctx, "%s", noSplitKeyCauseLogMsg) if popularKeyFrequency >= splitKeyThreshold { d.loadSplitterMetrics.PopularKeyCount.Inc(1) } d.loadSplitterMetrics.NoSplitKeyCount.Inc(1) - log.KvDistribution.Infof(ctx, - "No split key found: insufficient counters = %d, imbalance = %d, too many contained = %d, imbalance and too many contained = %d, most popular key occurs in %d%% of samples", - insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained, int(popularKeyFrequency*100)) } } } diff --git a/pkg/kv/kvserver/split/decider_test.go b/pkg/kv/kvserver/split/decider_test.go index 6db6a2e4223a..062854e4125f 100644 --- a/pkg/kv/kvserver/split/decider_test.go +++ b/pkg/kv/kvserver/split/decider_test.go @@ -37,10 +37,10 @@ func ms(i int) time.Time { func TestDecider(t *testing.T) { defer leaktest.AfterTest(t)() - intn := rand.New(rand.NewSource(12)).Intn + rand := rand.New(rand.NewSource(12)) var d Decider - Init(&d, intn, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{ + Init(&d, nil, rand, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -96,12 +96,10 @@ func TestDecider(t *testing.T) { assert.Equal(t, ms(1200), d.mu.lastQPSRollover) assertMaxQPS(1099, 0, false) - var nilFinder *Finder - - assert.Equal(t, nilFinder, d.mu.splitFinder) + assert.Equal(t, nil, d.mu.splitFinder) assert.Equal(t, false, d.Record(context.Background(), ms(2199), 12, nil)) - assert.Equal(t, nilFinder, d.mu.splitFinder) + assert.Equal(t, nil, d.mu.splitFinder) // 2200 is the next rollover point, and 12+1=13 qps should be computed. assert.Equal(t, false, d.Record(context.Background(), ms(2200), 1, op("a"))) @@ -148,7 +146,7 @@ func TestDecider(t *testing.T) { tick += 1000 assert.False(t, d.Record(context.Background(), ms(tick), 9, op("a"))) assert.Equal(t, roachpb.Key(nil), d.MaybeSplitKey(context.Background(), ms(tick))) - assert.Equal(t, nilFinder, d.mu.splitFinder) + assert.Equal(t, nil, d.mu.splitFinder) // Hammer a key with writes above threshold. There shouldn't be a split // since everyone is hitting the same key and load can't be balanced. @@ -166,7 +164,7 @@ func TestDecider(t *testing.T) { } // ... which we verify by looking at its samples directly. - for _, sample := range d.mu.splitFinder.samples { + for _, sample := range d.mu.splitFinder.(*UnweightedFinder).samples { assert.Equal(t, roachpb.Key("p"), sample.key) } @@ -196,10 +194,10 @@ func TestDecider(t *testing.T) { func TestDecider_MaxQPS(t *testing.T) { defer leaktest.AfterTest(t)() - intn := rand.New(rand.NewSource(11)).Intn + rand := rand.New(rand.NewSource(11)) var d Decider - Init(&d, intn, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{ + Init(&d, nil, rand, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -242,10 +240,10 @@ func TestDecider_MaxQPS(t *testing.T) { func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { defer leaktest.AfterTest(t)() - intn := rand.New(rand.NewSource(11)).Intn + rand := rand.New(rand.NewSource(11)) var d Decider - Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -278,10 +276,10 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) { defer leaktest.AfterTest(t)() - intn := rand.New(rand.NewSource(11)).Intn + rand := rand.New(rand.NewSource(11)) var d Decider - Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -409,11 +407,11 @@ func TestMaxQPSTracker(t *testing.T) { func TestDeciderMetrics(t *testing.T) { defer leaktest.AfterTest(t)() - intn := rand.New(rand.NewSource(11)).Intn + rand := rand.New(rand.NewSource(11)) timeStart := 1000 var dPopular Decider - Init(&dPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&dPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -435,7 +433,7 @@ func TestDeciderMetrics(t *testing.T) { // No split key, not popular key var dNotPopular Decider - Init(&dNotPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&dNotPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -455,7 +453,7 @@ func TestDeciderMetrics(t *testing.T) { // No split key, all insufficient counters var dAllInsufficientCounters Decider - Init(&dAllInsufficientCounters, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&dAllInsufficientCounters, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) diff --git a/pkg/kv/kvserver/split/load_based_splitter_test.go b/pkg/kv/kvserver/split/load_based_splitter_test.go index 06afe7a18522..e043715a9735 100644 --- a/pkg/kv/kvserver/split/load_based_splitter_test.go +++ b/pkg/kv/kvserver/split/load_based_splitter_test.go @@ -15,7 +15,6 @@ import ( "fmt" "math" "sort" - "testing" "text/tabwriter" "time" @@ -23,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload/ycsb" - "github.com/stretchr/testify/require" "golang.org/x/exp/rand" ) @@ -77,20 +75,15 @@ import ( const ( zipfGenerator = 0 uniformGenerator = 1 - numIterations = 20 + numIterations = 200 ) -type loadBasedSplitter interface { - Record(span roachpb.Span, weight float32) - Key() roachpb.Key -} - type generator interface { Uint64() uint64 } type config struct { - lbs loadBasedSplitter + lbs LoadBasedSplitter startKeyGenerator generator spanLengthGenerator generator weightGenerator generator @@ -101,15 +94,15 @@ type config struct { type request struct { span roachpb.Span - weight float32 + weight float64 } type weightedKey struct { key uint32 - weight float32 + weight float64 } -type settings struct { +type lbsTestSettings struct { desc string startKeyGeneratorType int startKeyGeneratorIMax uint64 @@ -119,7 +112,7 @@ type settings struct { weightGeneratorIMax uint64 rangeRequestPercent float64 numRequests int - lbs func(*rand.Rand) loadBasedSplitter + lbs func(*rand.Rand) LoadBasedSplitter seed uint64 } @@ -127,15 +120,15 @@ func uint32ToKey(key uint32) roachpb.Key { return keys.SystemSQLCodec.TablePrefix(key) } -func generateRequests(config *config) ([]request, []weightedKey, float32) { - var totalWeight float32 +func generateRequests(config *config) ([]request, []weightedKey, float64) { + var totalWeight float64 requests := make([]request, 0, config.numRequests) weightedKeys := make([]weightedKey, 0, 2*config.numRequests) for i := 0; i < config.numRequests; i++ { startKey := uint32(config.startKeyGenerator.Uint64()) spanLength := uint32(config.spanLengthGenerator.Uint64()) - weight := float32(config.weightGenerator.Uint64()) + weight := float64(config.weightGenerator.Uint64()) var span roachpb.Span span.Key = uint32ToKey(startKey) @@ -167,25 +160,25 @@ func generateRequests(config *config) ([]request, []weightedKey, float32) { } func getOptimalKey( - weightedKeys []weightedKey, totalWeight float32, -) (optimalKey uint32, optimalLeftWeight, optimalRightWeight float32) { + weightedKeys []weightedKey, totalWeight float64, +) (optimalKey uint32, optimalLeftWeight, optimalRightWeight float64) { var optimalKeyPtr *uint32 - var leftWeight float32 + var leftWeight float64 sort.Slice(weightedKeys, func(i, j int) bool { return weightedKeys[i].key < weightedKeys[j].key }) - for _, weightedKey := range weightedKeys { + for i := range weightedKeys { rightWeight := totalWeight - leftWeight // Find the split key that results in the smallest difference between the // total weight of keys on the right side and the total weight of keys on // the left side. if optimalKeyPtr == nil || - math.Abs(float64(rightWeight-leftWeight)) < math.Abs(float64(optimalRightWeight-optimalLeftWeight)) { - optimalKeyPtr = &weightedKey.key + math.Abs(rightWeight-leftWeight) < math.Abs(optimalRightWeight-optimalLeftWeight) { + optimalKeyPtr = &weightedKeys[i].key optimalLeftWeight = leftWeight optimalRightWeight = rightWeight } - leftWeight += weightedKey.weight + leftWeight += weightedKeys[i].weight } optimalKey = *optimalKeyPtr return @@ -195,7 +188,7 @@ func getKey( config *config, requests []request, weightedKeys []weightedKey, ) ( key uint32, - leftWeight, rightWeight float32, + leftWeight, rightWeight float64, recordExecutionTime, keyExecutionTime time.Duration, ) { recordStart := timeutil.Now() @@ -222,7 +215,7 @@ func runTest( config *config, ) ( key, optimalKey uint32, - leftWeight, rightWeight, optimalLeftWeight, optimalRightWeight float32, + leftWeight, rightWeight, optimalLeftWeight, optimalRightWeight float64, recordExecutionTime, keyExecutionTime time.Duration, ) { requests, weightedKeys, totalWeight := generateRequests(config) @@ -231,7 +224,7 @@ func runTest( return } -func newGenerator(t *testing.T, randSource *rand.Rand, generatorType int, iMax uint64) generator { +func newGenerator(randSource *rand.Rand, generatorType int, iMax uint64) generator { var g generator var err error if generatorType == zipfGenerator { @@ -239,22 +232,24 @@ func newGenerator(t *testing.T, randSource *rand.Rand, generatorType int, iMax u } else if generatorType == uniformGenerator { g, err = ycsb.NewUniformGenerator(randSource, 1, iMax) } else { - require.Error(t, nil, "generatorType must be zipfGenerator or uniformGenerator") + panic("generatorType must be zipfGenerator or uniformGenerator") + } + if err != nil { + panic(err) } - require.NoError(t, err) return g } func runTestRepeated( - t *testing.T, settings *settings, + settings *lbsTestSettings, ) ( - avgPercentDifference, maxPercentDifference, avgOptimalPercentDifference, maxOptimalPercentDifference float32, + avgPercentDifference, maxPercentDifference, avgOptimalPercentDifference, maxOptimalPercentDifference float64, avgRecordExecutionTime, avgKeyExecutionTime time.Duration, ) { randSource := rand.New(rand.NewSource(settings.seed)) - startKeyGenerator := newGenerator(t, randSource, settings.startKeyGeneratorType, settings.startKeyGeneratorIMax) - spanLengthGenerator := newGenerator(t, randSource, settings.spanLengthGeneratorType, settings.spanLengthGeneratorIMax) - weightGenerator := newGenerator(t, randSource, settings.weightGeneratorType, settings.weightGeneratorIMax) + startKeyGenerator := newGenerator(randSource, settings.startKeyGeneratorType, settings.startKeyGeneratorIMax) + spanLengthGenerator := newGenerator(randSource, settings.spanLengthGeneratorType, settings.spanLengthGeneratorIMax) + weightGenerator := newGenerator(randSource, settings.weightGeneratorType, settings.weightGeneratorIMax) for i := 0; i < numIterations; i++ { _, _, leftWeight, rightWeight, optimalLeftWeight, optimalRightWeight, recordExecutionTime, keyExecutionTime := runTest(&config{ lbs: settings.lbs(randSource), @@ -265,12 +260,12 @@ func runTestRepeated( numRequests: settings.numRequests, randSource: randSource, }) - percentDifference := float32(100 * math.Abs(float64(leftWeight-rightWeight)) / math.Abs(float64(leftWeight+rightWeight))) + percentDifference := 100 * math.Abs(leftWeight-rightWeight) / (leftWeight + rightWeight) avgPercentDifference += percentDifference if maxPercentDifference < percentDifference { maxPercentDifference = percentDifference } - optimalPercentDifference := float32(100 * math.Abs(float64(optimalLeftWeight-optimalRightWeight)) / math.Abs(float64(optimalLeftWeight+optimalRightWeight))) + optimalPercentDifference := 100 * math.Abs(optimalLeftWeight-optimalRightWeight) / (optimalLeftWeight + optimalRightWeight) avgOptimalPercentDifference += optimalPercentDifference if maxOptimalPercentDifference < optimalPercentDifference { maxOptimalPercentDifference = optimalPercentDifference @@ -280,12 +275,12 @@ func runTestRepeated( } avgRecordExecutionTime = time.Duration(avgRecordExecutionTime.Nanoseconds() / int64(numIterations)) avgKeyExecutionTime = time.Duration(avgKeyExecutionTime.Nanoseconds() / int64(numIterations)) - avgPercentDifference /= float32(numIterations) - avgOptimalPercentDifference /= float32(numIterations) + avgPercentDifference /= numIterations + avgOptimalPercentDifference /= numIterations return } -func runTestMultipleSettings(t *testing.T, settingsArr []settings) { +func runTestMultipleSettings(settingsArr []lbsTestSettings) { var buf bytes.Buffer w := tabwriter.NewWriter(&buf, 4, 0, 2, ' ', 0) _, _ = fmt.Fprintln(w, @@ -298,7 +293,7 @@ func runTestMultipleSettings(t *testing.T, settingsArr []settings) { "Avg Key Execution Time", ) for _, settings := range settingsArr { - avgPercentDifference, maxPercentDifference, avgOptimalPercentDifference, maxOptimalPercentDifference, avgRecordExecutionTime, avgKeyExecutionTime := runTestRepeated(t, &settings) + avgPercentDifference, maxPercentDifference, avgOptimalPercentDifference, maxOptimalPercentDifference, avgRecordExecutionTime, avgKeyExecutionTime := runTestRepeated(&settings) _, _ = fmt.Fprintf(w, "%s\t%f\t%f\t%f\t%f\t%s\t%s\n", settings.desc, avgPercentDifference, @@ -312,10 +307,10 @@ func runTestMultipleSettings(t *testing.T, settingsArr []settings) { fmt.Print(buf.String()) } -func TestUnweightedFinder(t *testing.T) { - runTestMultipleSettings(t, []settings{ +func ExampleUnweightedFinder() { + runTestMultipleSettings([]lbsTestSettings{ { - desc: "Unweighted Finder", + desc: "UnweightedFinder", startKeyGeneratorType: zipfGenerator, startKeyGeneratorIMax: 10000000000, spanLengthGeneratorType: uniformGenerator, @@ -323,11 +318,85 @@ func TestUnweightedFinder(t *testing.T) { weightGeneratorType: uniformGenerator, weightGeneratorIMax: 1, rangeRequestPercent: 0.95, - numRequests: 10000, - lbs: func(randSource *rand.Rand) loadBasedSplitter { - return NewTestFinder(randSource) + numRequests: 13000, + lbs: func(randSource *rand.Rand) LoadBasedSplitter { + return NewUnweightedFinder(timeutil.Now(), randSource) }, seed: 2022, }, }) } + +func ExampleWeightedFinder() { + seed := uint64(2022) + lbs := func(randSource *rand.Rand) LoadBasedSplitter { + return NewWeightedFinder(timeutil.Now(), randSource) + } + runTestMultipleSettings([]lbsTestSettings{ + { + desc: "WeightedFinder/startIMax=10000000000/spanIMax=1000", + startKeyGeneratorType: zipfGenerator, + startKeyGeneratorIMax: 10000000000, + spanLengthGeneratorType: uniformGenerator, + spanLengthGeneratorIMax: 1000, + weightGeneratorType: uniformGenerator, + weightGeneratorIMax: 10, + rangeRequestPercent: 0.95, + numRequests: 10000, + lbs: lbs, + seed: seed, + }, + { + desc: "WeightedFinder/startIMax=100000/spanIMax=1000", + startKeyGeneratorType: zipfGenerator, + startKeyGeneratorIMax: 100000, + spanLengthGeneratorType: uniformGenerator, + spanLengthGeneratorIMax: 1000, + weightGeneratorType: uniformGenerator, + weightGeneratorIMax: 10, + rangeRequestPercent: 0.95, + numRequests: 10000, + lbs: lbs, + seed: seed, + }, + { + desc: "WeightedFinder/startIMax=1000/spanIMax=100", + startKeyGeneratorType: zipfGenerator, + startKeyGeneratorIMax: 1000, + spanLengthGeneratorType: uniformGenerator, + spanLengthGeneratorIMax: 100, + weightGeneratorType: uniformGenerator, + weightGeneratorIMax: 10, + rangeRequestPercent: 0.95, + numRequests: 10000, + lbs: lbs, + seed: seed, + }, + { + desc: "WeightedFinder/startIMax=100000/spanIMax=1000/point", + startKeyGeneratorType: zipfGenerator, + startKeyGeneratorIMax: 100000, + spanLengthGeneratorType: uniformGenerator, + spanLengthGeneratorIMax: 1000, + weightGeneratorType: uniformGenerator, + weightGeneratorIMax: 10, + rangeRequestPercent: 0, + numRequests: 10000, + lbs: lbs, + seed: seed, + }, + { + desc: "WeightedFinder/startIMax=10000000000/spanIMax=1000/unweighted", + startKeyGeneratorType: zipfGenerator, + startKeyGeneratorIMax: 10000000000, + spanLengthGeneratorType: uniformGenerator, + spanLengthGeneratorIMax: 1000, + weightGeneratorType: uniformGenerator, + weightGeneratorIMax: 1, + rangeRequestPercent: 0.95, + numRequests: 10000, + lbs: lbs, + seed: seed, + }, + }) +} diff --git a/pkg/kv/kvserver/split/finder.go b/pkg/kv/kvserver/split/unweighted_finder.go similarity index 74% rename from pkg/kv/kvserver/split/finder.go rename to pkg/kv/kvserver/split/unweighted_finder.go index 2a93b6364b95..cbab193abf91 100644 --- a/pkg/kv/kvserver/split/finder.go +++ b/pkg/kv/kvserver/split/unweighted_finder.go @@ -12,12 +12,12 @@ package split import ( "bytes" + "fmt" "math" "sort" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" - "golang.org/x/exp/rand" ) // Load-based splitting. @@ -28,7 +28,7 @@ import ( // - Disengage when a range no longer meets the criteria // - During split: // - Record start time -// - Keep a sample of 10 keys +// - Keep a sample of 20 keys // - Each sample contains three counters: left, right and contained. // - On each span, increment the left and/or right counters, depending // on whether the span falls entirely to the left, to the right. @@ -59,30 +59,31 @@ type sample struct { left, right, contained int } -// Finder is a structure that is used to determine the split point +// UnweightedFinder is a structure that is used to determine the split point // using the Reservoir Sampling method. -type Finder struct { - startTime time.Time - samples [splitKeySampleSize]sample - count int +type UnweightedFinder struct { + startTime time.Time + randSource RandSource + samples [splitKeySampleSize]sample + count int } -// NewFinder initiates a Finder with the given time. -func NewFinder(startTime time.Time) *Finder { - return &Finder{ - startTime: startTime, +// NewUnweightedFinder initiates a UnweightedFinder with the given time. +func NewUnweightedFinder(startTime time.Time, randSource RandSource) *UnweightedFinder { + return &UnweightedFinder{ + startTime: startTime, + randSource: randSource, } } -// Ready checks if the Finder has been initialized with a sufficient -// sample duration. -func (f *Finder) Ready(nowTime time.Time) bool { +// Ready implements the LoadBasedSplitter interface. +func (f *UnweightedFinder) Ready(nowTime time.Time) bool { return nowTime.Sub(f.startTime) > RecordDurationThreshold } -// Record informs the Finder about where the span lies with -// regard to the keys in the samples. -func (f *Finder) Record(span roachpb.Span, intNFn func(int) int) { +// Record implements the LoadBasedSplitter interface. Record uses reservoir +// sampling to get the candidate split keys. +func (f *UnweightedFinder) Record(span roachpb.Span, weight float64) { if f == nil { return } @@ -92,7 +93,7 @@ func (f *Finder) Record(span roachpb.Span, intNFn func(int) int) { f.count++ if count < splitKeySampleSize { idx = count - } else if idx = intNFn(count); idx >= splitKeySampleSize { + } else if idx = f.randSource.Intn(count); idx >= splitKeySampleSize { // Increment all existing keys' counters. for i := range f.samples { if span.ProperlyContainsKey(f.samples[i].key) { @@ -120,9 +121,12 @@ func (f *Finder) Record(span roachpb.Span, intNFn func(int) int) { f.samples[idx] = sample{key: span.Key} } -// Key finds an appropriate split point based on the Reservoir sampling method. -// Returns a nil key if no appropriate key was found. -func (f *Finder) Key() roachpb.Key { +// Key implements the LoadBasedSplitter interface. Key returns the candidate +// split key that minimizes the sum of the balance score (percentage difference +// between the left and right counters) and the contained score (percentage of +// counters are contained), provided the balance score is < 0.25 and the +// contained score is < 0.5. +func (f *UnweightedFinder) Key() roachpb.Key { if f == nil { return nil } @@ -156,7 +160,7 @@ func (f *Finder) Key() roachpb.Key { // determines the number of samples that don't pass each split key requirement // (e.g. insufficient counters, imbalance in left and right counters, too many // contained counters, or a combination of the last two). -func (f *Finder) NoSplitKeyCause() ( +func (f *UnweightedFinder) noSplitKeyCause() ( insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained int, ) { for _, s := range f.samples { @@ -179,9 +183,20 @@ func (f *Finder) NoSplitKeyCause() ( return } -// PopularKeyFrequency returns the percentage that the most popular key appears -// in f.samples. -func (f *Finder) PopularKeyFrequency() float64 { +// NoSplitKeyCauseLogMsg implements the LoadBasedSplitter interface. +func (f *UnweightedFinder) NoSplitKeyCauseLogMsg() string { + insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := f.noSplitKeyCause() + if insufficientCounters == splitKeySampleSize { + return "" + } + noSplitKeyCauseLogMsg := fmt.Sprintf( + "No split key found: insufficient counters = %d, imbalance = %d, too many contained = %d, imbalance and too many contained = %d", + insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained) + return noSplitKeyCauseLogMsg +} + +// PopularKeyFrequency implements the LoadBasedSplitter interface. +func (f *UnweightedFinder) PopularKeyFrequency() float64 { sort.Slice(f.samples[:], func(i, j int) bool { return bytes.Compare(f.samples[i].key, f.samples[j].key) < 0 }) @@ -201,27 +216,3 @@ func (f *Finder) PopularKeyFrequency() float64 { return float64(popularKeyCount) / float64(splitKeySampleSize) } - -// TestFinder is a wrapper of Finder compatible with the load-based splitter -// testing framework. -type TestFinder struct { - f Finder - randSource *rand.Rand -} - -// NewTestFinder initiates a TestFinder with a random source. -func NewTestFinder(randSource *rand.Rand) *TestFinder { - return &TestFinder{ - randSource: randSource, - } -} - -// Record records the span, ignoring weight as this Finder is unweighted. -func (tf *TestFinder) Record(span roachpb.Span, weight float32) { - tf.f.Record(span, tf.randSource.Intn) -} - -// Key finds a split key. -func (tf *TestFinder) Key() roachpb.Key { - return tf.f.Key() -} diff --git a/pkg/kv/kvserver/split/finder_test.go b/pkg/kv/kvserver/split/unweighted_finder_test.go similarity index 90% rename from pkg/kv/kvserver/split/finder_test.go rename to pkg/kv/kvserver/split/unweighted_finder_test.go index 0afd1844fc4c..e25fa844bfcf 100644 --- a/pkg/kv/kvserver/split/finder_test.go +++ b/pkg/kv/kvserver/split/unweighted_finder_test.go @@ -25,6 +25,21 @@ import ( "github.com/stretchr/testify/assert" ) +type DFLargestRandSource struct{} + +func (r DFLargestRandSource) Float64() float64 { + return 0 +} + +// Intn returns the largest number possible in [0, n) +func (r DFLargestRandSource) Intn(n int) int { + var result int + if n > 0 { + result = n - 1 + } + return result +} + // TestSplitFinderKey verifies the Key() method correctly // finds an appropriate split point for the range. func TestSplitFinderKey(t *testing.T) { @@ -151,8 +166,9 @@ func TestSplitFinderKey(t *testing.T) { {multipleSpanReservoir, keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize/2)}, } + randSource := rand.New(rand.NewSource(2022)) for i, test := range testCases { - finder := NewFinder(timeutil.Now()) + finder := NewUnweightedFinder(timeutil.Now(), randSource) finder.samples = test.reservoir if splitByLoadKey := finder.Key(); !bytes.Equal(splitByLoadKey, test.splitByLoadKey) { t.Errorf( @@ -171,18 +187,6 @@ func TestSplitFinderRecorder(t *testing.T) { const ReservoirKeyOffset = 1000 - // getLargest is an IntN function that returns the largest number possible in [0, n) - getLargest := func(n int) int { - var result int - if n > 0 { - result = n - 1 - } - return result - } - - // getZero is an IntN function that returns 0 - getZero := func(n int) int { return 0 } - // Test recording a key query before the reservoir is full. basicReservoir := [splitKeySampleSize]sample{} basicSpan := roachpb.Span{ @@ -246,26 +250,26 @@ func TestSplitFinderRecorder(t *testing.T) { testCases := []struct { recordSpan roachpb.Span - intNFn func(int) int + randSource RandSource currCount int currReservoir [splitKeySampleSize]sample expectedReservoir [splitKeySampleSize]sample }{ // Test recording a key query before the reservoir is full. - {basicSpan, getLargest, 0, basicReservoir, expectedBasicReservoir}, + {basicSpan, DFLargestRandSource{}, 0, basicReservoir, expectedBasicReservoir}, // Test recording a key query after the reservoir is full with replacement. - {replacementSpan, getZero, splitKeySampleSize + 1, replacementReservoir, expectedReplacementReservoir}, + {replacementSpan, ZeroRandSource{}, splitKeySampleSize + 1, replacementReservoir, expectedReplacementReservoir}, // Test recording a key query after the reservoir is full without replacement. - {fullSpan, getLargest, splitKeySampleSize + 1, fullReservoir, expectedFullReservoir}, + {fullSpan, DFLargestRandSource{}, splitKeySampleSize + 1, fullReservoir, expectedFullReservoir}, // Test recording a spanning query. - {spanningSpan, getLargest, splitKeySampleSize + 1, spanningReservoir, expectedSpanningReservoir}, + {spanningSpan, DFLargestRandSource{}, splitKeySampleSize + 1, spanningReservoir, expectedSpanningReservoir}, } for i, test := range testCases { - finder := NewFinder(timeutil.Now()) + finder := NewUnweightedFinder(timeutil.Now(), test.randSource) finder.samples = test.currReservoir finder.count = test.currCount - finder.Record(test.recordSpan, test.intNFn) + finder.Record(test.recordSpan, 1) if !reflect.DeepEqual(finder.samples, test.expectedReservoir) { t.Errorf( "%d: expected reservoir: %v, but got reservoir: %v", @@ -278,7 +282,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) { samples := [splitKeySampleSize]sample{} for i, idx := range rand.Perm(splitKeySampleSize) { if i < 5 { - // insufficient counters + // Insufficient counters. samples[idx] = sample{ key: keys.SystemSQLCodec.TablePrefix(uint32(i)), left: 0, @@ -286,7 +290,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) { contained: splitKeyMinCounter - 1, } } else if i < 7 { - // imbalance + // Imbalance and too many contained counters. deviationLeft := rand.Intn(5) deviationRight := rand.Intn(5) samples[idx] = sample{ @@ -296,7 +300,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) { contained: int(max(float64(splitKeyMinCounter-40-deviationLeft+deviationRight), float64(40+deviationLeft-deviationRight))), } } else if i < 13 { - // imbalance + // Imbalance counters. deviationLeft := rand.Intn(5) deviationRight := rand.Intn(5) samples[idx] = sample{ @@ -306,7 +310,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) { contained: int(max(float64(splitKeyMinCounter-80-deviationLeft+deviationRight), 0)), } } else { - // too many contained + // Too many contained counters. contained := int(splitKeyMinCounter*splitKeyContainedThreshold + 1) left := (splitKeyMinCounter - contained) / 2 samples[idx] = sample{ @@ -318,9 +322,10 @@ func TestFinderNoSplitKeyCause(t *testing.T) { } } - finder := NewFinder(timeutil.Now()) + randSource := rand.New(rand.NewSource(2022)) + finder := NewUnweightedFinder(timeutil.Now(), randSource) finder.samples = samples - insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := finder.NoSplitKeyCause() + insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := finder.noSplitKeyCause() assert.Equal(t, 5, insufficientCounters, "unexpected insufficient counters") assert.Equal(t, 6, imbalance, "unexpected imbalance counters") assert.Equal(t, 7, tooManyContained, "unexpected too many contained counters") @@ -392,8 +397,10 @@ func TestFinderPopularKeyFrequency(t *testing.T) { {fiftyFivePercentPopularKeySample, 0.55}, {sameKeySample, 1}, } + + randSource := rand.New(rand.NewSource(2022)) for i, test := range testCases { - finder := NewFinder(timeutil.Now()) + finder := NewUnweightedFinder(timeutil.Now(), randSource) finder.samples = test.samples popularKeyFrequency := finder.PopularKeyFrequency() assert.Equal(t, test.expectedPopularKeyFrequency, popularKeyFrequency, "unexpected popular key frequency in test %d", i) diff --git a/pkg/kv/kvserver/split/weighted_finder.go b/pkg/kv/kvserver/split/weighted_finder.go new file mode 100644 index 000000000000..773c548e613f --- /dev/null +++ b/pkg/kv/kvserver/split/weighted_finder.go @@ -0,0 +1,272 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package split + +import ( + "fmt" + "math" + "sort" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// Load-based splitting. +// +// - Engage split for ranges: +// - With size exceeding min-range-bytes +// - with reqs/s rate over a configurable threshold +// - Disengage when a range no longer meets the criteria +// - During split: +// - Record start time +// - Keep a sample of 20 keys using weighted reservoir sampling (a simplified +// version of A-Chao algorithm). For more information on A-Chao, see +// https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Chao or +// https://arxiv.org/pdf/1012.0256.pdf. +// - Each sample contains two counters: left and right. +// - For a weighted point span (key, weight), record that as is; for a +// weighted ranged span ([key, endKey), weight), record that as two +// weighted point spans: (key, weight/2) and (endKey, weight/2). +// - On each weighted point span, increment the left and/or right counters by +// the weight, depending on whether the key falls to the left or to the +// right. +// - When a sample is replaced, discard its counters. +// - If a range is on for more than a threshold interval: +// - Examine sample for the smallest diff between left and right counters, +// excluding any whose counters are not sufficiently advanced; +// If not less than some constant threshold, skip split. +// - If a desired point is reached, add range to split queue with the chosen +// key as split key, and provide hint to scatter the replicas. + +type weightedSample struct { + key roachpb.Key + weight float64 + left, right float64 + count int +} + +// WeightedFinder is a structure that is used to determine the split point +// using the Weighted Reservoir Sampling method (a simplified version of A-Chao +// algorithm). +type WeightedFinder struct { + samples [splitKeySampleSize]weightedSample + count int + totalWeight float64 + startTime time.Time + randSource RandSource +} + +// NewWeightedFinder initiates a WeightedFinder with the given time. +func NewWeightedFinder(startTime time.Time, randSource RandSource) *WeightedFinder { + return &WeightedFinder{ + startTime: startTime, + randSource: randSource, + } +} + +// Ready implements the LoadBasedSplitter interface. +func (f *WeightedFinder) Ready(nowTime time.Time) bool { + return nowTime.Sub(f.startTime) > RecordDurationThreshold +} + +// record informs the WeightedFinder about where the incoming point span (i.e. +// key) lies with regard to the candidate split keys. We use weighted reservoir +// sampling (a simplified version of A-Chao algorithm) to get the candidate +// split keys. +func (f *WeightedFinder) record(key roachpb.Key, weight float64) { + if f == nil { + return + } + + var idx int + count := f.count + f.count++ + f.totalWeight += weight + if count < splitKeySampleSize { + idx = count + } else if f.randSource.Float64() > splitKeySampleSize*weight/f.totalWeight { + for i := range f.samples { + // Example: Suppose we have candidate split key = "k" (i.e. + // f.samples[i].Key). + // + // Suppose we record the following weighted keys: + // record("i", 3) x Increment left counter by 3 + // record("k", 5) x Increment right counter by 5 + // record("l", 1) x Increment right counter by 1 + // |----|----|----|----| + // "i" "j" "k" "l" "m" + // ^ + // Candidate split key + // Left range split [ ) + // Right range split [ ) + if comp := key.Compare(f.samples[i].key); comp < 0 { + // Case key < f.samples[i].Key i.e. key is to the left of the candidate + // split key (left is exclusive to split key). + f.samples[i].left += weight + } else { + // Case key >= f.samples[i].Key i.e. key is to the right of or on the + // candidate split key (right is inclusive to split key). + f.samples[i].right += weight + } + f.samples[i].count++ + } + return + } else { + idx = f.randSource.Intn(splitKeySampleSize) + } + + // Note we always use the start key of the span. We could + // take the average of the byte slices, but that seems + // unnecessarily complex for practical usage. + f.samples[idx] = weightedSample{key: key, weight: weight} +} + +// Record implements the LoadBasedSplitter interface. +// +// Note that we treat a weighted range request ([start, end), w) as two +// weighted point requests (start, w/2) and (end, w/2). The motivation for this +// is that within the range [start, end), we do not know anything about the +// underlying data distribution without complex methods to retrieve such +// information. Moreover, we do not even know what keys are within this range +// since all keys are byte arrays, and consequently we have no concept of +// “distance” or “midpoint” between a pair of keys. The most basic approach is +// to be agnostic to the underlying data distribution and relative distance +// between keys, and simply assume that a weighted range request that contains +// a candidate split key will contribute half of its weight to the left counter +// and half of its weight to the right counter of that candidate split key. +func (f *WeightedFinder) Record(span roachpb.Span, weight float64) { + if span.EndKey == nil { + f.record(span.Key, weight) + } else { + f.record(span.Key, weight/2) + f.record(span.EndKey, weight/2) + } +} + +// Key implements the LoadBasedSplitter interface. Key returns the candidate +// split key that minimizes the balance score (percentage difference between +// the left and right counters), provided the balance score is < 0.25. +func (f *WeightedFinder) Key() roachpb.Key { + if f == nil { + return nil + } + + var bestIdx = -1 + var bestScore float64 = 1 + // For simplicity, we suppose splitKeyMinCounter = 5. + // + // Example 1 (numbers refer to weights of requests): + // 1 | + // 2 | + // | 1 + // | 1 + // | + // split key + // s.left = 3 + // s.right = 2 + // s.count = 4 + // Invalid split key because insufficient counters + // (s.count < splitKeyMinCounter). + // + // Example 2 (numbers refer to weights of requests): + // 1 | + // 2 | + // | 1 + // | 1 + // | 3 + // split key + // s.left = 3 + // s.right = 5 + // balance score = |3 - 5| / (3 + 5) = 0.25 + // Invalid split key because imbalance in left and right counters i.e. + // balance score >= splitKeyThreshold. + // + // Example 3: + // 1 | | + // 2 | | + // 2 | | + // | 1| + // | | 1 + // | | 6 + // sk1 + // sk2 + // balance score of sk1 = 0.23 + // balance score of sk2 = 0.08 + // We choose split key sk2 because it has the lowest balance score. + for i, s := range f.samples { + if s.count < splitKeyMinCounter { + continue + } + balanceScore := math.Abs(s.left-s.right) / (s.left + s.right) + if balanceScore >= splitKeyThreshold { + continue + } + if balanceScore < bestScore { + bestIdx = i + bestScore = balanceScore + } + } + + if bestIdx == -1 { + return nil + } + return f.samples[bestIdx].key +} + +// noSplitKeyCause iterates over all sampled candidate split keys and +// determines the number of samples that don't pass each split key requirement +// (e.g. insufficient counters, imbalance in left and right counters). +func (f *WeightedFinder) noSplitKeyCause() (insufficientCounters, imbalance int) { + for _, s := range f.samples { + if s.count < splitKeyMinCounter { + insufficientCounters++ + } else if balanceScore := math.Abs(s.left-s.right) / (s.left + s.right); balanceScore >= splitKeyThreshold { + imbalance++ + } + } + return +} + +// NoSplitKeyCauseLogMsg implements the LoadBasedSplitter interface. +func (f *WeightedFinder) NoSplitKeyCauseLogMsg() string { + insufficientCounters, imbalance := f.noSplitKeyCause() + if insufficientCounters == splitKeySampleSize { + return "" + } + noSplitKeyCauseLogMsg := fmt.Sprintf("No split key found: insufficient counters = %d, imbalance = %d", insufficientCounters, imbalance) + return noSplitKeyCauseLogMsg +} + +// PopularKeyFrequency implements the LoadBasedSplitter interface. +func (f *WeightedFinder) PopularKeyFrequency() float64 { + sort.Slice(f.samples[:], func(i, j int) bool { + return f.samples[i].key.Compare(f.samples[j].key) < 0 + }) + + weight := f.samples[0].weight + currentKeyWeight := weight + popularKeyWeight := weight + totalWeight := weight + for i := 1; i < len(f.samples); i++ { + weight := f.samples[i].weight + if f.samples[i].key.Equal(f.samples[i-1].key) { + currentKeyWeight += weight + } else { + currentKeyWeight = weight + } + if popularKeyWeight < currentKeyWeight { + popularKeyWeight = currentKeyWeight + } + totalWeight += weight + } + + return popularKeyWeight / totalWeight +} diff --git a/pkg/kv/kvserver/split/weighted_finder_test.go b/pkg/kv/kvserver/split/weighted_finder_test.go new file mode 100644 index 000000000000..6db32a6bae3f --- /dev/null +++ b/pkg/kv/kvserver/split/weighted_finder_test.go @@ -0,0 +1,413 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package split + +import ( + "bytes" + "context" + "math" + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/assert" + "golang.org/x/exp/rand" +) + +type ZeroRandSource struct{} + +func (r ZeroRandSource) Float64() float64 { + return 0 +} + +func (r ZeroRandSource) Intn(int) int { + return 0 +} + +type WFLargestRandSource struct{} + +func (r WFLargestRandSource) Float64() float64 { + return 1 +} + +func (r WFLargestRandSource) Intn(int) int { + return 0 +} + +// TestSplitWeightedFinderKey verifies the Key() method correctly +// finds an appropriate split point for the range. +func TestSplitWeightedFinderKey(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + + const ReservoirKeyOffset = 1000 + + // Test an empty reservoir (reservoir without load). + basicReservoir := [splitKeySampleSize]weightedSample{} + + // Test reservoir with no load should have no splits. + noLoadReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: 0, + right: 0, + count: 0, + } + noLoadReservoir[i] = tempSample + } + + // Test a uniform reservoir. + uniformReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: splitKeyMinCounter, + right: splitKeyMinCounter, + count: splitKeyMinCounter, + } + uniformReservoir[i] = tempSample + } + + // Testing a non-uniform reservoir. + nonUniformReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: float64(splitKeyMinCounter * i), + right: float64(splitKeyMinCounter * (splitKeySampleSize - i)), + count: splitKeyMinCounter, + } + nonUniformReservoir[i] = tempSample + } + + // Test a load heavy reservoir on a single hot key (the last key). + singleHotKeyReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: 0, + right: splitKeyMinCounter, + count: splitKeyMinCounter, + } + singleHotKeyReservoir[i] = tempSample + } + + // Test a load heavy reservoir on multiple hot keys (first and last key). + multipleHotKeysReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: splitKeyMinCounter, + right: splitKeyMinCounter, + count: splitKeyMinCounter, + } + multipleHotKeysReservoir[i] = tempSample + } + multipleHotKeysReservoir[0].left = 0 + + // Test a spanning reservoir where splits shouldn't occur. + spanningReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: 75, + right: 45, + count: splitKeyMinCounter, + } + spanningReservoir[i] = tempSample + } + + // Test that splits happen in best balance score. + bestBalanceReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: 2.3 * splitKeyMinCounter, + right: 1.7 * splitKeyMinCounter, + count: splitKeyMinCounter, + } + bestBalanceReservoir[i] = tempSample + } + midSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + splitKeySampleSize/2)), + left: 1.1 * splitKeyMinCounter, + right: 0.9 * splitKeyMinCounter, + count: splitKeyMinCounter, + } + bestBalanceReservoir[splitKeySampleSize/2] = midSample + + testCases := []struct { + reservoir [splitKeySampleSize]weightedSample + splitByLoadKey roachpb.Key + }{ + // Test an empty reservoir. + {basicReservoir, nil}, + // Test reservoir with no load should have no splits. + {noLoadReservoir, nil}, + // Test a uniform reservoir (Splits at the first key) + {uniformReservoir, keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset)}, + // Testing a non-uniform reservoir. + {nonUniformReservoir, keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize/2)}, + // Test a load heavy reservoir on a single hot key. Splitting can't help here. + {singleHotKeyReservoir, nil}, + // Test a load heavy reservoir on multiple hot keys. Splits between the hot keys. + {multipleHotKeysReservoir, keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + 1)}, + // Test a spanning reservoir. Splitting will be bad here. Should avoid it. + {spanningReservoir, nil}, + // Test that splits happen between two heavy spans. + {bestBalanceReservoir, keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize/2)}, + } + + randSource := rand.New(rand.NewSource(2022)) + for i, test := range testCases { + weightedFinder := NewWeightedFinder(timeutil.Now(), randSource) + weightedFinder.samples = test.reservoir + if splitByLoadKey := weightedFinder.Key(); !bytes.Equal(splitByLoadKey, test.splitByLoadKey) { + t.Errorf( + "%d: expected splitByLoadKey: %v, but got splitByLoadKey: %v", + i, test.splitByLoadKey, splitByLoadKey) + } + } +} + +// TestSplitWeightedFinderRecorder verifies the Record() method correctly +// records a span. +func TestSplitWeightedFinderRecorder(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + + const ReservoirKeyOffset = 1000 + + // Test recording a key query before the reservoir is full. + basicReservoir := [splitKeySampleSize]weightedSample{} + basicSpan := roachpb.Span{ + Key: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset), + EndKey: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + 1), + } + const basicWeight = 1 + expectedBasicReservoir := [splitKeySampleSize]weightedSample{} + expectedBasicReservoir[0] = weightedSample{ + key: basicSpan.Key, + weight: 0.5, + } + expectedBasicReservoir[1] = weightedSample{ + key: basicSpan.EndKey, + weight: 0.5, + } + + // Test recording a key query after the reservoir is full with replacement. + replacementReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + weight: 1, + left: 0, + right: 0, + } + replacementReservoir[i] = tempSample + } + replacementSpan := roachpb.Span{ + Key: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize), + EndKey: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize + 1), + } + const replacementWeight = 1 + expectedReplacementReservoir := replacementReservoir + expectedReplacementReservoir[0] = weightedSample{ + key: replacementSpan.EndKey, + weight: 0.5, + } + + // Test recording a key query after the reservoir is full without replacement. + fullReservoir := replacementReservoir + fullSpan := roachpb.Span{ + Key: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset), + EndKey: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + 1), + } + const fullWeight = 1 + expectedFullReservoir := fullReservoir + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + weight: 1, + left: 1, + right: 0, + count: 2, + } + expectedFullReservoir[i] = tempSample + } + expectedFullReservoir[0].left = 0 + expectedFullReservoir[0].right = 1 + expectedFullReservoir[1].left = 0.5 + expectedFullReservoir[1].right = 0.5 + + // Test recording a spanning query. + spanningReservoir := replacementReservoir + spanningSpan := roachpb.Span{ + Key: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset - 1), + EndKey: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize + 1), + } + const spanningWeight = 1 + expectedSpanningReservoir := spanningReservoir + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + weight: 1, + left: 0.5, + right: 0.5, + count: 2, + } + expectedSpanningReservoir[i] = tempSample + } + + testCases := []struct { + recordSpan roachpb.Span + weight float64 + randSource RandSource + currCount int + currReservoir [splitKeySampleSize]weightedSample + expectedReservoir [splitKeySampleSize]weightedSample + }{ + // Test recording a key query before the reservoir is full. + {basicSpan, basicWeight, WFLargestRandSource{}, 0, basicReservoir, expectedBasicReservoir}, + // Test recording a key query after the reservoir is full with replacement. + {replacementSpan, replacementWeight, ZeroRandSource{}, splitKeySampleSize + 1, replacementReservoir, expectedReplacementReservoir}, + // Test recording a key query after the reservoir is full without replacement. + {fullSpan, fullWeight, WFLargestRandSource{}, splitKeySampleSize + 1, fullReservoir, expectedFullReservoir}, + // Test recording a spanning query. + {spanningSpan, spanningWeight, WFLargestRandSource{}, splitKeySampleSize + 1, spanningReservoir, expectedSpanningReservoir}, + } + + for i, test := range testCases { + weightedFinder := NewWeightedFinder(timeutil.Now(), test.randSource) + weightedFinder.samples = test.currReservoir + weightedFinder.count = test.currCount + weightedFinder.totalWeight = 100 + weightedFinder.Record(test.recordSpan, test.weight) + if !reflect.DeepEqual(weightedFinder.samples, test.expectedReservoir) { + t.Errorf( + "%d: expected reservoir: %v, but got reservoir: %v", + i, test.expectedReservoir, weightedFinder.samples) + } + } +} + +func TestWeightedFinderNoSplitKeyCause(t *testing.T) { + samples := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + if i < 7 { + // Insufficient counters. + samples[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(i)), + weight: 1, + left: splitKeyMinCounter, + right: splitKeyMinCounter, + count: splitKeyMinCounter - 1, + } + } else { + // Imbalance counters. + deviationLeft := 5 * rand.Float64() + deviationRight := 5 * rand.Float64() + samples[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(i)), + weight: 1, + left: 75 + deviationLeft, + right: 45 - deviationRight, + count: splitKeyMinCounter, + } + } + } + + randSource := rand.New(rand.NewSource(2022)) + weightedFinder := NewWeightedFinder(timeutil.Now(), randSource) + weightedFinder.samples = samples + insufficientCounters, imbalance := weightedFinder.noSplitKeyCause() + assert.Equal(t, 7, insufficientCounters, "unexpected insufficient counters") + assert.Equal(t, 13, imbalance, "unexpected imbalance counters") +} + +func TestWeightedFinderPopularKeyFrequency(t *testing.T) { + uniqueKeyUnweightedSample := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + uniqueKeyUnweightedSample[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(i)), + weight: 1, + } + } + uniqueKeyWeightedSample := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + uniqueKeyWeightedSample[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(i)), + weight: float64(i + 1), + } + } + duplicateKeyUnweightedSample := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + var tableID uint32 + if i < 8 || i >= 13 { + tableID = uint32(i / 4) + } else { + tableID = 2 + } + duplicateKeyUnweightedSample[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(tableID), + weight: 1, + } + } + duplicateKeyWeightedSample := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + var tableID uint32 + if i < 8 || i >= 15 { + tableID = uint32(i / 4) + } else { + tableID = 2 + } + duplicateKeyWeightedSample[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(tableID), + weight: float64(i + 1), + } + } + sameKeySample := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + sameKeySample[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(0), + weight: float64(i), + } + } + + const eps = 1e-3 + testCases := []struct { + samples [splitKeySampleSize]weightedSample + expectedPopularKeyFrequency float64 + }{ + {uniqueKeyUnweightedSample, 1.0 / 20.0}, + {uniqueKeyWeightedSample, 20.0 / 210.0}, // 20/(1+2+...+20) + {duplicateKeyUnweightedSample, 5.0 / 20.0}, + {duplicateKeyWeightedSample, 84.0 / 210.0}, // (9+10+...+15)/(1+2+...+20) + {sameKeySample, 1}, + } + + randSource := rand.New(rand.NewSource(2022)) + for i, test := range testCases { + weightedFinder := NewWeightedFinder(timeutil.Now(), randSource) + weightedFinder.samples = test.samples + popularKeyFrequency := weightedFinder.PopularKeyFrequency() + assert.True(t, math.Abs(test.expectedPopularKeyFrequency-popularKeyFrequency) < eps, + "%d: expected popular key frequency %f, got %f", + i, test.expectedPopularKeyFrequency, popularKeyFrequency) + } +}