Skip to content

Commit

Permalink
sql: add ALTER RANGE x SPLIT syntax
Browse files Browse the repository at this point in the history
Fixes cockroachdb#55116

Extend the ALTER RANGE syntax to support splitting a range.
ALTER RANGE x SPLIT will split a range at the best available
load based key. The new command will not perform a split directly
but rather advise the load based split decider to ignore the QPS
threshold and perform a load based split based on the lastest
available information. The decider has a 10s timeout built in between
deciding to split and determing the split key, so the actual split will
occur approximately 10s after the commnad is run.

To support all of this we changed AdminSplitRequest and added a new parameter
LoadBased. When set this will trigger a load based split on the range
through the split_queue.

Release note(sql change): Enhance the ALTER RANGE syntax to
allow an operator to manually perform a load based split of
a range.
  • Loading branch information
lunevalex committed Jun 14, 2022
1 parent 999a127 commit 91930ed
Show file tree
Hide file tree
Showing 42 changed files with 657 additions and 75 deletions.
5 changes: 5 additions & 0 deletions docs/generated/sql/bnf/alter_range_split_load_stmt.bnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
alter_range_split_load_stmt ::=
'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt
| 'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt 'WITH' 'EXPIRATION' a_expr
| 'ALTER' 'RANGE' a_expr 'SPLIT'
| 'ALTER' 'RANGE' a_expr 'SPLIT' 'WITH' 'EXPIRATION' a_expr
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/alter_range_stmt.bnf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
alter_range_stmt ::=
alter_zone_range_stmt
| alter_range_relocate_stmt
| alter_range_split_load_stmt
7 changes: 7 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,7 @@ alter_database_stmt ::=
alter_range_stmt ::=
alter_zone_range_stmt
| alter_range_relocate_stmt
| alter_range_split_load_stmt

alter_partition_stmt ::=
alter_zone_partition_stmt
Expand Down Expand Up @@ -2032,6 +2033,12 @@ alter_range_relocate_stmt ::=
| 'ALTER' 'RANGE' relocate_kw relocate_subject_nonlease 'FROM' a_expr 'TO' a_expr 'FOR' select_stmt
| 'ALTER' 'RANGE' a_expr relocate_kw relocate_subject_nonlease 'FROM' a_expr 'TO' a_expr

alter_range_split_load_stmt ::=
'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt
| 'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt 'WITH' 'EXPIRATION' a_expr
| 'ALTER' 'RANGE' a_expr 'SPLIT'
| 'ALTER' 'RANGE' a_expr 'SPLIT' 'WITH' 'EXPIRATION' a_expr

alter_zone_partition_stmt ::=
'ALTER' 'PARTITION' partition_name 'OF' 'TABLE' table_name set_zone_config
| 'ALTER' 'PARTITION' partition_name 'OF' 'INDEX' table_index_name set_zone_config
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ can also be specified (e.g. .25).`,

DemoWorkloadMaxQPS = FlagInfo{
Name: "workload-max-qps",
Description: "The maximum QPS when a workload is running.",
Description: "The maximum QPS when a workload is running.",
}

DemoNodeLocality = FlagInfo{
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,23 @@ func (b *Batch) adminSplit(
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) adminLoadBasedSplit(startKeyIn interface{}, expirationTime hlc.Timestamp) {
startKey, err := marshalKey(startKeyIn)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.AdminSplitRequest{
RequestHeader: roachpb.RequestHeader{
Key: startKey,
},
ExpirationTime: expirationTime,
LoadBased: true,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) adminUnsplit(splitKeyIn interface{}) {
splitKey, err := marshalKey(splitKeyIn)
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,23 @@ func (db *DB) AdminSplit(
return getOneErr(db.Run(ctx, b), b)
}

// AdminLoadBasedSplit splits the range at the load based key as determined by the
// range stats.
//
// expirationTime is the timestamp when the split expires and is eligible for
// automatic merging by the merge queue. To specify that a split should
// immediately be eligible for automatic merging, set expirationTime to
// hlc.Timestamp{} (I.E. the zero timestamp). To specify that a split should
// never be eligible, set expirationTime to hlc.MaxTimestamp.
//
func (db *DB) AdminLoadBasedSplit(
ctx context.Context, startKey interface{}, expirationTime hlc.Timestamp,
) error {
b := &Batch{}
b.adminLoadBasedSplit(startKey, expirationTime)
return getOneErr(db.Run(ctx, b), b)
}

// AdminScatter scatters the range containing the specified key.
//
// maxSize greater than non-zero specified a maximum size of the range above
Expand Down
76 changes: 76 additions & 0 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
Expand Down Expand Up @@ -3582,3 +3583,78 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
repl = store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
}

// TestAlterRangeSplit verifies that the ALTER_RANGE SPLIT commands work as expected.
func TestAlterRangeSplit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

settings := cluster.MakeTestingClusterSettings()
sv := &settings.SV
ctx := context.Background()
const numStores = 3
tc := testcluster.StartTestCluster(t, numStores,
base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Set the threshold to 1 second instead of the default 10, to speed
// up test.
LoadBasedSplitRecordDurationThreshold: 500 * time.Millisecond,
},
},
},
},
)
defer tc.Stopper().Stop(ctx)
require.NoError(t, tc.WaitForFullReplication())

// We set the threshold artificially high, so we know that the automatic
// process never splits the range.
kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, 10000)

_, rhsDesc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin())
// We start with having the range under test on (1,2,3).
db := tc.ServerConn(0)

startKey := rhsDesc.StartKey
endKey := rhsDesc.EndKey
// Put a consistent workload on the range, so it can pick a split point.
if err := tc.Stopper().RunAsyncTask(ctx, "workload", func(_ context.Context) {
store := tc.GetFirstStoreFromServer(t, 0)
key := rhsDesc.StartKey
keys := make([]roachpb.RKey, 10)
for i := 0; i < 10; i++ {
keys[i] = key
key = key.Next()
}
for {
select {
case <-tc.Stopper().ShouldQuiesce():
return // All done.
default:
// Keep going.
}
if err := store.DB().Put(ctx, keys[rand.Intn(10)], "foo"); err != nil {
t.Fatal(err)
}
key = key.Next()
time.Sleep(10 * time.Millisecond)
}
}); err != nil {
t.Fatal(err)
}

_, err := db.Exec("ALTER RANGE $1 SPLIT", rhsDesc.RangeID)
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
repl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(startKey)
require.NotNil(t, repl)
if repl.Desc().EndKey.Equal(endKey) {
return errors.Errorf("No split yet, since the end key is the same")
}
return nil
})
}
19 changes: 17 additions & 2 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ var sendSnapshotTimeout = envutil.EnvOrDefaultDuration(
func (r *Replica) AdminSplit(
ctx context.Context, args roachpb.AdminSplitRequest, reason string,
) (reply roachpb.AdminSplitResponse, _ *roachpb.Error) {
if len(args.SplitKey) == 0 {
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided")
if len(args.SplitKey) == 0 && !args.LoadBased {
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided or load based flag set")
}
if len(args.SplitKey) != 0 && args.LoadBased {
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot do a load based split with the provided key")
}

err := r.executeAdminCommandWithDescriptor(ctx, func(desc *roachpb.RangeDescriptor) error {
Expand Down Expand Up @@ -324,6 +327,18 @@ func (r *Replica) adminSplitWithDescriptor(
return reply, err
}

// A load based split is forced through the split queue, so this just sets
// the flags to force it to happen, rather than actually perform it.
// To perform an effective load based split we need to collect data on the
// keys to use and that data is only collected once we are ready to split.
// Thus forcing the loadBasedSplitter to start collecting data, will quickly
// force a split regardless of QPS threshold, assuming there is still
// traffic to this range.
if args.LoadBased {
r.loadBasedSplitter.ForceSplitDecision()
return reply, nil
}

// Determine split key if not provided with args. This scan is
// allowed to be relatively slow because admin commands don't block
// other commands.
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func newUnloadedReplica(
return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
}, func() time.Duration {
return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
}, func() time.Duration {
if store.cfg.TestingKnobs.LoadBasedSplitRecordDurationThreshold != 0 {
return store.cfg.TestingKnobs.LoadBasedSplitRecordDurationThreshold
}
return split.DefaultRecordDurationThreshold
})
r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]replicaChecksum{}
Expand Down
29 changes: 24 additions & 5 deletions pkg/kv/kvserver/split/decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ const minQueriesPerSecondSampleDuration = time.Second
// have consistently remained below a certain QPS threshold for a sufficiently
// long period of time.
type Decider struct {
intn func(n int) int // supplied to Init
qpsThreshold func() float64 // supplied to Init
qpsRetention func() time.Duration // supplied to Init
intn func(n int) int // supplied to Init
qpsThreshold func() float64 // supplied to Init
qpsRetention func() time.Duration // supplied to Init
recordDurationThreshold func() time.Duration // supplied to Init

mu struct {
syncutil.Mutex
Expand All @@ -67,6 +68,11 @@ type Decider struct {
// Fields tracking split key suggestions.
splitFinder *Finder // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split

// Setting to force a load based split. When forceLoadSplit is set to true
// collector will ignore the QPS threshold and force a split once enough data
// is collected.
forceSplitDecision bool
}
}

Expand All @@ -79,10 +85,22 @@ func Init(
intn func(n int) int,
qpsThreshold func() float64,
qpsRetention func() time.Duration,
recordDurationThreshold func() time.Duration,
) {
lbs.intn = intn
lbs.qpsThreshold = qpsThreshold
lbs.qpsRetention = qpsRetention
lbs.recordDurationThreshold = recordDurationThreshold
}

// ForceSplitDecision forces the Decider to enable tracking of individual keys
// to split on, which will result in a split happening once enough
// data is collected. .
func (d *Decider) ForceSplitDecision() {
d.mu.Lock()
defer d.mu.Unlock()

d.mu.forceSplitDecision = true
}

// Record notifies the Decider that 'n' operations are being carried out which
Expand Down Expand Up @@ -123,9 +141,9 @@ func (d *Decider) recordLocked(now time.Time, n int, span func() roachpb.Span) b
// begin to Record requests so it can find a split point. If a
// splitFinder already exists, we check if a split point is ready
// to be used.
if d.mu.lastQPS >= d.qpsThreshold() {
if d.mu.lastQPS >= d.qpsThreshold() || d.mu.forceSplitDecision {
if d.mu.splitFinder == nil {
d.mu.splitFinder = NewFinder(now)
d.mu.splitFinder = NewFinder(now, d.recordDurationThreshold())
}
} else {
d.mu.splitFinder = nil
Expand Down Expand Up @@ -240,6 +258,7 @@ func (d *Decider) Reset(now time.Time) {
d.mu.maxQPS.reset(now, d.qpsRetention())
d.mu.splitFinder = nil
d.mu.lastSplitSuggestion = time.Time{}
d.mu.forceSplitDecision = false
}

// maxQPSTracker collects a series of queries-per-second measurement samples and
Expand Down
16 changes: 12 additions & 4 deletions pkg/kv/kvserver/split/decider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func TestDecider(t *testing.T) {
intn := rand.New(rand.NewSource(12)).Intn

var d Decider
Init(&d, intn, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second })
Init(&d, intn, func() float64 { return 10.0 },
func() time.Duration { return 2 * time.Second },
func() time.Duration { return DefaultRecordDurationThreshold })

op := func(s string) func() roachpb.Span {
return func() roachpb.Span { return roachpb.Span{Key: roachpb.Key(s)} }
Expand Down Expand Up @@ -194,7 +196,9 @@ func TestDecider_MaxQPS(t *testing.T) {
intn := rand.New(rand.NewSource(11)).Intn

var d Decider
Init(&d, intn, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second })
Init(&d, intn, func() float64 { return 100.0 },
func() time.Duration { return 10 * time.Second },
func() time.Duration { return DefaultRecordDurationThreshold })

assertMaxQPS := func(i int, expMaxQPS float64, expOK bool) {
t.Helper()
Expand Down Expand Up @@ -237,7 +241,9 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {
intn := rand.New(rand.NewSource(11)).Intn

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second })
Init(&d, intn, func() float64 { return 1.0 },
func() time.Duration { return time.Second },
func() time.Duration { return DefaultRecordDurationThreshold })

baseKey := keys.SystemSQLCodec.TablePrefix(51)
for i := 0; i < 4; i++ {
Expand Down Expand Up @@ -270,7 +276,9 @@ func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) {
intn := rand.New(rand.NewSource(11)).Intn

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second })
Init(&d, intn, func() float64 { return 1.0 },
func() time.Duration { return time.Second },
func() time.Duration { return DefaultRecordDurationThreshold })

baseKey := keys.SystemSQLCodec.TablePrefix(51)
for i := 0; i < 4; i++ {
Expand Down
26 changes: 14 additions & 12 deletions pkg/kv/kvserver/split/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ import (
// key as split key, and provide hint to scatter the replicas.

const (
// RecordDurationThreshold is the minimum duration of time the split finder
// DefaultRecordDurationThreshold is the minimum duration of time the split finder
// will record a range for, before being ready for a split.
RecordDurationThreshold = 10 * time.Second // 10s
splitKeySampleSize = 20 // size of split key sample
splitKeyMinCounter = 100 // min aggregate counters before consideration
splitKeyThreshold = 0.25 // 25% difference between left/right counters
splitKeyContainedThreshold = 0.50 // too many spanning queries over split point
DefaultRecordDurationThreshold = 10 * time.Second // 10s
splitKeySampleSize = 20 // size of split key sample
splitKeyMinCounter = 100 // min aggregate counters before consideration
splitKeyThreshold = 0.25 // 25% difference between left/right counters
splitKeyContainedThreshold = 0.50 // too many spanning queries over split point
)

type sample struct {
Expand All @@ -60,22 +60,24 @@ type sample struct {
// Finder is a structure that is used to determine the split point
// using the Reservoir Sampling method.
type Finder struct {
startTime time.Time
samples [splitKeySampleSize]sample
count int
startTime time.Time
samples [splitKeySampleSize]sample
count int
recordDurationThreshold time.Duration
}

// NewFinder initiates a Finder with the given time.
func NewFinder(startTime time.Time) *Finder {
func NewFinder(startTime time.Time, recordDurationThreshold time.Duration) *Finder {
return &Finder{
startTime: startTime,
startTime: startTime,
recordDurationThreshold: recordDurationThreshold,
}
}

// Ready checks if the Finder has been initialized with a sufficient
// sample duration.
func (f *Finder) Ready(nowTime time.Time) bool {
return nowTime.Sub(f.startTime) > RecordDurationThreshold
return nowTime.Sub(f.startTime) > f.recordDurationThreshold
}

// Record informs the Finder about where the span lies with
Expand Down
Loading

0 comments on commit 91930ed

Please sign in to comment.