From 11956723c8398839075bb303ebd407a64f320ca2 Mon Sep 17 00:00:00 2001 From: Kai Sun Date: Thu, 15 Dec 2022 23:35:37 -0500 Subject: [PATCH] split: Redesign the load-based splitter to be consistent with new rebalancing signals. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes: #90574 In the current load splitter, we find the split key that best balances the QPS of the left and right sides. As a result, each request is unweighted, since one request contributes one to the QPS. In particular, the load splitter does not differentiate between what kinds of requests they are, how heavy the request is, and what resources these requests consume, which can result in scenarios where QPS is balanced but one side has a lot more work due to a few heavy requests. Moreover, the current load splitter treats requests that contain a split key as “contained”. Optimizing for QPS, contained requests are bad since splitting at a point in a contained request will not help lower the QPS of either side. However, optimizing for other signals like CPU, splitting at a point in a contained request is great as each side will get part of the work of processing that request. This motivates a redesign of the load splitter, one that enables recording weighted requests and considers contained requests in the weight balancing for splitting. In this PR, we redesign the load-based splitter with the following interface: 1. Record a point key “start” or span “[start, end)” with a weight “w” at a specific time “ts”, where “w” is some measure of load recorded for a span e.g. Record(ts, start, w) or Record(ts, [start, end), w) 2. Find a split key such that the load (i.e. total weight) on the resulting split ranges would be as equal as possible according to the recorded loads above e.g. Key() To make the current load-based splitter (Finder) weighted, we make the following modifications: 1. Instead of using reservoir sampling, we use weighted reservoir sampling (a simplified version of A-Chao) 2. Remove the contained counter 3. Increment the left and right counters by the weight of the request rather than just 1 4. Treat a weighted range request ([start, end), w) into two weighted point requests (start, w/2) and (end, w/2) For more details, see the design doc: https://docs.google.com/document/d/1bdSxucz-xFzwnxL3fFXNZsRc9Vsht0oO0XuZrz5Iw84/edit#bookmark=id.xjc41tm3jx3x. Release note (ops change): The load-based splitter has been redesigned to be more consistent with CPU-based rebalancing rather than QPS-based rebalancing to improve range splits. --- .../testdata/explain-bundle/bundle/env.sql | 1 + pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/asim/state/BUILD.bazel | 1 + pkg/kv/kvserver/asim/state/split_decider.go | 11 +- pkg/kv/kvserver/replica_init.go | 5 +- pkg/kv/kvserver/split/BUILD.bazel | 8 +- pkg/kv/kvserver/split/decider.go | 65 ++- pkg/kv/kvserver/split/decider_test.go | 30 +- .../split/{finder.go => deprecated_finder.go} | 89 ++-- ...nder_test.go => deprecated_finder_test.go} | 25 +- .../split/load_based_splitter_test.go | 159 +++++-- pkg/kv/kvserver/split/weighted_finder.go | 282 ++++++++++++ pkg/kv/kvserver/split/weighted_finder_test.go | 413 ++++++++++++++++++ 13 files changed, 948 insertions(+), 142 deletions(-) rename pkg/kv/kvserver/split/{finder.go => deprecated_finder.go} (74%) rename pkg/kv/kvserver/split/{finder_test.go => deprecated_finder_test.go} (95%) create mode 100644 pkg/kv/kvserver/split/weighted_finder.go create mode 100644 pkg/kv/kvserver/split/weighted_finder_test.go diff --git a/pkg/cli/testdata/explain-bundle/bundle/env.sql b/pkg/cli/testdata/explain-bundle/bundle/env.sql index 37896e985e24..57ce914720e8 100644 --- a/pkg/cli/testdata/explain-bundle/bundle/env.sql +++ b/pkg/cli/testdata/explain-bundle/bundle/env.sql @@ -95,6 +95,7 @@ -- kv.closed_timestamp.side_transport_interval = 200ms (the interval at which the closed-timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport) -- kv.closed_timestamp.target_duration = 3s (if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration) -- kv.concurrency.optimistic_eval_limited_scans.enabled = true (when true, limited scans are optimistically evaluated in the sense of not checking for conflicting latches or locks up front for the full key range of the scan, and instead subsequently checking for conflicts only over the key range that was read) +-- kv.deprecated_lb_split_finder.enabled = false (if enabled, use the deprecated finder for load-based splitting) -- kv.dist_sender.concurrency_limit = 2048 (maximum number of asynchronous send requests) -- kv.follower_read.target_multiple = 3 (if above 1, encourages the distsender to perform a read against the closest replica if a request is older than kv.closed_timestamp.target_duration * (1 + kv.closed_timestamp.close_fraction * this) less a clock uncertainty interval. This value also is used to create follower_timestamp().) -- kv.gc.intent_age_threshold = 2h0m0s (intents older than this threshold will be resolved when encountered by the GC queue) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 51f5c8e2bd1b..bc73589c6175 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -213,6 +213,7 @@ go_library( "@io_etcd_go_raft_v3//tracker", "@io_opentelemetry_go_otel//attribute", "@org_golang_google_grpc//:go_default_library", + "@org_golang_x_exp//rand", "@org_golang_x_time//rate", ], ) diff --git a/pkg/kv/kvserver/asim/state/BUILD.bazel b/pkg/kv/kvserver/asim/state/BUILD.bazel index 4de663f39177..0c9400cdfb0e 100644 --- a/pkg/kv/kvserver/asim/state/BUILD.bazel +++ b/pkg/kv/kvserver/asim/state/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "@com_github_google_btree//:btree", "@io_etcd_go_raft_v3//:raft", "@io_etcd_go_raft_v3//tracker", + "@org_golang_x_exp//rand", ], ) diff --git a/pkg/kv/kvserver/asim/state/split_decider.go b/pkg/kv/kvserver/asim/state/split_decider.go index f03ec4b4c9b0..86b532737943 100644 --- a/pkg/kv/kvserver/asim/state/split_decider.go +++ b/pkg/kv/kvserver/asim/state/split_decider.go @@ -12,13 +12,13 @@ package state import ( "context" - "math/rand" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/metric" + "golang.org/x/exp/rand" ) // LoadSplitter provides an abstraction for load based splitting. It records @@ -61,14 +61,9 @@ func NewSplitDecider( } func (s *SplitDecider) newDecider() *split.Decider { - rand := rand.New(rand.NewSource(s.seed)) - - intN := func(n int) int { - return rand.Intn(n) - } - + rand := rand.New(rand.NewSource(uint64(s.seed))) decider := &split.Decider{} - split.Init(decider, intN, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{ + split.Init(decider, nil, rand, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 60b5e4ef6fa8..3c1841d71e2f 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -13,7 +13,6 @@ package kvserver import ( "bytes" "context" - "math/rand" "time" "github.com/cockroachdb/cockroach/pkg/keys" @@ -34,6 +33,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "go.etcd.io/raft/v3" + "golang.org/x/exp/rand" ) const ( @@ -95,7 +95,8 @@ func newUnloadedReplica( r.mu.stateLoader = stateloader.Make(desc.RangeID) r.mu.quiescent = true r.mu.conf = store.cfg.DefaultSpanConfig - split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 { + randSource := rand.New(rand.NewSource(2022)) + split.Init(&r.loadBasedSplitter, store.cfg.Settings, randSource, func() float64 { return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV)) }, func() time.Duration { return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV) diff --git a/pkg/kv/kvserver/split/BUILD.bazel b/pkg/kv/kvserver/split/BUILD.bazel index 8ac9fb509f34..2c1046cbd801 100644 --- a/pkg/kv/kvserver/split/BUILD.bazel +++ b/pkg/kv/kvserver/split/BUILD.bazel @@ -5,13 +5,16 @@ go_library( name = "split", srcs = [ "decider.go", - "finder.go", + "deprecated_finder.go", + "weighted_finder.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split", visibility = ["//visibility:public"], deps = [ "//pkg/keys", "//pkg/roachpb", + "//pkg/settings", + "//pkg/settings/cluster", "//pkg/util/log", "//pkg/util/metric", "//pkg/util/syncutil", @@ -24,8 +27,9 @@ go_test( size = "small", srcs = [ "decider_test.go", - "finder_test.go", + "deprecated_finder_test.go", "load_based_splitter_test.go", + "weighted_finder_test.go", ], args = ["-test.timeout=55s"], embed = [":split"], diff --git a/pkg/kv/kvserver/split/decider.go b/pkg/kv/kvserver/split/decider.go index ce7d54e6c3fd..cb072bfa6696 100644 --- a/pkg/kv/kvserver/split/decider.go +++ b/pkg/kv/kvserver/split/decider.go @@ -14,19 +14,54 @@ package split import ( "context" + "fmt" "time" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "golang.org/x/exp/rand" ) const minSplitSuggestionInterval = time.Minute const minNoSplitKeyLoggingMetricsInterval = time.Minute const minQueriesPerSecondSampleDuration = time.Second +type LoadBasedSplitter interface { + // Record informs the LoadBasedSplitter about where the span lies with regard + // to the keys in the samples. + Record(span roachpb.Span, weight float64) + + // Key finds an appropriate split point from the sampled candidate split + // keys. Returns a nil key if no appropriate key was found. + Key() roachpb.Key + + // Ready checks if the LoadBasedSplitter has been initialized with a + // sufficient sample duration. + Ready(nowTime time.Time) bool + + // NoSplitKeyCauseLogMsg returns a log message containing information on the + // number of samples that don't pass each split key requirement if not all + // samples are invalid due to insufficient counters, otherwise returns an + // empty string. + NoSplitKeyCauseLogMsg() string + + // PopularKeyFrequency returns the percentage that the most popular key + // appears in the sampled candidate split keys. + PopularKeyFrequency() float64 +} + +var enableDeprecatedLBSplitFinder = settings.RegisterBoolSetting( + settings.TenantWritable, + "kv.deprecated_lb_split_finder.enabled", + "if enabled, use the deprecated finder for load-based splitting", + true, +) + // A Decider collects measurements about the activity (measured in qps) on a // Replica and, assuming that qps thresholds are exceeded, tries to determine a // split key that would approximately result in halving the load on each of the @@ -63,7 +98,8 @@ type LoadSplitterMetrics struct { // incoming requests to find potential split keys and checks if sampled // candidate split keys satisfy certain requirements. type Decider struct { - intn func(n int) int // supplied to Init + st *cluster.Settings // supplied to Init + randSource *rand.Rand // supplied to Init qpsThreshold func() float64 // supplied to Init qpsRetention func() time.Duration // supplied to Init loadSplitterMetrics *LoadSplitterMetrics // supplied to Init @@ -80,8 +116,8 @@ type Decider struct { maxQPS maxQPSTracker // Fields tracking split key suggestions. - splitFinder *Finder // populated when engaged or decided - lastSplitSuggestion time.Time // last stipulation to client to carry out split + splitFinder LoadBasedSplitter // populated when engaged or decided and kv.deprecated_lb_split_finder.enabled is false + lastSplitSuggestion time.Time // last stipulation to client to carry out split // Fields tracking logging / metrics around load-based splitter split key. lastNoSplitKeyLoggingMetrics time.Time @@ -94,12 +130,14 @@ type Decider struct { // may exist in the system at any given point in time. func Init( lbs *Decider, - intn func(n int) int, + st *cluster.Settings, + randSource *rand.Rand, qpsThreshold func() float64, qpsRetention func() time.Duration, loadSplitterMetrics *LoadSplitterMetrics, ) { - lbs.intn = intn + lbs.st = st + lbs.randSource = randSource lbs.qpsThreshold = qpsThreshold lbs.qpsRetention = qpsRetention lbs.loadSplitterMetrics = loadSplitterMetrics @@ -147,7 +185,11 @@ func (d *Decider) recordLocked( // to be used. if d.mu.lastQPS >= d.qpsThreshold() { if d.mu.splitFinder == nil { - d.mu.splitFinder = NewFinder(now) + if d.st == nil || enableDeprecatedLBSplitFinder.Get(&d.st.SV) { + d.mu.splitFinder = NewDeprecatedFinder(now, d.randSource) + } else { + d.mu.splitFinder = NewWeightedFinder(now, d.randSource) + } } } else { d.mu.splitFinder = nil @@ -157,7 +199,7 @@ func (d *Decider) recordLocked( if d.mu.splitFinder != nil && n != 0 { s := span() if s.Key != nil { - d.mu.splitFinder.Record(span(), d.intn) + d.mu.splitFinder.Record(span(), 1) } if d.mu.splitFinder.Ready(now) { if d.mu.splitFinder.Key() != nil { @@ -168,16 +210,15 @@ func (d *Decider) recordLocked( } else { if now.Sub(d.mu.lastNoSplitKeyLoggingMetrics) > minNoSplitKeyLoggingMetricsInterval { d.mu.lastNoSplitKeyLoggingMetrics = now - insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := d.mu.splitFinder.NoSplitKeyCause() - if insufficientCounters < splitKeySampleSize { + noSplitKeyCauseLogMsg := d.mu.splitFinder.NoSplitKeyCauseLogMsg() + if noSplitKeyCauseLogMsg != "" { popularKeyFrequency := d.mu.splitFinder.PopularKeyFrequency() + noSplitKeyCauseLogMsg += fmt.Sprintf(", most popular key occurs in %d%% of samples", int(popularKeyFrequency*100)) + log.KvDistribution.Infof(ctx, "%s", noSplitKeyCauseLogMsg) if popularKeyFrequency >= splitKeyThreshold { d.loadSplitterMetrics.PopularKeyCount.Inc(1) } d.loadSplitterMetrics.NoSplitKeyCount.Inc(1) - log.KvDistribution.Infof(ctx, - "No split key found: insufficient counters = %d, imbalance = %d, too many contained = %d, imbalance and too many contained = %d, most popular key occurs in %d%% of samples", - insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained, int(popularKeyFrequency*100)) } } } diff --git a/pkg/kv/kvserver/split/decider_test.go b/pkg/kv/kvserver/split/decider_test.go index 6db6a2e4223a..e95800ef861f 100644 --- a/pkg/kv/kvserver/split/decider_test.go +++ b/pkg/kv/kvserver/split/decider_test.go @@ -13,7 +13,6 @@ package split import ( "context" "math" - "math/rand" "testing" "time" @@ -24,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" ) func ms(i int) time.Time { @@ -37,10 +37,10 @@ func ms(i int) time.Time { func TestDecider(t *testing.T) { defer leaktest.AfterTest(t)() - intn := rand.New(rand.NewSource(12)).Intn + rand := rand.New(rand.NewSource(12)) var d Decider - Init(&d, intn, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{ + Init(&d, nil, rand, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -96,7 +96,7 @@ func TestDecider(t *testing.T) { assert.Equal(t, ms(1200), d.mu.lastQPSRollover) assertMaxQPS(1099, 0, false) - var nilFinder *Finder + var nilFinder *DeprecatedFinder assert.Equal(t, nilFinder, d.mu.splitFinder) @@ -166,7 +166,7 @@ func TestDecider(t *testing.T) { } // ... which we verify by looking at its samples directly. - for _, sample := range d.mu.splitFinder.samples { + for _, sample := range d.mu.splitFinder.(*DeprecatedFinder).samples { assert.Equal(t, roachpb.Key("p"), sample.key) } @@ -196,10 +196,10 @@ func TestDecider(t *testing.T) { func TestDecider_MaxQPS(t *testing.T) { defer leaktest.AfterTest(t)() - intn := rand.New(rand.NewSource(11)).Intn + rand := rand.New(rand.NewSource(11)) var d Decider - Init(&d, intn, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{ + Init(&d, nil, rand, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -242,10 +242,10 @@ func TestDecider_MaxQPS(t *testing.T) { func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { defer leaktest.AfterTest(t)() - intn := rand.New(rand.NewSource(11)).Intn + rand := rand.New(rand.NewSource(11)) var d Decider - Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -278,10 +278,10 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) { defer leaktest.AfterTest(t)() - intn := rand.New(rand.NewSource(11)).Intn + rand := rand.New(rand.NewSource(11)) var d Decider - Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -409,11 +409,11 @@ func TestMaxQPSTracker(t *testing.T) { func TestDeciderMetrics(t *testing.T) { defer leaktest.AfterTest(t)() - intn := rand.New(rand.NewSource(11)).Intn + rand := rand.New(rand.NewSource(11)) timeStart := 1000 var dPopular Decider - Init(&dPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&dPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -435,7 +435,7 @@ func TestDeciderMetrics(t *testing.T) { // No split key, not popular key var dNotPopular Decider - Init(&dNotPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&dNotPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) @@ -455,7 +455,7 @@ func TestDeciderMetrics(t *testing.T) { // No split key, all insufficient counters var dAllInsufficientCounters Decider - Init(&dAllInsufficientCounters, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ + Init(&dAllInsufficientCounters, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{ PopularKeyCount: metric.NewCounter(metric.Metadata{}), NoSplitKeyCount: metric.NewCounter(metric.Metadata{}), }) diff --git a/pkg/kv/kvserver/split/finder.go b/pkg/kv/kvserver/split/deprecated_finder.go similarity index 74% rename from pkg/kv/kvserver/split/finder.go rename to pkg/kv/kvserver/split/deprecated_finder.go index 2a93b6364b95..f98b8eeb7d07 100644 --- a/pkg/kv/kvserver/split/finder.go +++ b/pkg/kv/kvserver/split/deprecated_finder.go @@ -12,6 +12,7 @@ package split import ( "bytes" + "fmt" "math" "sort" "time" @@ -28,7 +29,7 @@ import ( // - Disengage when a range no longer meets the criteria // - During split: // - Record start time -// - Keep a sample of 10 keys +// - Keep a sample of 20 keys // - Each sample contains three counters: left, right and contained. // - On each span, increment the left and/or right counters, depending // on whether the span falls entirely to the left, to the right. @@ -59,30 +60,32 @@ type sample struct { left, right, contained int } -// Finder is a structure that is used to determine the split point +// DeprecatedFinder is a structure that is used to determine the split point // using the Reservoir Sampling method. -type Finder struct { - startTime time.Time - samples [splitKeySampleSize]sample - count int +type DeprecatedFinder struct { + startTime time.Time + randSource RandSource + samples [splitKeySampleSize]sample + count int } -// NewFinder initiates a Finder with the given time. -func NewFinder(startTime time.Time) *Finder { - return &Finder{ - startTime: startTime, +// NewDeprecatedFinder initiates a DeprecatedFinder with the given time. +func NewDeprecatedFinder(startTime time.Time, randSource RandSource) *DeprecatedFinder { + rand.Float64() + return &DeprecatedFinder{ + startTime: startTime, + randSource: randSource, } } -// Ready checks if the Finder has been initialized with a sufficient -// sample duration. -func (f *Finder) Ready(nowTime time.Time) bool { +// Ready implements the LoadBasedSplitter interface. +func (f *DeprecatedFinder) Ready(nowTime time.Time) bool { return nowTime.Sub(f.startTime) > RecordDurationThreshold } -// Record informs the Finder about where the span lies with -// regard to the keys in the samples. -func (f *Finder) Record(span roachpb.Span, intNFn func(int) int) { +// Record implements the LoadBasedSplitter interface. Record uses reservoir +// sampling to get the candidate split keys. +func (f *DeprecatedFinder) Record(span roachpb.Span, weight float64) { if f == nil { return } @@ -92,7 +95,7 @@ func (f *Finder) Record(span roachpb.Span, intNFn func(int) int) { f.count++ if count < splitKeySampleSize { idx = count - } else if idx = intNFn(count); idx >= splitKeySampleSize { + } else if idx = f.randSource.Intn(count); idx >= splitKeySampleSize { // Increment all existing keys' counters. for i := range f.samples { if span.ProperlyContainsKey(f.samples[i].key) { @@ -120,9 +123,12 @@ func (f *Finder) Record(span roachpb.Span, intNFn func(int) int) { f.samples[idx] = sample{key: span.Key} } -// Key finds an appropriate split point based on the Reservoir sampling method. -// Returns a nil key if no appropriate key was found. -func (f *Finder) Key() roachpb.Key { +// Key implements the LoadBasedSplitter interface. Key returns the candidate +// split key that minimizes the sum of the balance score (percentage difference +// between the left and right counters) and the contained score (percentage of +// counters are contained), provided the balance score is < 0.25 and the +// contained score is < 0.5. +func (f *DeprecatedFinder) Key() roachpb.Key { if f == nil { return nil } @@ -156,7 +162,7 @@ func (f *Finder) Key() roachpb.Key { // determines the number of samples that don't pass each split key requirement // (e.g. insufficient counters, imbalance in left and right counters, too many // contained counters, or a combination of the last two). -func (f *Finder) NoSplitKeyCause() ( +func (f *DeprecatedFinder) noSplitKeyCause() ( insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained int, ) { for _, s := range f.samples { @@ -179,9 +185,20 @@ func (f *Finder) NoSplitKeyCause() ( return } -// PopularKeyFrequency returns the percentage that the most popular key appears -// in f.samples. -func (f *Finder) PopularKeyFrequency() float64 { +// NoSplitKeyCauseLogMsg implements the LoadBasedSplitter interface. +func (f *DeprecatedFinder) NoSplitKeyCauseLogMsg() string { + insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := f.noSplitKeyCause() + if insufficientCounters == splitKeySampleSize { + return "" + } + noSplitKeyCauseLogMsg := fmt.Sprintf( + "No split key found: insufficient counters = %d, imbalance = %d, too many contained = %d, imbalance and too many contained = %d", + insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained) + return noSplitKeyCauseLogMsg +} + +// PopularKeyFrequency implements the LoadBasedSplitter interface. +func (f *DeprecatedFinder) PopularKeyFrequency() float64 { sort.Slice(f.samples[:], func(i, j int) bool { return bytes.Compare(f.samples[i].key, f.samples[j].key) < 0 }) @@ -201,27 +218,3 @@ func (f *Finder) PopularKeyFrequency() float64 { return float64(popularKeyCount) / float64(splitKeySampleSize) } - -// TestFinder is a wrapper of Finder compatible with the load-based splitter -// testing framework. -type TestFinder struct { - f Finder - randSource *rand.Rand -} - -// NewTestFinder initiates a TestFinder with a random source. -func NewTestFinder(randSource *rand.Rand) *TestFinder { - return &TestFinder{ - randSource: randSource, - } -} - -// Record records the span, ignoring weight as this Finder is unweighted. -func (tf *TestFinder) Record(span roachpb.Span, weight float32) { - tf.f.Record(span, tf.randSource.Intn) -} - -// Key finds a split key. -func (tf *TestFinder) Key() roachpb.Key { - return tf.f.Key() -} diff --git a/pkg/kv/kvserver/split/finder_test.go b/pkg/kv/kvserver/split/deprecated_finder_test.go similarity index 95% rename from pkg/kv/kvserver/split/finder_test.go rename to pkg/kv/kvserver/split/deprecated_finder_test.go index 0afd1844fc4c..96ad62a28cf4 100644 --- a/pkg/kv/kvserver/split/finder_test.go +++ b/pkg/kv/kvserver/split/deprecated_finder_test.go @@ -151,8 +151,9 @@ func TestSplitFinderKey(t *testing.T) { {multipleSpanReservoir, keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize/2)}, } + randSource := rand.New(rand.NewSource(2022)) for i, test := range testCases { - finder := NewFinder(timeutil.Now()) + finder := NewDeprecatedFinder(timeutil.Now(), randSource) finder.samples = test.reservoir if splitByLoadKey := finder.Key(); !bytes.Equal(splitByLoadKey, test.splitByLoadKey) { t.Errorf( @@ -261,11 +262,12 @@ func TestSplitFinderRecorder(t *testing.T) { {spanningSpan, getLargest, splitKeySampleSize + 1, spanningReservoir, expectedSpanningReservoir}, } + randSource := rand.New(rand.NewSource(2022)) for i, test := range testCases { - finder := NewFinder(timeutil.Now()) + finder := NewDeprecatedFinder(timeutil.Now(), randSource) finder.samples = test.currReservoir finder.count = test.currCount - finder.Record(test.recordSpan, test.intNFn) + finder.Record(test.recordSpan, 1) if !reflect.DeepEqual(finder.samples, test.expectedReservoir) { t.Errorf( "%d: expected reservoir: %v, but got reservoir: %v", @@ -278,7 +280,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) { samples := [splitKeySampleSize]sample{} for i, idx := range rand.Perm(splitKeySampleSize) { if i < 5 { - // insufficient counters + // Insufficient counters. samples[idx] = sample{ key: keys.SystemSQLCodec.TablePrefix(uint32(i)), left: 0, @@ -286,7 +288,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) { contained: splitKeyMinCounter - 1, } } else if i < 7 { - // imbalance + // Imbalance and too many contained counters. deviationLeft := rand.Intn(5) deviationRight := rand.Intn(5) samples[idx] = sample{ @@ -296,7 +298,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) { contained: int(max(float64(splitKeyMinCounter-40-deviationLeft+deviationRight), float64(40+deviationLeft-deviationRight))), } } else if i < 13 { - // imbalance + // Imbalance counters. deviationLeft := rand.Intn(5) deviationRight := rand.Intn(5) samples[idx] = sample{ @@ -306,7 +308,7 @@ func TestFinderNoSplitKeyCause(t *testing.T) { contained: int(max(float64(splitKeyMinCounter-80-deviationLeft+deviationRight), 0)), } } else { - // too many contained + // Too many contained counters. contained := int(splitKeyMinCounter*splitKeyContainedThreshold + 1) left := (splitKeyMinCounter - contained) / 2 samples[idx] = sample{ @@ -318,9 +320,10 @@ func TestFinderNoSplitKeyCause(t *testing.T) { } } - finder := NewFinder(timeutil.Now()) + randSource := rand.New(rand.NewSource(2022)) + finder := NewDeprecatedFinder(timeutil.Now(), randSource) finder.samples = samples - insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := finder.NoSplitKeyCause() + insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := finder.noSplitKeyCause() assert.Equal(t, 5, insufficientCounters, "unexpected insufficient counters") assert.Equal(t, 6, imbalance, "unexpected imbalance counters") assert.Equal(t, 7, tooManyContained, "unexpected too many contained counters") @@ -392,8 +395,10 @@ func TestFinderPopularKeyFrequency(t *testing.T) { {fiftyFivePercentPopularKeySample, 0.55}, {sameKeySample, 1}, } + + randSource := rand.New(rand.NewSource(2022)) for i, test := range testCases { - finder := NewFinder(timeutil.Now()) + finder := NewDeprecatedFinder(timeutil.Now(), randSource) finder.samples = test.samples popularKeyFrequency := finder.PopularKeyFrequency() assert.Equal(t, test.expectedPopularKeyFrequency, popularKeyFrequency, "unexpected popular key frequency in test %d", i) diff --git a/pkg/kv/kvserver/split/load_based_splitter_test.go b/pkg/kv/kvserver/split/load_based_splitter_test.go index 06afe7a18522..3f1c8eff6750 100644 --- a/pkg/kv/kvserver/split/load_based_splitter_test.go +++ b/pkg/kv/kvserver/split/load_based_splitter_test.go @@ -15,7 +15,6 @@ import ( "fmt" "math" "sort" - "testing" "text/tabwriter" "time" @@ -23,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload/ycsb" - "github.com/stretchr/testify/require" "golang.org/x/exp/rand" ) @@ -77,20 +75,15 @@ import ( const ( zipfGenerator = 0 uniformGenerator = 1 - numIterations = 20 + numIterations = 200 ) -type loadBasedSplitter interface { - Record(span roachpb.Span, weight float32) - Key() roachpb.Key -} - type generator interface { Uint64() uint64 } type config struct { - lbs loadBasedSplitter + lbs LoadBasedSplitter startKeyGenerator generator spanLengthGenerator generator weightGenerator generator @@ -101,15 +94,15 @@ type config struct { type request struct { span roachpb.Span - weight float32 + weight float64 } type weightedKey struct { key uint32 - weight float32 + weight float64 } -type settings struct { +type lbsTestSettings struct { desc string startKeyGeneratorType int startKeyGeneratorIMax uint64 @@ -119,7 +112,7 @@ type settings struct { weightGeneratorIMax uint64 rangeRequestPercent float64 numRequests int - lbs func(*rand.Rand) loadBasedSplitter + lbs func(*rand.Rand) LoadBasedSplitter seed uint64 } @@ -127,15 +120,15 @@ func uint32ToKey(key uint32) roachpb.Key { return keys.SystemSQLCodec.TablePrefix(key) } -func generateRequests(config *config) ([]request, []weightedKey, float32) { - var totalWeight float32 +func generateRequests(config *config) ([]request, []weightedKey, float64) { + var totalWeight float64 requests := make([]request, 0, config.numRequests) weightedKeys := make([]weightedKey, 0, 2*config.numRequests) for i := 0; i < config.numRequests; i++ { startKey := uint32(config.startKeyGenerator.Uint64()) spanLength := uint32(config.spanLengthGenerator.Uint64()) - weight := float32(config.weightGenerator.Uint64()) + weight := float64(config.weightGenerator.Uint64()) var span roachpb.Span span.Key = uint32ToKey(startKey) @@ -167,25 +160,25 @@ func generateRequests(config *config) ([]request, []weightedKey, float32) { } func getOptimalKey( - weightedKeys []weightedKey, totalWeight float32, -) (optimalKey uint32, optimalLeftWeight, optimalRightWeight float32) { + weightedKeys []weightedKey, totalWeight float64, +) (optimalKey uint32, optimalLeftWeight, optimalRightWeight float64) { var optimalKeyPtr *uint32 - var leftWeight float32 + var leftWeight float64 sort.Slice(weightedKeys, func(i, j int) bool { return weightedKeys[i].key < weightedKeys[j].key }) - for _, weightedKey := range weightedKeys { + for i := range weightedKeys { rightWeight := totalWeight - leftWeight // Find the split key that results in the smallest difference between the // total weight of keys on the right side and the total weight of keys on // the left side. if optimalKeyPtr == nil || - math.Abs(float64(rightWeight-leftWeight)) < math.Abs(float64(optimalRightWeight-optimalLeftWeight)) { - optimalKeyPtr = &weightedKey.key + math.Abs(rightWeight-leftWeight) < math.Abs(optimalRightWeight-optimalLeftWeight) { + optimalKeyPtr = &weightedKeys[i].key optimalLeftWeight = leftWeight optimalRightWeight = rightWeight } - leftWeight += weightedKey.weight + leftWeight += weightedKeys[i].weight } optimalKey = *optimalKeyPtr return @@ -195,7 +188,7 @@ func getKey( config *config, requests []request, weightedKeys []weightedKey, ) ( key uint32, - leftWeight, rightWeight float32, + leftWeight, rightWeight float64, recordExecutionTime, keyExecutionTime time.Duration, ) { recordStart := timeutil.Now() @@ -222,7 +215,7 @@ func runTest( config *config, ) ( key, optimalKey uint32, - leftWeight, rightWeight, optimalLeftWeight, optimalRightWeight float32, + leftWeight, rightWeight, optimalLeftWeight, optimalRightWeight float64, recordExecutionTime, keyExecutionTime time.Duration, ) { requests, weightedKeys, totalWeight := generateRequests(config) @@ -231,7 +224,7 @@ func runTest( return } -func newGenerator(t *testing.T, randSource *rand.Rand, generatorType int, iMax uint64) generator { +func newGenerator(randSource *rand.Rand, generatorType int, iMax uint64) generator { var g generator var err error if generatorType == zipfGenerator { @@ -239,22 +232,24 @@ func newGenerator(t *testing.T, randSource *rand.Rand, generatorType int, iMax u } else if generatorType == uniformGenerator { g, err = ycsb.NewUniformGenerator(randSource, 1, iMax) } else { - require.Error(t, nil, "generatorType must be zipfGenerator or uniformGenerator") + panic("generatorType must be zipfGenerator or uniformGenerator") + } + if err != nil { + panic(err) } - require.NoError(t, err) return g } func runTestRepeated( - t *testing.T, settings *settings, + settings *lbsTestSettings, ) ( - avgPercentDifference, maxPercentDifference, avgOptimalPercentDifference, maxOptimalPercentDifference float32, + avgPercentDifference, maxPercentDifference, avgOptimalPercentDifference, maxOptimalPercentDifference float64, avgRecordExecutionTime, avgKeyExecutionTime time.Duration, ) { randSource := rand.New(rand.NewSource(settings.seed)) - startKeyGenerator := newGenerator(t, randSource, settings.startKeyGeneratorType, settings.startKeyGeneratorIMax) - spanLengthGenerator := newGenerator(t, randSource, settings.spanLengthGeneratorType, settings.spanLengthGeneratorIMax) - weightGenerator := newGenerator(t, randSource, settings.weightGeneratorType, settings.weightGeneratorIMax) + startKeyGenerator := newGenerator(randSource, settings.startKeyGeneratorType, settings.startKeyGeneratorIMax) + spanLengthGenerator := newGenerator(randSource, settings.spanLengthGeneratorType, settings.spanLengthGeneratorIMax) + weightGenerator := newGenerator(randSource, settings.weightGeneratorType, settings.weightGeneratorIMax) for i := 0; i < numIterations; i++ { _, _, leftWeight, rightWeight, optimalLeftWeight, optimalRightWeight, recordExecutionTime, keyExecutionTime := runTest(&config{ lbs: settings.lbs(randSource), @@ -265,12 +260,12 @@ func runTestRepeated( numRequests: settings.numRequests, randSource: randSource, }) - percentDifference := float32(100 * math.Abs(float64(leftWeight-rightWeight)) / math.Abs(float64(leftWeight+rightWeight))) + percentDifference := 100 * math.Abs(leftWeight-rightWeight) / (leftWeight + rightWeight) avgPercentDifference += percentDifference if maxPercentDifference < percentDifference { maxPercentDifference = percentDifference } - optimalPercentDifference := float32(100 * math.Abs(float64(optimalLeftWeight-optimalRightWeight)) / math.Abs(float64(optimalLeftWeight+optimalRightWeight))) + optimalPercentDifference := 100 * math.Abs(optimalLeftWeight-optimalRightWeight) / (optimalLeftWeight + optimalRightWeight) avgOptimalPercentDifference += optimalPercentDifference if maxOptimalPercentDifference < optimalPercentDifference { maxOptimalPercentDifference = optimalPercentDifference @@ -280,12 +275,12 @@ func runTestRepeated( } avgRecordExecutionTime = time.Duration(avgRecordExecutionTime.Nanoseconds() / int64(numIterations)) avgKeyExecutionTime = time.Duration(avgKeyExecutionTime.Nanoseconds() / int64(numIterations)) - avgPercentDifference /= float32(numIterations) - avgOptimalPercentDifference /= float32(numIterations) + avgPercentDifference /= numIterations + avgOptimalPercentDifference /= numIterations return } -func runTestMultipleSettings(t *testing.T, settingsArr []settings) { +func runTestMultipleSettings(settingsArr []lbsTestSettings) { var buf bytes.Buffer w := tabwriter.NewWriter(&buf, 4, 0, 2, ' ', 0) _, _ = fmt.Fprintln(w, @@ -298,7 +293,7 @@ func runTestMultipleSettings(t *testing.T, settingsArr []settings) { "Avg Key Execution Time", ) for _, settings := range settingsArr { - avgPercentDifference, maxPercentDifference, avgOptimalPercentDifference, maxOptimalPercentDifference, avgRecordExecutionTime, avgKeyExecutionTime := runTestRepeated(t, &settings) + avgPercentDifference, maxPercentDifference, avgOptimalPercentDifference, maxOptimalPercentDifference, avgRecordExecutionTime, avgKeyExecutionTime := runTestRepeated(&settings) _, _ = fmt.Fprintf(w, "%s\t%f\t%f\t%f\t%f\t%s\t%s\n", settings.desc, avgPercentDifference, @@ -312,10 +307,10 @@ func runTestMultipleSettings(t *testing.T, settingsArr []settings) { fmt.Print(buf.String()) } -func TestUnweightedFinder(t *testing.T) { - runTestMultipleSettings(t, []settings{ +func ExampleDeprecatedFinder() { + runTestMultipleSettings([]lbsTestSettings{ { - desc: "Unweighted Finder", + desc: "DeprecatedFinder", startKeyGeneratorType: zipfGenerator, startKeyGeneratorIMax: 10000000000, spanLengthGeneratorType: uniformGenerator, @@ -323,11 +318,85 @@ func TestUnweightedFinder(t *testing.T) { weightGeneratorType: uniformGenerator, weightGeneratorIMax: 1, rangeRequestPercent: 0.95, - numRequests: 10000, - lbs: func(randSource *rand.Rand) loadBasedSplitter { - return NewTestFinder(randSource) + numRequests: 13000, + lbs: func(randSource *rand.Rand) LoadBasedSplitter { + return NewDeprecatedFinder(timeutil.Now(), randSource) }, seed: 2022, }, }) } + +func ExampleWeightedFinder() { + seed := uint64(2022) + lbs := func(randSource *rand.Rand) LoadBasedSplitter { + return NewWeightedFinder(timeutil.Now(), randSource) + } + runTestMultipleSettings([]lbsTestSettings{ + { + desc: "WeightedFinder/startIMax=10000000000/spanIMax=1000", + startKeyGeneratorType: zipfGenerator, + startKeyGeneratorIMax: 10000000000, + spanLengthGeneratorType: uniformGenerator, + spanLengthGeneratorIMax: 1000, + weightGeneratorType: uniformGenerator, + weightGeneratorIMax: 10, + rangeRequestPercent: 0.95, + numRequests: 10000, + lbs: lbs, + seed: seed, + }, + { + desc: "WeightedFinder/startIMax=100000/spanIMax=1000", + startKeyGeneratorType: zipfGenerator, + startKeyGeneratorIMax: 100000, + spanLengthGeneratorType: uniformGenerator, + spanLengthGeneratorIMax: 1000, + weightGeneratorType: uniformGenerator, + weightGeneratorIMax: 10, + rangeRequestPercent: 0.95, + numRequests: 10000, + lbs: lbs, + seed: seed, + }, + { + desc: "WeightedFinder/startIMax=1000/spanIMax=100", + startKeyGeneratorType: zipfGenerator, + startKeyGeneratorIMax: 1000, + spanLengthGeneratorType: uniformGenerator, + spanLengthGeneratorIMax: 100, + weightGeneratorType: uniformGenerator, + weightGeneratorIMax: 10, + rangeRequestPercent: 0.95, + numRequests: 10000, + lbs: lbs, + seed: seed, + }, + { + desc: "WeightedFinder/startIMax=100000/spanIMax=1000/point", + startKeyGeneratorType: zipfGenerator, + startKeyGeneratorIMax: 100000, + spanLengthGeneratorType: uniformGenerator, + spanLengthGeneratorIMax: 1000, + weightGeneratorType: uniformGenerator, + weightGeneratorIMax: 10, + rangeRequestPercent: 0, + numRequests: 10000, + lbs: lbs, + seed: seed, + }, + { + desc: "WeightedFinder/startIMax=10000000000/spanIMax=1000/unweighted", + startKeyGeneratorType: zipfGenerator, + startKeyGeneratorIMax: 10000000000, + spanLengthGeneratorType: uniformGenerator, + spanLengthGeneratorIMax: 1000, + weightGeneratorType: uniformGenerator, + weightGeneratorIMax: 1, + rangeRequestPercent: 0.95, + numRequests: 10000, + lbs: lbs, + seed: seed, + }, + }) +} diff --git a/pkg/kv/kvserver/split/weighted_finder.go b/pkg/kv/kvserver/split/weighted_finder.go new file mode 100644 index 000000000000..d8a59330cbaa --- /dev/null +++ b/pkg/kv/kvserver/split/weighted_finder.go @@ -0,0 +1,282 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package split + +import ( + "fmt" + "math" + "sort" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// Load-based splitting. +// +// - Engage split for ranges: +// - With size exceeding min-range-bytes +// - with reqs/s rate over a configurable threshold +// - Disengage when a range no longer meets the criteria +// - During split: +// - Record start time +// - Keep a sample of 20 keys using weighted reservoir sampling (a simplified +// version of A-Chao algorithm). For more information on A-Chao, see +// https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Chao or +// https://arxiv.org/pdf/1012.0256.pdf. +// - Each sample contains two counters: left and right. +// - For a weighted point span (key, weight), record that as is; for a +// weighted ranged span ([key, endKey), weight), record that as two +// weighted point spans: (key, weight/2) and (endKey, weight/2). +// - On each weighted point span, increment the left and/or right counters by +// the weight, depending on whether the key falls to the left or to the +// right. +// - When a sample is replaced, discard its counters. +// - If a range is on for more than a threshold interval: +// - Examine sample for the smallest diff between left and right counters, +// excluding any whose counters are not sufficiently advanced; +// If not less than some constant threshold, skip split. +// - If a desired point is reached, add range to split queue with the chosen +// key as split key, and provide hint to scatter the replicas. + +type RandSource interface { + // Float64 returns, as a float64, a pseudo-random number in the half-open + // interval [0.0,1.0) from the RandSource. + Float64() float64 + + // Intn returns, as an int, a non-negative pseudo-random number in the + // half-open interval [0,n). + Intn(n int) int +} + +type weightedSample struct { + key roachpb.Key + weight float64 + left, right float64 + count int +} + +// WeightedFinder is a structure that is used to determine the split point +// using the Weighted Reservoir Sampling method (a simplified version of A-Chao +// algorithm). +type WeightedFinder struct { + samples [splitKeySampleSize]weightedSample + count int + totalWeight float64 + startTime time.Time + randSource RandSource +} + +// NewWeightedFinder initiates a WeightedFinder with the given time. +func NewWeightedFinder(startTime time.Time, randSource RandSource) *WeightedFinder { + return &WeightedFinder{ + startTime: startTime, + randSource: randSource, + } +} + +// Ready implements the LoadBasedSplitter interface. +func (f *WeightedFinder) Ready(nowTime time.Time) bool { + return nowTime.Sub(f.startTime) > RecordDurationThreshold +} + +// record informs the WeightedFinder about where the incoming point span (i.e. +// key) lies with regard to the candidate split keys. We use weighted reservoir +// sampling (a simplified version of A-Chao algorithm) to get the candidate +// split keys. +func (f *WeightedFinder) record(key roachpb.Key, weight float64) { + if f == nil { + return + } + + var idx int + count := f.count + f.count++ + f.totalWeight += weight + if count < splitKeySampleSize { + idx = count + } else if f.randSource.Float64() > splitKeySampleSize*weight/f.totalWeight { + for i := range f.samples { + // Example: Suppose we have candidate split key = "k" (i.e. + // f.samples[i].Key). + // + // Suppose we record the following weighted keys: + // record("i", 3) x Increment left counter by 3 + // record("k", 5) x Increment right counter by 5 + // record("l", 1) x Increment right counter by 1 + // |----|----|----|----| + // "i" "j" "k" "l" "m" + // ^ + // Candidate split key + // Left range split [ ) + // Right range split [ ) + if comp := key.Compare(f.samples[i].key); comp < 0 { + // Case key < f.samples[i].Key i.e. key is to the left of the candidate + // split key (left is exclusive to split key). + f.samples[i].left += weight + } else { + // Case key >= f.samples[i].Key i.e. key is to the right of or on the + // candidate split key (right is inclusive to split key). + f.samples[i].right += weight + } + f.samples[i].count++ + } + return + } else { + idx = f.randSource.Intn(splitKeySampleSize) + } + + // Note we always use the start key of the span. We could + // take the average of the byte slices, but that seems + // unnecessarily complex for practical usage. + f.samples[idx] = weightedSample{key: key, weight: weight} +} + +// Record implements the LoadBasedSplitter interface. +// +// Note that we treat a weighted range request ([start, end), w) as two +// weighted point requests (start, w/2) and (end, w/2). The motivation for this +// is that within the range [start, end), we do not know anything about the +// underlying data distribution without complex methods to retrieve such +// information. Moreover, we do not even know what keys are within this range +// since all keys are byte arrays, and consequently we have no concept of +// “distance” or “midpoint” between a pair of keys. The most basic approach is +// to be agnostic to the underlying data distribution and relative distance +// between keys, and simply assume that a weighted range request that contains +// a candidate split key will contribute half of its weight to the left counter +// and half of its weight to the right counter of that candidate split key. +func (f *WeightedFinder) Record(span roachpb.Span, weight float64) { + if span.EndKey == nil { + f.record(span.Key, weight) + } else { + f.record(span.Key, weight/2) + f.record(span.EndKey, weight/2) + } +} + +// Key implements the LoadBasedSplitter interface. Key returns the candidate +// split key that minimizes the balance score (percentage difference between +// the left and right counters), provided the balance score is < 0.25. +func (f *WeightedFinder) Key() roachpb.Key { + if f == nil { + return nil + } + + var bestIdx = -1 + var bestScore float64 = 1 + // For simplicity, we suppose splitKeyMinCounter = 5. + // + // Example 1 (numbers refer to weights of requests): + // 1 | + // 2 | + // | 1 + // | 1 + // | + // split key + // s.left = 3 + // s.right = 2 + // s.count = 4 + // Invalid split key because insufficient counters + // (s.count < splitKeyMinCounter). + // + // Example 2 (numbers refer to weights of requests): + // 1 | + // 2 | + // | 1 + // | 1 + // | 3 + // split key + // s.left = 3 + // s.right = 5 + // balance score = |3 - 5| / (3 + 5) = 0.25 + // Invalid split key because imbalance in left and right counters i.e. + // balance score >= splitKeyThreshold. + // + // Example 3: + // 1 | | + // 2 | | + // 2 | | + // | 1| + // | | 1 + // | | 6 + // sk1 + // sk2 + // balance score of sk1 = 0.23 + // balance score of sk2 = 0.08 + // We choose split key sk2 because it has the lowest balance score. + for i, s := range f.samples { + if s.count < splitKeyMinCounter { + continue + } + balanceScore := math.Abs(s.left-s.right) / (s.left + s.right) + if balanceScore >= splitKeyThreshold { + continue + } + if balanceScore < bestScore { + bestIdx = i + bestScore = balanceScore + } + } + + if bestIdx == -1 { + return nil + } + return f.samples[bestIdx].key +} + +// noSplitKeyCause iterates over all sampled candidate split keys and +// determines the number of samples that don't pass each split key requirement +// (e.g. insufficient counters, imbalance in left and right counters). +func (f *WeightedFinder) noSplitKeyCause() (insufficientCounters, imbalance int) { + for _, s := range f.samples { + if s.count < splitKeyMinCounter { + insufficientCounters++ + } else if balanceScore := math.Abs(s.left-s.right) / (s.left + s.right); balanceScore >= splitKeyThreshold { + imbalance++ + } + } + return +} + +// NoSplitKeyCauseLogMsg implements the LoadBasedSplitter interface. +func (f *WeightedFinder) NoSplitKeyCauseLogMsg() string { + insufficientCounters, imbalance := f.noSplitKeyCause() + if insufficientCounters == splitKeySampleSize { + return "" + } + noSplitKeyCauseLogMsg := fmt.Sprintf("No split key found: insufficient counters = %d, imbalance = %d", insufficientCounters, imbalance) + return noSplitKeyCauseLogMsg +} + +// PopularKeyFrequency implements the LoadBasedSplitter interface. +func (f *WeightedFinder) PopularKeyFrequency() float64 { + sort.Slice(f.samples[:], func(i, j int) bool { + return f.samples[i].key.Compare(f.samples[j].key) < 0 + }) + + weight := f.samples[0].weight + currentKeyWeight := weight + popularKeyWeight := weight + totalWeight := weight + for i := 1; i < len(f.samples); i++ { + weight := f.samples[i].weight + if f.samples[i].key.Equal(f.samples[i-1].key) { + currentKeyWeight += weight + } else { + currentKeyWeight = weight + } + if popularKeyWeight < currentKeyWeight { + popularKeyWeight = currentKeyWeight + } + totalWeight += weight + } + + return popularKeyWeight / totalWeight +} diff --git a/pkg/kv/kvserver/split/weighted_finder_test.go b/pkg/kv/kvserver/split/weighted_finder_test.go new file mode 100644 index 000000000000..093ff45ab89f --- /dev/null +++ b/pkg/kv/kvserver/split/weighted_finder_test.go @@ -0,0 +1,413 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package split + +import ( + "bytes" + "context" + "math" + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/assert" + "golang.org/x/exp/rand" +) + +type ZeroRandSource struct{} + +func (r ZeroRandSource) Float64() float64 { + return 0 +} + +func (r ZeroRandSource) Intn(int) int { + return 0 +} + +type LargestRandSource struct{} + +func (r LargestRandSource) Float64() float64 { + return 1 +} + +func (r LargestRandSource) Intn(int) int { + return 0 +} + +// TestSplitWeightedFinderKey verifies the Key() method correctly +// finds an appropriate split point for the range. +func TestSplitWeightedFinderKey(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + + const ReservoirKeyOffset = 1000 + + // Test an empty reservoir (reservoir without load). + basicReservoir := [splitKeySampleSize]weightedSample{} + + // Test reservoir with no load should have no splits. + noLoadReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: 0, + right: 0, + count: 0, + } + noLoadReservoir[i] = tempSample + } + + // Test a uniform reservoir. + uniformReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: splitKeyMinCounter, + right: splitKeyMinCounter, + count: splitKeyMinCounter, + } + uniformReservoir[i] = tempSample + } + + // Testing a non-uniform reservoir. + nonUniformReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: float64(splitKeyMinCounter * i), + right: float64(splitKeyMinCounter * (splitKeySampleSize - i)), + count: splitKeyMinCounter, + } + nonUniformReservoir[i] = tempSample + } + + // Test a load heavy reservoir on a single hot key (the last key). + singleHotKeyReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: 0, + right: splitKeyMinCounter, + count: splitKeyMinCounter, + } + singleHotKeyReservoir[i] = tempSample + } + + // Test a load heavy reservoir on multiple hot keys (first and last key). + multipleHotKeysReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: splitKeyMinCounter, + right: splitKeyMinCounter, + count: splitKeyMinCounter, + } + multipleHotKeysReservoir[i] = tempSample + } + multipleHotKeysReservoir[0].left = 0 + + // Test a spanning reservoir where splits shouldn't occur. + spanningReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: 75, + right: 45, + count: splitKeyMinCounter, + } + spanningReservoir[i] = tempSample + } + + // Test that splits happen in best balance score. + bestBalanceReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + left: 2.3 * splitKeyMinCounter, + right: 1.7 * splitKeyMinCounter, + count: splitKeyMinCounter, + } + bestBalanceReservoir[i] = tempSample + } + midSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + splitKeySampleSize/2)), + left: 1.1 * splitKeyMinCounter, + right: 0.9 * splitKeyMinCounter, + count: splitKeyMinCounter, + } + bestBalanceReservoir[splitKeySampleSize/2] = midSample + + testCases := []struct { + reservoir [splitKeySampleSize]weightedSample + splitByLoadKey roachpb.Key + }{ + // Test an empty reservoir. + {basicReservoir, nil}, + // Test reservoir with no load should have no splits. + {noLoadReservoir, nil}, + // Test a uniform reservoir (Splits at the first key) + {uniformReservoir, keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset)}, + // Testing a non-uniform reservoir. + {nonUniformReservoir, keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize/2)}, + // Test a load heavy reservoir on a single hot key. Splitting can't help here. + {singleHotKeyReservoir, nil}, + // Test a load heavy reservoir on multiple hot keys. Splits between the hot keys. + {multipleHotKeysReservoir, keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + 1)}, + // Test a spanning reservoir. Splitting will be bad here. Should avoid it. + {spanningReservoir, nil}, + // Test that splits happen between two heavy spans. + {bestBalanceReservoir, keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize/2)}, + } + + randSource := rand.New(rand.NewSource(2022)) + for i, test := range testCases { + weightedFinder := NewWeightedFinder(timeutil.Now(), randSource) + weightedFinder.samples = test.reservoir + if splitByLoadKey := weightedFinder.Key(); !bytes.Equal(splitByLoadKey, test.splitByLoadKey) { + t.Errorf( + "%d: expected splitByLoadKey: %v, but got splitByLoadKey: %v", + i, test.splitByLoadKey, splitByLoadKey) + } + } +} + +// TestSplitWeightedFinderRecorder verifies the Record() method correctly +// records a span. +func TestSplitWeightedFinderRecorder(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + + const ReservoirKeyOffset = 1000 + + // Test recording a key query before the reservoir is full. + basicReservoir := [splitKeySampleSize]weightedSample{} + basicSpan := roachpb.Span{ + Key: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset), + EndKey: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + 1), + } + const basicWeight = 1 + expectedBasicReservoir := [splitKeySampleSize]weightedSample{} + expectedBasicReservoir[0] = weightedSample{ + key: basicSpan.Key, + weight: 0.5, + } + expectedBasicReservoir[1] = weightedSample{ + key: basicSpan.EndKey, + weight: 0.5, + } + + // Test recording a key query after the reservoir is full with replacement. + replacementReservoir := [splitKeySampleSize]weightedSample{} + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + weight: 1, + left: 0, + right: 0, + } + replacementReservoir[i] = tempSample + } + replacementSpan := roachpb.Span{ + Key: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize), + EndKey: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize + 1), + } + const replacementWeight = 1 + expectedReplacementReservoir := replacementReservoir + expectedReplacementReservoir[0] = weightedSample{ + key: replacementSpan.EndKey, + weight: 0.5, + } + + // Test recording a key query after the reservoir is full without replacement. + fullReservoir := replacementReservoir + fullSpan := roachpb.Span{ + Key: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset), + EndKey: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + 1), + } + const fullWeight = 1 + expectedFullReservoir := fullReservoir + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + weight: 1, + left: 1, + right: 0, + count: 2, + } + expectedFullReservoir[i] = tempSample + } + expectedFullReservoir[0].left = 0 + expectedFullReservoir[0].right = 1 + expectedFullReservoir[1].left = 0.5 + expectedFullReservoir[1].right = 0.5 + + // Test recording a spanning query. + spanningReservoir := replacementReservoir + spanningSpan := roachpb.Span{ + Key: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset - 1), + EndKey: keys.SystemSQLCodec.TablePrefix(ReservoirKeyOffset + splitKeySampleSize + 1), + } + const spanningWeight = 1 + expectedSpanningReservoir := spanningReservoir + for i := 0; i < splitKeySampleSize; i++ { + tempSample := weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(ReservoirKeyOffset + i)), + weight: 1, + left: 0.5, + right: 0.5, + count: 2, + } + expectedSpanningReservoir[i] = tempSample + } + + testCases := []struct { + recordSpan roachpb.Span + weight float64 + randSource RandSource + currCount int + currReservoir [splitKeySampleSize]weightedSample + expectedReservoir [splitKeySampleSize]weightedSample + }{ + // Test recording a key query before the reservoir is full. + {basicSpan, basicWeight, LargestRandSource{}, 0, basicReservoir, expectedBasicReservoir}, + // Test recording a key query after the reservoir is full with replacement. + {replacementSpan, replacementWeight, ZeroRandSource{}, splitKeySampleSize + 1, replacementReservoir, expectedReplacementReservoir}, + // Test recording a key query after the reservoir is full without replacement. + {fullSpan, fullWeight, LargestRandSource{}, splitKeySampleSize + 1, fullReservoir, expectedFullReservoir}, + // Test recording a spanning query. + {spanningSpan, spanningWeight, LargestRandSource{}, splitKeySampleSize + 1, spanningReservoir, expectedSpanningReservoir}, + } + + for i, test := range testCases { + weightedFinder := NewWeightedFinder(timeutil.Now(), test.randSource) + weightedFinder.samples = test.currReservoir + weightedFinder.count = test.currCount + weightedFinder.totalWeight = 100 + weightedFinder.Record(test.recordSpan, test.weight) + if !reflect.DeepEqual(weightedFinder.samples, test.expectedReservoir) { + t.Errorf( + "%d: expected reservoir: %v, but got reservoir: %v", + i, test.expectedReservoir, weightedFinder.samples) + } + } +} + +func TestWeightedFinderNoSplitKeyCause(t *testing.T) { + samples := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + if i < 7 { + // Insufficient counters. + samples[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(i)), + weight: 1, + left: splitKeyMinCounter, + right: splitKeyMinCounter, + count: splitKeyMinCounter - 1, + } + } else { + // Imbalance counters. + deviationLeft := 5 * rand.Float64() + deviationRight := 5 * rand.Float64() + samples[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(i)), + weight: 1, + left: 75 + deviationLeft, + right: 45 - deviationRight, + count: splitKeyMinCounter, + } + } + } + + randSource := rand.New(rand.NewSource(2022)) + weightedFinder := NewWeightedFinder(timeutil.Now(), randSource) + weightedFinder.samples = samples + insufficientCounters, imbalance := weightedFinder.noSplitKeyCause() + assert.Equal(t, 7, insufficientCounters, "unexpected insufficient counters") + assert.Equal(t, 13, imbalance, "unexpected imbalance counters") +} + +func TestWeightedFinderPopularKeyFrequency(t *testing.T) { + uniqueKeyUnweightedSample := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + uniqueKeyUnweightedSample[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(i)), + weight: 1, + } + } + uniqueKeyWeightedSample := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + uniqueKeyWeightedSample[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(uint32(i)), + weight: float64(i + 1), + } + } + duplicateKeyUnweightedSample := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + var tableID uint32 + if i < 8 || i >= 13 { + tableID = uint32(i / 4) + } else { + tableID = 2 + } + duplicateKeyUnweightedSample[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(tableID), + weight: 1, + } + } + duplicateKeyWeightedSample := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + var tableID uint32 + if i < 8 || i >= 15 { + tableID = uint32(i / 4) + } else { + tableID = 2 + } + duplicateKeyWeightedSample[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(tableID), + weight: float64(i + 1), + } + } + sameKeySample := [splitKeySampleSize]weightedSample{} + for i, idx := range rand.Perm(splitKeySampleSize) { + sameKeySample[idx] = weightedSample{ + key: keys.SystemSQLCodec.TablePrefix(0), + weight: float64(i), + } + } + + const eps = 1e-3 + testCases := []struct { + samples [splitKeySampleSize]weightedSample + expectedPopularKeyFrequency float64 + }{ + {uniqueKeyUnweightedSample, 1.0 / 20.0}, + {uniqueKeyWeightedSample, 20.0 / 210.0}, // 20/(1+2+...+20) + {duplicateKeyUnweightedSample, 5.0 / 20.0}, + {duplicateKeyWeightedSample, 84.0 / 210.0}, // (9+10+...+15)/(1+2+...+20) + {sameKeySample, 1}, + } + + randSource := rand.New(rand.NewSource(2022)) + for i, test := range testCases { + weightedFinder := NewWeightedFinder(timeutil.Now(), randSource) + weightedFinder.samples = test.samples + popularKeyFrequency := weightedFinder.PopularKeyFrequency() + assert.True(t, math.Abs(test.expectedPopularKeyFrequency-popularKeyFrequency) < eps, + "%d: expected popular key frequency %f, got %f", + i, test.expectedPopularKeyFrequency, popularKeyFrequency) + } +}