Skip to content

Commit

Permalink
split: Redesign the load-based splitter to be consistent with new reb…
Browse files Browse the repository at this point in the history
…alancing signals.

Fixes: cockroachdb#90574

In the current load splitter, we find the split key that best balances
the QPS of the left and right sides. As a result, each request is
unweighted, since one request contributes one to the QPS. In particular,
the load splitter does not differentiate between what kinds of requests
they are, how heavy the request is, and what resources these requests
consume, which can result in scenarios where QPS is balanced but one
side has a lot more work due to a few heavy requests. Moreover, the
current load splitter treats requests that contain a split key as
“contained”. Optimizing for QPS, contained requests are bad since
splitting at a point in a contained request will not help lower the QPS
of either side. However, optimizing for other signals like CPU,
splitting at a point in a contained request is great as each side will
get part of the work of processing that request. This motivates a
redesign of the load splitter, one that enables recording weighted
requests and considers contained requests in the weight balancing for
splitting.

In this PR, we redesign the load-based splitter with the following
interface:
1. Record a point key “start” or span “[start, end)” with a weight “w”
at a specific time “ts”, where “w” is some measure of load recorded for
a span e.g. Record(ts, start, w) or Record(ts, [start, end), w)
2. Find a split key such that the load (i.e. total weight) on the
resulting split ranges would be as equal as possible according to the
recorded loads above e.g. Key()

To make the current load-based splitter (Finder) weighted, we make the
following modifications:
1. Instead of using reservoir sampling, we use weighted reservoir
sampling (a simplified version of A-Chao)
2. Remove the contained counter
3. Increment the left and right counters by the weight of the request
rather than just 1
4. Treat a weighted range request ([start, end), w) into two weighted
point requests (start, w/2) and (end, w/2)

For more details, see the design doc:
https://docs.google.com/document/d/1bdSxucz-xFzwnxL3fFXNZsRc9Vsht0oO0XuZrz5Iw84/edit#bookmark=id.xjc41tm3jx3x.

Release note (ops change): The load-based splitter has been redesigned
to be more consistent with CPU-based rebalancing rather than QPS-based
rebalancing to improve range splits.
  • Loading branch information
KaiSun314 committed Dec 22, 2022
1 parent f9d7473 commit 1195672
Show file tree
Hide file tree
Showing 13 changed files with 948 additions and 142 deletions.
1 change: 1 addition & 0 deletions pkg/cli/testdata/explain-bundle/bundle/env.sql
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
-- kv.closed_timestamp.side_transport_interval = 200ms (the interval at which the closed-timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transport)
-- kv.closed_timestamp.target_duration = 3s (if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration)
-- kv.concurrency.optimistic_eval_limited_scans.enabled = true (when true, limited scans are optimistically evaluated in the sense of not checking for conflicting latches or locks up front for the full key range of the scan, and instead subsequently checking for conflicts only over the key range that was read)
-- kv.deprecated_lb_split_finder.enabled = false (if enabled, use the deprecated finder for load-based splitting)
-- kv.dist_sender.concurrency_limit = 2048 (maximum number of asynchronous send requests)
-- kv.follower_read.target_multiple = 3 (if above 1, encourages the distsender to perform a read against the closest replica if a request is older than kv.closed_timestamp.target_duration * (1 + kv.closed_timestamp.close_fraction * this) less a clock uncertainty interval. This value also is used to create follower_timestamp().)
-- kv.gc.intent_age_threshold = 2h0m0s (intents older than this threshold will be resolved when encountered by the GC queue)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ go_library(
"@io_etcd_go_raft_v3//tracker",
"@io_opentelemetry_go_otel//attribute",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_x_exp//rand",
"@org_golang_x_time//rate",
],
)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/asim/state/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"@com_github_google_btree//:btree",
"@io_etcd_go_raft_v3//:raft",
"@io_etcd_go_raft_v3//tracker",
"@org_golang_x_exp//rand",
],
)

Expand Down
11 changes: 3 additions & 8 deletions pkg/kv/kvserver/asim/state/split_decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ package state

import (
"context"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"golang.org/x/exp/rand"
)

// LoadSplitter provides an abstraction for load based splitting. It records
Expand Down Expand Up @@ -61,14 +61,9 @@ func NewSplitDecider(
}

func (s *SplitDecider) newDecider() *split.Decider {
rand := rand.New(rand.NewSource(s.seed))

intN := func(n int) int {
return rand.Intn(n)
}

rand := rand.New(rand.NewSource(uint64(s.seed)))
decider := &split.Decider{}
split.Init(decider, intN, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
split.Init(decider, nil, rand, s.qpsThreshold, s.qpsRetention, &split.LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package kvserver
import (
"bytes"
"context"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
Expand All @@ -34,6 +33,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"go.etcd.io/raft/v3"
"golang.org/x/exp/rand"
)

const (
Expand Down Expand Up @@ -95,7 +95,8 @@ func newUnloadedReplica(
r.mu.stateLoader = stateloader.Make(desc.RangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 {
randSource := rand.New(rand.NewSource(2022))
split.Init(&r.loadBasedSplitter, store.cfg.Settings, randSource, func() float64 {
return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
}, func() time.Duration {
return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ go_library(
name = "split",
srcs = [
"decider.go",
"finder.go",
"deprecated_finder.go",
"weighted_finder.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/syncutil",
Expand All @@ -24,8 +27,9 @@ go_test(
size = "small",
srcs = [
"decider_test.go",
"finder_test.go",
"deprecated_finder_test.go",
"load_based_splitter_test.go",
"weighted_finder_test.go",
],
args = ["-test.timeout=55s"],
embed = [":split"],
Expand Down
65 changes: 53 additions & 12 deletions pkg/kv/kvserver/split/decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,54 @@ package split

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"golang.org/x/exp/rand"
)

const minSplitSuggestionInterval = time.Minute
const minNoSplitKeyLoggingMetricsInterval = time.Minute
const minQueriesPerSecondSampleDuration = time.Second

type LoadBasedSplitter interface {
// Record informs the LoadBasedSplitter about where the span lies with regard
// to the keys in the samples.
Record(span roachpb.Span, weight float64)

// Key finds an appropriate split point from the sampled candidate split
// keys. Returns a nil key if no appropriate key was found.
Key() roachpb.Key

// Ready checks if the LoadBasedSplitter has been initialized with a
// sufficient sample duration.
Ready(nowTime time.Time) bool

// NoSplitKeyCauseLogMsg returns a log message containing information on the
// number of samples that don't pass each split key requirement if not all
// samples are invalid due to insufficient counters, otherwise returns an
// empty string.
NoSplitKeyCauseLogMsg() string

// PopularKeyFrequency returns the percentage that the most popular key
// appears in the sampled candidate split keys.
PopularKeyFrequency() float64
}

var enableDeprecatedLBSplitFinder = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.deprecated_lb_split_finder.enabled",
"if enabled, use the deprecated finder for load-based splitting",
true,
)

// A Decider collects measurements about the activity (measured in qps) on a
// Replica and, assuming that qps thresholds are exceeded, tries to determine a
// split key that would approximately result in halving the load on each of the
Expand Down Expand Up @@ -63,7 +98,8 @@ type LoadSplitterMetrics struct {
// incoming requests to find potential split keys and checks if sampled
// candidate split keys satisfy certain requirements.
type Decider struct {
intn func(n int) int // supplied to Init
st *cluster.Settings // supplied to Init
randSource *rand.Rand // supplied to Init
qpsThreshold func() float64 // supplied to Init
qpsRetention func() time.Duration // supplied to Init
loadSplitterMetrics *LoadSplitterMetrics // supplied to Init
Expand All @@ -80,8 +116,8 @@ type Decider struct {
maxQPS maxQPSTracker

// Fields tracking split key suggestions.
splitFinder *Finder // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split
splitFinder LoadBasedSplitter // populated when engaged or decided and kv.deprecated_lb_split_finder.enabled is false
lastSplitSuggestion time.Time // last stipulation to client to carry out split

// Fields tracking logging / metrics around load-based splitter split key.
lastNoSplitKeyLoggingMetrics time.Time
Expand All @@ -94,12 +130,14 @@ type Decider struct {
// may exist in the system at any given point in time.
func Init(
lbs *Decider,
intn func(n int) int,
st *cluster.Settings,
randSource *rand.Rand,
qpsThreshold func() float64,
qpsRetention func() time.Duration,
loadSplitterMetrics *LoadSplitterMetrics,
) {
lbs.intn = intn
lbs.st = st
lbs.randSource = randSource
lbs.qpsThreshold = qpsThreshold
lbs.qpsRetention = qpsRetention
lbs.loadSplitterMetrics = loadSplitterMetrics
Expand Down Expand Up @@ -147,7 +185,11 @@ func (d *Decider) recordLocked(
// to be used.
if d.mu.lastQPS >= d.qpsThreshold() {
if d.mu.splitFinder == nil {
d.mu.splitFinder = NewFinder(now)
if d.st == nil || enableDeprecatedLBSplitFinder.Get(&d.st.SV) {
d.mu.splitFinder = NewDeprecatedFinder(now, d.randSource)
} else {
d.mu.splitFinder = NewWeightedFinder(now, d.randSource)
}
}
} else {
d.mu.splitFinder = nil
Expand All @@ -157,7 +199,7 @@ func (d *Decider) recordLocked(
if d.mu.splitFinder != nil && n != 0 {
s := span()
if s.Key != nil {
d.mu.splitFinder.Record(span(), d.intn)
d.mu.splitFinder.Record(span(), 1)
}
if d.mu.splitFinder.Ready(now) {
if d.mu.splitFinder.Key() != nil {
Expand All @@ -168,16 +210,15 @@ func (d *Decider) recordLocked(
} else {
if now.Sub(d.mu.lastNoSplitKeyLoggingMetrics) > minNoSplitKeyLoggingMetricsInterval {
d.mu.lastNoSplitKeyLoggingMetrics = now
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained := d.mu.splitFinder.NoSplitKeyCause()
if insufficientCounters < splitKeySampleSize {
noSplitKeyCauseLogMsg := d.mu.splitFinder.NoSplitKeyCauseLogMsg()
if noSplitKeyCauseLogMsg != "" {
popularKeyFrequency := d.mu.splitFinder.PopularKeyFrequency()
noSplitKeyCauseLogMsg += fmt.Sprintf(", most popular key occurs in %d%% of samples", int(popularKeyFrequency*100))
log.KvDistribution.Infof(ctx, "%s", noSplitKeyCauseLogMsg)
if popularKeyFrequency >= splitKeyThreshold {
d.loadSplitterMetrics.PopularKeyCount.Inc(1)
}
d.loadSplitterMetrics.NoSplitKeyCount.Inc(1)
log.KvDistribution.Infof(ctx,
"No split key found: insufficient counters = %d, imbalance = %d, too many contained = %d, imbalance and too many contained = %d, most popular key occurs in %d%% of samples",
insufficientCounters, imbalance, tooManyContained, imbalanceAndTooManyContained, int(popularKeyFrequency*100))
}
}
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/kv/kvserver/split/decider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package split
import (
"context"
"math"
"math/rand"
"testing"
"time"

Expand All @@ -24,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
)

func ms(i int) time.Time {
Expand All @@ -37,10 +37,10 @@ func ms(i int) time.Time {
func TestDecider(t *testing.T) {
defer leaktest.AfterTest(t)()

intn := rand.New(rand.NewSource(12)).Intn
rand := rand.New(rand.NewSource(12))

var d Decider
Init(&d, intn, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestDecider(t *testing.T) {
assert.Equal(t, ms(1200), d.mu.lastQPSRollover)
assertMaxQPS(1099, 0, false)

var nilFinder *Finder
var nilFinder *DeprecatedFinder

assert.Equal(t, nilFinder, d.mu.splitFinder)

Expand Down Expand Up @@ -166,7 +166,7 @@ func TestDecider(t *testing.T) {
}

// ... which we verify by looking at its samples directly.
for _, sample := range d.mu.splitFinder.samples {
for _, sample := range d.mu.splitFinder.(*DeprecatedFinder).samples {
assert.Equal(t, roachpb.Key("p"), sample.key)
}

Expand Down Expand Up @@ -196,10 +196,10 @@ func TestDecider(t *testing.T) {

func TestDecider_MaxQPS(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
Expand Down Expand Up @@ -242,10 +242,10 @@ func TestDecider_MaxQPS(t *testing.T) {

func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
Expand Down Expand Up @@ -278,10 +278,10 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {

func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&d, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
Expand Down Expand Up @@ -409,11 +409,11 @@ func TestMaxQPSTracker(t *testing.T) {

func TestDeciderMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
intn := rand.New(rand.NewSource(11)).Intn
rand := rand.New(rand.NewSource(11))
timeStart := 1000

var dPopular Decider
Init(&dPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
Expand All @@ -435,7 +435,7 @@ func TestDeciderMetrics(t *testing.T) {

// No split key, not popular key
var dNotPopular Decider
Init(&dNotPopular, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dNotPopular, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
Expand All @@ -455,7 +455,7 @@ func TestDeciderMetrics(t *testing.T) {

// No split key, all insufficient counters
var dAllInsufficientCounters Decider
Init(&dAllInsufficientCounters, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
Init(&dAllInsufficientCounters, nil, rand, func() float64 { return 1.0 }, func() time.Duration { return time.Second }, &LoadSplitterMetrics{
PopularKeyCount: metric.NewCounter(metric.Metadata{}),
NoSplitKeyCount: metric.NewCounter(metric.Metadata{}),
})
Expand Down
Loading

0 comments on commit 1195672

Please sign in to comment.