From 2db4d4bcfb6f9f82edac47fad614b55728a3d99e Mon Sep 17 00:00:00 2001 From: Alex Lunev Date: Wed, 16 Mar 2022 12:58:11 -0700 Subject: [PATCH] sql: add ALTER RANGE x SPLIT syntax Fixes #55116 Extend the ALTER RANGE syntax to support splitting a range. ALTER RANGE x SPLIT will split a range at the best available load based key. The new command will not perform a split directly but rather advise the load based split decider to ignore the QPS threshold and perform a load based split based on the latest available information. The decider has a 10s timeout built in between deciding to split and determining the split key, so the actual split will occur approximately 10s after the command is run. To support all of this we changed AdminSplitRequest and added a new parameter LoadBased. When set this will trigger a load based split on the range through the split_queue. Release note(sql change): Enhance the ALTER RANGE syntax to allow an operator to manually perform a load based split of a range. --- docs/generated/sql/bnf/BUILD.bazel | 1 + .../sql/bnf/alter_range_split_load_stmt.bnf | 5 + docs/generated/sql/bnf/alter_range_stmt.bnf | 1 + docs/generated/sql/bnf/stmt_block.bnf | 7 ++ pkg/gen/bnf.bzl | 1 + pkg/gen/diagrams.bzl | 1 + pkg/gen/docs.bzl | 1 + pkg/kv/batch.go | 17 ++++ pkg/kv/db.go | 18 ++++ pkg/kv/kvserver/client_split_test.go | 76 +++++++++++++++ pkg/kv/kvserver/replica_command.go | 19 +++- pkg/kv/kvserver/replica_init.go | 5 + pkg/kv/kvserver/split/decider.go | 29 +++++- pkg/kv/kvserver/split/decider_test.go | 16 ++- pkg/kv/kvserver/split/finder.go | 26 ++--- pkg/kv/kvserver/split/finder_test.go | 4 +- pkg/kv/kvserver/testing_knobs.go | 3 + pkg/roachpb/api.proto | 4 + pkg/sql/BUILD.bazel | 1 + pkg/sql/catalog/colinfo/result_columns.go | 8 ++ pkg/sql/distsql_spec_exec_factory.go | 6 ++ pkg/sql/opt/exec/execbuilder/relational.go | 3 + pkg/sql/opt/exec/execbuilder/statement.go | 17 ++++ pkg/sql/opt/exec/explain/emit.go | 5 + 
pkg/sql/opt/exec/explain/plan_gist_factory.go | 2 +- pkg/sql/opt/exec/explain/result_columns.go | 3 + pkg/sql/opt/exec/factory.opt | 6 ++ pkg/sql/opt/memo/logical_props_builder.go | 6 ++ pkg/sql/opt/ops/statement.opt | 22 +++++ pkg/sql/opt/optbuilder/alter_range.go | 42 +++++++- pkg/sql/opt/optbuilder/alter_table.go | 44 +++++---- pkg/sql/opt/optbuilder/builder.go | 5 +- pkg/sql/opt/optbuilder/scope.go | 2 + pkg/sql/opt/optbuilder/testdata/alter_range | 88 +++++++++++++---- pkg/sql/opt/ordering/ordering.go | 5 + pkg/sql/opt/ordering/statement.go | 9 ++ pkg/sql/opt/xform/physical_props.go | 2 + pkg/sql/opt_exec_factory.go | 19 ++++ pkg/sql/parser/sql.y | 36 +++++++ pkg/sql/parser/testdata/alter_range | 32 ++++++ pkg/sql/plan_columns.go | 2 + pkg/sql/sem/tree/alter_range.go | 19 ++++ pkg/sql/sem/tree/stmt.go | 14 ++- pkg/sql/split_range.go | 97 +++++++++++++++++++ pkg/sql/walk.go | 4 + 45 files changed, 660 insertions(+), 73 deletions(-) create mode 100644 docs/generated/sql/bnf/alter_range_split_load_stmt.bnf create mode 100644 pkg/sql/split_range.go diff --git a/docs/generated/sql/bnf/BUILD.bazel b/docs/generated/sql/bnf/BUILD.bazel index 0cfadf3e7841..27f8da9e1c0c 100644 --- a/docs/generated/sql/bnf/BUILD.bazel +++ b/docs/generated/sql/bnf/BUILD.bazel @@ -36,6 +36,7 @@ FILES = [ "alter_primary_key", "alter_range_relocate_stmt", "alter_range_stmt", + "alter_range_split_load_stmt", "alter_rename_view_stmt", "alter_role_stmt", "alter_scatter_index_stmt", diff --git a/docs/generated/sql/bnf/alter_range_split_load_stmt.bnf b/docs/generated/sql/bnf/alter_range_split_load_stmt.bnf new file mode 100644 index 000000000000..510c8d5cd801 --- /dev/null +++ b/docs/generated/sql/bnf/alter_range_split_load_stmt.bnf @@ -0,0 +1,5 @@ +alter_range_split_load_stmt ::= + 'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt + | 'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt 'WITH' 'EXPIRATION' a_expr + | 'ALTER' 'RANGE' a_expr 'SPLIT' + | 'ALTER' 'RANGE' a_expr 'SPLIT' 'WITH' 'EXPIRATION' a_expr diff 
--git a/docs/generated/sql/bnf/alter_range_stmt.bnf b/docs/generated/sql/bnf/alter_range_stmt.bnf index f8a65e7db6c0..777a54fb8486 100644 --- a/docs/generated/sql/bnf/alter_range_stmt.bnf +++ b/docs/generated/sql/bnf/alter_range_stmt.bnf @@ -1,3 +1,4 @@ alter_range_stmt ::= alter_zone_range_stmt | alter_range_relocate_stmt + | alter_range_split_load_stmt diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 95a4e5ff5646..6714e4a1c2a2 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -1568,6 +1568,7 @@ alter_database_stmt ::= alter_range_stmt ::= alter_zone_range_stmt | alter_range_relocate_stmt + | alter_range_split_load_stmt alter_partition_stmt ::= alter_zone_partition_stmt @@ -2168,6 +2169,12 @@ alter_range_relocate_stmt ::= | 'ALTER' 'RANGE' relocate_kw relocate_subject_nonlease 'FROM' a_expr 'TO' a_expr 'FOR' select_stmt | 'ALTER' 'RANGE' a_expr relocate_kw relocate_subject_nonlease 'FROM' a_expr 'TO' a_expr +alter_range_split_load_stmt ::= + 'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt + | 'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt 'WITH' 'EXPIRATION' a_expr + | 'ALTER' 'RANGE' a_expr 'SPLIT' + | 'ALTER' 'RANGE' a_expr 'SPLIT' 'WITH' 'EXPIRATION' a_expr + alter_zone_partition_stmt ::= 'ALTER' 'PARTITION' partition_name 'OF' 'TABLE' table_name set_zone_config | 'ALTER' 'PARTITION' partition_name 'OF' 'INDEX' table_index_name set_zone_config diff --git a/pkg/gen/bnf.bzl b/pkg/gen/bnf.bzl index aa5fa960c8f4..8ef832c9e4d8 100644 --- a/pkg/gen/bnf.bzl +++ b/pkg/gen/bnf.bzl @@ -35,6 +35,7 @@ BNF_SRCS = [ "//docs/generated/sql/bnf:alter_partition_stmt.bnf", "//docs/generated/sql/bnf:alter_primary_key.bnf", "//docs/generated/sql/bnf:alter_range_relocate_stmt.bnf", + "//docs/generated/sql/bnf:alter_range_split_load_stmt.bnf", "//docs/generated/sql/bnf:alter_range_stmt.bnf", "//docs/generated/sql/bnf:alter_rename_view_stmt.bnf", "//docs/generated/sql/bnf:alter_role_stmt.bnf", diff 
--git a/pkg/gen/diagrams.bzl b/pkg/gen/diagrams.bzl index c52c7bb6e5d2..d13472515a3f 100644 --- a/pkg/gen/diagrams.bzl +++ b/pkg/gen/diagrams.bzl @@ -37,6 +37,7 @@ DIAGRAMS_SRCS = [ "//docs/generated/sql/bnf:alter_primary_key.html", "//docs/generated/sql/bnf:alter_range.html", "//docs/generated/sql/bnf:alter_range_relocate.html", + "//docs/generated/sql/bnf:alter_range_split_load.html", "//docs/generated/sql/bnf:alter_rename_view.html", "//docs/generated/sql/bnf:alter_role.html", "//docs/generated/sql/bnf:alter_scatter.html", diff --git a/pkg/gen/docs.bzl b/pkg/gen/docs.bzl index 6308262b57a3..1dd286944a4f 100644 --- a/pkg/gen/docs.bzl +++ b/pkg/gen/docs.bzl @@ -47,6 +47,7 @@ DOCS_SRCS = [ "//docs/generated/sql/bnf:alter_partition_stmt.bnf", "//docs/generated/sql/bnf:alter_primary_key.bnf", "//docs/generated/sql/bnf:alter_range_relocate_stmt.bnf", + "//docs/generated/sql/bnf:alter_range_split_load_stmt.bnf", "//docs/generated/sql/bnf:alter_range_stmt.bnf", "//docs/generated/sql/bnf:alter_rename_view_stmt.bnf", "//docs/generated/sql/bnf:alter_role_stmt.bnf", diff --git a/pkg/kv/batch.go b/pkg/kv/batch.go index 06c21ca2a1d5..41731ba1dd43 100644 --- a/pkg/kv/batch.go +++ b/pkg/kv/batch.go @@ -719,6 +719,23 @@ func (b *Batch) adminSplit( b.initResult(1, 0, notRaw, nil) } +func (b *Batch) adminLoadBasedSplit(startKeyIn interface{}, expirationTime hlc.Timestamp) { + startKey, err := marshalKey(startKeyIn) + if err != nil { + b.initResult(0, 0, notRaw, err) + return + } + req := &roachpb.AdminSplitRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: startKey, + }, + ExpirationTime: expirationTime, + LoadBased: true, + } + b.appendReqs(req) + b.initResult(1, 0, notRaw, nil) +} + func (b *Batch) adminUnsplit(splitKeyIn interface{}) { splitKey, err := marshalKey(splitKeyIn) if err != nil { diff --git a/pkg/kv/db.go b/pkg/kv/db.go index c45d71fa99bd..490b25140710 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -597,6 +597,24 @@ func (db *DB) AdminSplit( return 
getOneErr(db.Run(ctx, b), b) } +// AdminLoadBasedSplit splits the range at the load based key as determined +// by a 10s snapshot of observed query load. +// +// startKey is the start key of the range we want to target for the load split. +// +// expirationTime is the timestamp when the split expires and is eligible for +// automatic merging by the merge queue. To specify that a split should +// immediately be eligible for automatic merging, set expirationTime to +// hlc.Timestamp{} (I.E. the zero timestamp). To specify that a split should +// never be eligible, set expirationTime to hlc.MaxTimestamp. +func (db *DB) AdminLoadBasedSplit( + ctx context.Context, startKey interface{}, expirationTime hlc.Timestamp, +) error { + b := &Batch{} + b.adminLoadBasedSplit(startKey, expirationTime) + return getOneErr(db.Run(ctx, b), b) +} + // AdminScatter scatters the range containing the specified key. // // maxSize greater than non-zero specified a maximum size of the range above diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index b9148e87236f..070cb3c90a59 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" @@ -3649,3 +3650,78 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) { repl = store.LookupReplica(roachpb.RKey(splitKey)) require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey()) } + +// TestAlterRangeSplit verifies that the ALTER RANGE SPLIT commands work as expected. 
+func TestAlterRangeSplit(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + settings := cluster.MakeTestingClusterSettings() + sv := &settings.SV + ctx := context.Background() + const numStores = 3 + tc := testcluster.StartTestCluster(t, numStores, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + // Set the threshold to 500ms instead of the default 10s, to speed + // up the test. + LoadBasedSplitRecordDurationThreshold: 500 * time.Millisecond, + }, + }, + }, + }, + ) + defer tc.Stopper().Stop(ctx) + require.NoError(t, tc.WaitForFullReplication()) + + // We set the threshold artificially high, so we know that the automatic + // process never splits the range. + kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, 10000) + + _, rhsDesc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin()) + // We start with having the range under test on (1,2,3). + db := tc.ServerConn(0) + + startKey := rhsDesc.StartKey + endKey := rhsDesc.EndKey + // Put a consistent workload on the range, so it can pick a split point. + if err := tc.Stopper().RunAsyncTask(ctx, "workload", func(_ context.Context) { + store := tc.GetFirstStoreFromServer(t, 0) + key := rhsDesc.StartKey + keys := make([]roachpb.RKey, 10) + for i := 0; i < 10; i++ { + keys[i] = key + key = key.Next() + } + for { + select { + case <-tc.Stopper().ShouldQuiesce(): + return // All done. + default: + // Keep going. 
+ } + if err := store.DB().Put(ctx, keys[rand.Intn(10)], "foo"); err != nil { + t.Fatal(err) + } + key = key.Next() + time.Sleep(10 * time.Millisecond) + } + }); err != nil { + t.Fatal(err) + } + + _, err := db.Exec("ALTER RANGE $1 SPLIT", rhsDesc.RangeID) + require.NoError(t, err) + + testutils.SucceedsSoon(t, func() error { + repl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(startKey) + require.NotNil(t, repl) + if repl.Desc().EndKey.Equal(endKey) { + return errors.Errorf("No split yet, since the end key is the same") + } + return nil + }) +} diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index a1de1382349f..e0567960bcca 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -71,8 +71,11 @@ var sendSnapshotTimeout = envutil.EnvOrDefaultDuration( func (r *Replica) AdminSplit( ctx context.Context, args roachpb.AdminSplitRequest, reason string, ) (reply roachpb.AdminSplitResponse, _ *roachpb.Error) { - if len(args.SplitKey) == 0 { - return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided") + if len(args.SplitKey) == 0 && !args.LoadBased { + return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided or load based flag set") + } + if len(args.SplitKey) != 0 && args.LoadBased { + return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot do a load based split with the provided key") } err := r.executeAdminCommandWithDescriptor(ctx, func(desc *roachpb.RangeDescriptor) error { @@ -317,6 +320,18 @@ func (r *Replica) adminSplitWithDescriptor( return reply, err } + // A load based split is forced through the split queue, so this just sets + // the flags to force it to happen, rather than actually perform it. + // To perform an effective load based split we need to collect data on the + // keys to use and that data is only collected once we are ready to split. 
+ // Thus forcing the loadBasedSplitter to start collecting data, will quickly + // force a split regardless of QPS threshold, assuming there is still + // traffic to this range. + if args.LoadBased { + r.loadBasedSplitter.ForceSplitDecision() + return reply, nil + } + // Determine split key if not provided with args. This scan is // allowed to be relatively slow because admin commands don't block // other commands. diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index c25f4fde2817..d9c7166ed181 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -98,6 +98,11 @@ func newUnloadedReplica( return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV)) }, func() time.Duration { return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV) + }, func() time.Duration { + if store.cfg.TestingKnobs.LoadBasedSplitRecordDurationThreshold != 0 { + return store.cfg.TestingKnobs.LoadBasedSplitRecordDurationThreshold + } + return split.DefaultRecordDurationThreshold }) r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{} r.mu.checksums = map[uuid.UUID]*replicaChecksum{} diff --git a/pkg/kv/kvserver/split/decider.go b/pkg/kv/kvserver/split/decider.go index c29e316d2eee..c12f1baaab1d 100644 --- a/pkg/kv/kvserver/split/decider.go +++ b/pkg/kv/kvserver/split/decider.go @@ -49,9 +49,10 @@ const minQueriesPerSecondSampleDuration = time.Second // have consistently remained below a certain QPS threshold for a sufficiently // long period of time. 
type Decider struct { - intn func(n int) int // supplied to Init - qpsThreshold func() float64 // supplied to Init - qpsRetention func() time.Duration // supplied to Init + intn func(n int) int // supplied to Init + qpsThreshold func() float64 // supplied to Init + qpsRetention func() time.Duration // supplied to Init + recordDurationThreshold func() time.Duration // supplied to Init mu struct { syncutil.Mutex @@ -67,6 +68,11 @@ type Decider struct { // Fields tracking split key suggestions. splitFinder *Finder // populated when engaged or decided lastSplitSuggestion time.Time // last stipulation to client to carry out split + + // Setting to force a load based split. When forceSplitDecision is set to true + // the Decider will ignore the QPS threshold and force a split once enough data + // is collected. + forceSplitDecision bool } } @@ -79,10 +85,22 @@ func Init( intn func(n int) int, qpsThreshold func() float64, qpsRetention func() time.Duration, + recordDurationThreshold func() time.Duration, ) { lbs.intn = intn lbs.qpsThreshold = qpsThreshold lbs.qpsRetention = qpsRetention + lbs.recordDurationThreshold = recordDurationThreshold +} + +// ForceSplitDecision forces the Decider to enable tracking of individual keys +// to split on, which will result in a split happening once enough +// data is collected. +func (d *Decider) ForceSplitDecision() { + d.mu.Lock() + defer d.mu.Unlock() + + d.mu.forceSplitDecision = true } // Record notifies the Decider that 'n' operations are being carried out which @@ -123,9 +141,9 @@ func (d *Decider) recordLocked(now time.Time, n int, span func() roachpb.Span) b // begin to Record requests so it can find a split point. If a // splitFinder already exists, we check if a split point is ready // to be used. 
- if d.mu.lastQPS >= d.qpsThreshold() { + if d.mu.lastQPS >= d.qpsThreshold() || d.mu.forceSplitDecision { if d.mu.splitFinder == nil { - d.mu.splitFinder = NewFinder(now) + d.mu.splitFinder = NewFinder(now, d.recordDurationThreshold()) } } else { d.mu.splitFinder = nil @@ -240,6 +258,7 @@ func (d *Decider) Reset(now time.Time) { d.mu.maxQPS.reset(now, d.qpsRetention()) d.mu.splitFinder = nil d.mu.lastSplitSuggestion = time.Time{} + d.mu.forceSplitDecision = false } // maxQPSTracker collects a series of queries-per-second measurement samples and diff --git a/pkg/kv/kvserver/split/decider_test.go b/pkg/kv/kvserver/split/decider_test.go index 3ea1821c3734..ad2b9b07cee6 100644 --- a/pkg/kv/kvserver/split/decider_test.go +++ b/pkg/kv/kvserver/split/decider_test.go @@ -38,7 +38,9 @@ func TestDecider(t *testing.T) { intn := rand.New(rand.NewSource(12)).Intn var d Decider - Init(&d, intn, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second }) + Init(&d, intn, func() float64 { return 10.0 }, + func() time.Duration { return 2 * time.Second }, + func() time.Duration { return DefaultRecordDurationThreshold }) op := func(s string) func() roachpb.Span { return func() roachpb.Span { return roachpb.Span{Key: roachpb.Key(s)} } @@ -194,7 +196,9 @@ func TestDecider_MaxQPS(t *testing.T) { intn := rand.New(rand.NewSource(11)).Intn var d Decider - Init(&d, intn, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second }) + Init(&d, intn, func() float64 { return 100.0 }, + func() time.Duration { return 10 * time.Second }, + func() time.Duration { return DefaultRecordDurationThreshold }) assertMaxQPS := func(i int, expMaxQPS float64, expOK bool) { t.Helper() @@ -237,7 +241,9 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) { intn := rand.New(rand.NewSource(11)).Intn var d Decider - Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }) + Init(&d, intn, func() float64 { return 1.0 }, + func() 
time.Duration { return time.Second }, + func() time.Duration { return DefaultRecordDurationThreshold }) baseKey := keys.SystemSQLCodec.TablePrefix(51) for i := 0; i < 4; i++ { @@ -270,7 +276,9 @@ func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) { intn := rand.New(rand.NewSource(11)).Intn var d Decider - Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second }) + Init(&d, intn, func() float64 { return 1.0 }, + func() time.Duration { return time.Second }, + func() time.Duration { return DefaultRecordDurationThreshold }) baseKey := keys.SystemSQLCodec.TablePrefix(51) for i := 0; i < 4; i++ { diff --git a/pkg/kv/kvserver/split/finder.go b/pkg/kv/kvserver/split/finder.go index 942216a36db6..e8f22934d630 100644 --- a/pkg/kv/kvserver/split/finder.go +++ b/pkg/kv/kvserver/split/finder.go @@ -43,13 +43,13 @@ import ( // key as split key, and provide hint to scatter the replicas. const ( - // RecordDurationThreshold is the minimum duration of time the split finder + // DefaultRecordDurationThreshold is the minimum duration of time the split finder // will record a range for, before being ready for a split. 
- RecordDurationThreshold = 10 * time.Second // 10s - splitKeySampleSize = 20 // size of split key sample - splitKeyMinCounter = 100 // min aggregate counters before consideration - splitKeyThreshold = 0.25 // 25% difference between left/right counters - splitKeyContainedThreshold = 0.50 // too many spanning queries over split point + DefaultRecordDurationThreshold = 10 * time.Second // 10s + splitKeySampleSize = 20 // size of split key sample + splitKeyMinCounter = 100 // min aggregate counters before consideration + splitKeyThreshold = 0.25 // 25% difference between left/right counters + splitKeyContainedThreshold = 0.50 // too many spanning queries over split point ) type sample struct { @@ -60,22 +60,24 @@ type sample struct { // Finder is a structure that is used to determine the split point // using the Reservoir Sampling method. type Finder struct { - startTime time.Time - samples [splitKeySampleSize]sample - count int + startTime time.Time + samples [splitKeySampleSize]sample + count int + recordDurationThreshold time.Duration } // NewFinder initiates a Finder with the given time. -func NewFinder(startTime time.Time) *Finder { +func NewFinder(startTime time.Time, recordDurationThreshold time.Duration) *Finder { return &Finder{ - startTime: startTime, + startTime: startTime, + recordDurationThreshold: recordDurationThreshold, } } // Ready checks if the Finder has been initialized with a sufficient // sample duration. 
func (f *Finder) Ready(nowTime time.Time) bool { - return nowTime.Sub(f.startTime) > RecordDurationThreshold + return nowTime.Sub(f.startTime) > f.recordDurationThreshold } // Record informs the Finder about where the span lies with diff --git a/pkg/kv/kvserver/split/finder_test.go b/pkg/kv/kvserver/split/finder_test.go index 6f6783868e78..8820e10b92b3 100644 --- a/pkg/kv/kvserver/split/finder_test.go +++ b/pkg/kv/kvserver/split/finder_test.go @@ -150,7 +150,7 @@ func TestSplitFinderKey(t *testing.T) { } for i, test := range testCases { - finder := NewFinder(timeutil.Now()) + finder := NewFinder(timeutil.Now(), DefaultRecordDurationThreshold) finder.samples = test.reservoir if splitByLoadKey := finder.Key(); !bytes.Equal(splitByLoadKey, test.splitByLoadKey) { t.Errorf( @@ -260,7 +260,7 @@ func TestSplitFinderRecorder(t *testing.T) { } for i, test := range testCases { - finder := NewFinder(timeutil.Now()) + finder := NewFinder(timeutil.Now(), DefaultRecordDurationThreshold) finder.samples = test.currReservoir finder.count = test.currCount finder.Record(test.recordSpan, test.intNFn) diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 8eb85129b526..6c4c3f8f3bf5 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -415,6 +415,9 @@ type StoreTestingKnobs struct { // AfterSendSnapshotThrottle intercepts replicas after receiving a spot in the // send snapshot semaphore. AfterSendSnapshotThrottle func() + // LoadBasedSplitRecordDurationThreshold determines how long the load based + // splitter should collect per key statistics before deciding on a split point. + LoadBasedSplitRecordDurationThreshold time.Duration // This method, if set, gets to see (and mutate, if desired) any local // StoreDescriptor before it is being sent out on the Gossip network. 
diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 9cfe01a2ce66..f129f6917153 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -821,6 +821,10 @@ message AdminSplitRequest { // cause the split to be rejected. This can be used by a caller to effectively // send a "conditional split" request, i.e. a split if not already split. repeated bytes predicate_keys = 5 [(gogoproto.casttype) = "Key"]; + + // If set indicates that the system should perform a load based split. If this + // option is used the splitKey should be null. + bool load_based = 6; } // An AdminSplitResponse is the return value from the AdminSplit() diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 633b452ca5a8..f15d212c2e06 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -220,6 +220,7 @@ go_library( "show_zone_config.go", "sort.go", "split.go", + "split_range.go", "spool.go", "sql_cursor.go", "statement.go", diff --git a/pkg/sql/catalog/colinfo/result_columns.go b/pkg/sql/catalog/colinfo/result_columns.go index 4d648dbe105d..1098da9f7328 100644 --- a/pkg/sql/catalog/colinfo/result_columns.go +++ b/pkg/sql/catalog/colinfo/result_columns.go @@ -230,6 +230,14 @@ var AlterRangeRelocateColumns = ResultColumns{ {Name: "result", Typ: types.String}, } +// AlterRangeSplitColumns are the result columns of an +// ALTER RANGE .. SPLIT statement. +var AlterRangeSplitColumns = ResultColumns{ + {Name: "range_id", Typ: types.Int}, + {Name: "pretty", Typ: types.String}, + {Name: "result", Typ: types.String}, +} + // ScrubColumns are the result columns of a SCRUB statement. 
var ScrubColumns = ResultColumns{ {Name: "job_uuid", Typ: types.Uuid}, diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 44e220113b4b..9d76a18205e0 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -1102,6 +1102,12 @@ func (e *distSQLSpecExecFactory) ConstructAlterRangeRelocate( return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: alter range relocate") } +func (e *distSQLSpecExecFactory) ConstructAlterRangeSplit( + input exec.Node, expiration tree.TypedExpr, +) (exec.Node, error) { + return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: alter range split") +} + func (e *distSQLSpecExecFactory) ConstructBuffer(input exec.Node, label string) (exec.Node, error) { return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: buffer") } diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index 9b42a104c97e..1761c7f8971a 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -319,6 +319,9 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { case *memo.AlterRangeRelocateExpr: ep, err = b.buildAlterRangeRelocate(t) + case *memo.AlterRangeSplitExpr: + ep, err = b.buildAlterRangeSplit(t) + case *memo.ControlJobsExpr: ep, err = b.buildControlJobs(t) diff --git a/pkg/sql/opt/exec/execbuilder/statement.go b/pkg/sql/opt/exec/execbuilder/statement.go index fe21920d9bf7..d136ee8e60a4 100644 --- a/pkg/sql/opt/exec/execbuilder/statement.go +++ b/pkg/sql/opt/exec/execbuilder/statement.go @@ -270,6 +270,23 @@ func (b *Builder) buildAlterRangeRelocate(relocate *memo.AlterRangeRelocateExpr) return planWithColumns(node, relocate.Columns), nil } +func (b *Builder) buildAlterRangeSplit(split *memo.AlterRangeSplitExpr) (execPlan, error) { + input, err := 
b.buildRelational(split.Input) + if err != nil { + return execPlan{}, err + } + scalarCtx := buildScalarCtx{} + expiration, err := b.buildScalar(&scalarCtx, split.Expiration) + if err != nil { + return execPlan{}, err + } + node, err := b.factory.ConstructAlterRangeSplit(input.root, expiration) + if err != nil { + return execPlan{}, err + } + return planWithColumns(node, split.Columns), nil +} + func (b *Builder) buildControlJobs(ctl *memo.ControlJobsExpr) (execPlan, error) { input, err := b.buildRelational(ctl.Input) if err != nil { diff --git a/pkg/sql/opt/exec/explain/emit.go b/pkg/sql/opt/exec/explain/emit.go index cd50a51984c4..20fcf5578f6e 100644 --- a/pkg/sql/opt/exec/explain/emit.go +++ b/pkg/sql/opt/exec/explain/emit.go @@ -273,6 +273,7 @@ func (e *emitter) nodeName(n *Node) (string, error) { var nodeNames = [...]string{ alterRangeRelocateOp: "relocate range", + alterRangeSplitOp: "split range", alterTableRelocateOp: "relocate table", alterTableSplitOp: "split", alterTableUnsplitAllOp: "unsplit all", @@ -932,6 +933,10 @@ func (e *emitter) emitNodeAttributes(n *Node) error { ob.Expr("from", a.fromStoreID, nil /* columns */) } + case alterRangeSplitOp: + a := n.args.(*alterRangeSplitArgs) + ob.Expr("expiration", a.Expiration, nil /* columns */) + case simpleProjectOp, serializingProjectOp, ordinalityOp, diff --git a/pkg/sql/opt/exec/explain/plan_gist_factory.go b/pkg/sql/opt/exec/explain/plan_gist_factory.go index 59df67ee0d30..dbd9dc7f2beb 100644 --- a/pkg/sql/opt/exec/explain/plan_gist_factory.go +++ b/pkg/sql/opt/exec/explain/plan_gist_factory.go @@ -35,7 +35,7 @@ import ( ) func init() { - if numOperators != 60 { + if numOperators != 61 { // This error occurs when an operator has been added or removed in // pkg/sql/opt/exec/explain/factory.opt. If an operator is added at the // end of factory.opt, simply adjust the hardcoded value above. 
If an diff --git a/pkg/sql/opt/exec/explain/result_columns.go b/pkg/sql/opt/exec/explain/result_columns.go index 5197d7e75876..0c1c9e796748 100644 --- a/pkg/sql/opt/exec/explain/result_columns.go +++ b/pkg/sql/opt/exec/explain/result_columns.go @@ -170,6 +170,9 @@ func getResultColumns( case alterRangeRelocateOp: return colinfo.AlterRangeRelocateColumns, nil + case alterRangeSplitOp: + return colinfo.AlterRangeSplitColumns, nil + case exportOp: return colinfo.ExportColumns, nil diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index e655a7f5deda..998afb506dba 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -766,3 +766,9 @@ define LiteralValues { Rows tree.ExprContainer Columns colinfo.ResultColumns } + +# AlterRangeSplit implements ALTER RANGE SPLIT. +define AlterRangeSplit { + input exec.Node + Expiration tree.TypedExpr +} diff --git a/pkg/sql/opt/memo/logical_props_builder.go b/pkg/sql/opt/memo/logical_props_builder.go index 996531bc0cda..a92eda619cef 100644 --- a/pkg/sql/opt/memo/logical_props_builder.go +++ b/pkg/sql/opt/memo/logical_props_builder.go @@ -1069,6 +1069,12 @@ func (b *logicalPropsBuilder) buildAlterRangeRelocateProps( b.buildBasicProps(relocate, relocate.Columns, rel) } +func (b *logicalPropsBuilder) buildAlterRangeSplitProps( + split *AlterRangeSplitExpr, rel *props.Relational, +) { + b.buildBasicProps(split, split.Columns, rel) +} + func (b *logicalPropsBuilder) buildControlJobsProps(ctl *ControlJobsExpr, rel *props.Relational) { b.buildBasicProps(ctl, opt.ColList{}, rel) } diff --git a/pkg/sql/opt/ops/statement.opt b/pkg/sql/opt/ops/statement.opt index 385dfe63926b..9dd676aaaddc 100644 --- a/pkg/sql/opt/ops/statement.opt +++ b/pkg/sql/opt/ops/statement.opt @@ -342,3 +342,25 @@ define AlterRangeRelocatePrivate { # Props stores the required physical properties for the input expression. Props PhysProps } + +# AlterRangeSplit represents an `ALTER RANGE .. SPLIT ..` statement. 
+[Relational, Mutation] +define AlterRangeSplit { + # The input expression provides range IDs as integers. + Input RelExpr + + # Expiration is a string scalar that indicates a timestamp after which the + # ranges are eligible for automatic merging (or Null if there is no + # expiration). + Expiration ScalarExpr + _ AlterRangeSplitPrivate +} + +[Private] +define AlterRangeSplitPrivate { + # Columns stores the column IDs for the statement result columns. + Columns ColList + + # Props stores the required physical properties for the input expression. + Props PhysProps +} diff --git a/pkg/sql/opt/optbuilder/alter_range.go b/pkg/sql/opt/optbuilder/alter_range.go index 61cb87394716..63c8add0b7ff 100644 --- a/pkg/sql/opt/optbuilder/alter_range.go +++ b/pkg/sql/opt/optbuilder/alter_range.go @@ -32,15 +32,12 @@ func (b *Builder) buildAlterRangeRelocate( b.DisableMemoReuse = true outScope = inScope.push() - b.synthesizeResultColumns(outScope, colinfo.AlterTableRelocateColumns) + b.synthesizeResultColumns(outScope, colinfo.AlterRangeRelocateColumns) cmdName := "RELOCATE " + relocate.SubjectReplicas.String() colNames := []string{"range ids"} colTypes := []*types.T{types.Int} - outScope = inScope.push() - b.synthesizeResultColumns(outScope, colinfo.AlterRangeRelocateColumns) - // We don't allow the input statement to reference outer columns, so we // pass a "blank" scope rather than inScope. emptyScope := b.allocScope() @@ -74,3 +71,40 @@ func (b *Builder) buildAlterRangeRelocate( ) return outScope } + +// buildAlterRangeSplit builds an ALTER RANGE SPLIT. +func (b *Builder) buildAlterRangeSplit(split *tree.SplitRange, inScope *scope) (outScope *scope) { + + if err := b.catalog.RequireAdminRole(b.ctx, "ALTER RANGE SPLIT"); err != nil { + panic(err) + } + + // Disable optimizer caching, as we do for other ALTER statements. 
+ b.DisableMemoReuse = true + + outScope = inScope.push() + b.synthesizeResultColumns(outScope, colinfo.AlterRangeSplitColumns) + + cmdName := "SPLIT" + colNames := []string{"range ids"} + colTypes := []*types.T{types.Int} + + // We don't allow the input statement to reference outer columns, so we + // pass a "blank" scope rather than inScope. + emptyScope := b.allocScope() + inputScope := b.buildStmt(split.Rows, colTypes, emptyScope) + checkInputColumns(cmdName, inputScope, colNames, colTypes, 1) + + // Build the expiration scalar. + expiration := buildExpirationScalar(split.ExpireExpr, exprKindAlterRangeSplit, b, emptyScope) + + outScope.expr = b.factory.ConstructAlterRangeSplit( + inputScope.expr, + expiration, + &memo.AlterRangeSplitPrivate{ + Columns: colsToColList(outScope.cols), + Props: physical.MinRequired, + }, + ) + return outScope +} diff --git a/pkg/sql/opt/optbuilder/alter_table.go b/pkg/sql/opt/optbuilder/alter_table.go index 4dbf794f1419..e793920459e9 100644 --- a/pkg/sql/opt/optbuilder/alter_table.go +++ b/pkg/sql/opt/optbuilder/alter_table.go @@ -51,26 +51,7 @@ func (b *Builder) buildAlterTableSplit(split *tree.Split, inScope *scope) (outSc checkInputColumns("SPLIT AT", inputScope, colNames, colTypes, 1) // Build the expiration scalar. - var expiration opt.ScalarExpr - if split.ExpireExpr != nil { - emptyScope.context = exprKindAlterTableSplitAt - // We need to save and restore the previous value of the field in - // semaCtx in case we are recursively called within a subquery - // context. 
- defer b.semaCtx.Properties.Restore(b.semaCtx.Properties) - b.semaCtx.Properties.Require(emptyScope.context.String(), tree.RejectSpecial) - - texpr := emptyScope.resolveType(split.ExpireExpr, types.String) - expiration = b.buildScalar( - texpr, - emptyScope, - nil, /* outScope */ - nil, /* outCol */ - nil, /* colRefs */ - ) - } else { - expiration = b.factory.ConstructNull(types.String) - } + expiration := buildExpirationScalar(split.ExpireExpr, exprKindAlterTableSplitAt, b, emptyScope) outScope = inScope.push() b.synthesizeResultColumns(outScope, colinfo.AlterTableSplitColumns) @@ -237,3 +218,26 @@ func checkInputColumns( } } } + +func buildExpirationScalar( + expireExpr tree.Expr, kind exprKind, b *Builder, emptyScope *scope, +) opt.ScalarExpr { + if expireExpr != nil { + emptyScope.context = kind + // We need to save and restore the previous value of the field in + // semaCtx in case we are recursively called within a subquery + // context. + defer b.semaCtx.Properties.Restore(b.semaCtx.Properties) + b.semaCtx.Properties.Require(emptyScope.context.String(), tree.RejectSpecial) + + texpr := emptyScope.resolveType(expireExpr, types.String) + return b.buildScalar( + texpr, + emptyScope, + nil, /* outScope */ + nil, /* outCol */ + nil, /* colRefs */ + ) + } + return b.factory.ConstructNull(types.String) +} diff --git a/pkg/sql/opt/optbuilder/builder.go b/pkg/sql/opt/optbuilder/builder.go index 8b47e01aacb3..b76ff3364968 100644 --- a/pkg/sql/opt/optbuilder/builder.go +++ b/pkg/sql/opt/optbuilder/builder.go @@ -284,7 +284,7 @@ func (b *Builder) buildStmt( case *tree.Delete, *tree.Insert, *tree.Update, *tree.CreateTable, *tree.CreateView, *tree.Split, *tree.Unsplit, *tree.Relocate, *tree.RelocateRange, *tree.ControlJobs, *tree.ControlSchedules, *tree.CancelQueries, *tree.CancelSessions, - *tree.CreateFunction: + *tree.CreateFunction, *tree.SplitRange: panic(pgerror.Newf( pgcode.Syntax, "%s cannot be used inside a view definition", stmt.StatementTag(), )) @@ -353,6 
+353,9 @@ func (b *Builder) buildStmt( case *tree.RelocateRange: return b.buildAlterRangeRelocate(stmt, inScope) + case *tree.SplitRange: + return b.buildAlterRangeSplit(stmt, inScope) + case *tree.ControlJobs: return b.buildControlJobs(stmt, inScope) diff --git a/pkg/sql/opt/optbuilder/scope.go b/pkg/sql/opt/optbuilder/scope.go index f6486bb3124d..dfe41f60c165 100644 --- a/pkg/sql/opt/optbuilder/scope.go +++ b/pkg/sql/opt/optbuilder/scope.go @@ -139,6 +139,7 @@ const ( exprKindWhere exprKindWindowFrameStart exprKindWindowFrameEnd + exprKindAlterRangeSplit ) var exprKindName = [...]string{ @@ -160,6 +161,7 @@ var exprKindName = [...]string{ exprKindWhere: "WHERE", exprKindWindowFrameStart: "WINDOW FRAME START", exprKindWindowFrameEnd: "WINDOW FRAME END", + exprKindAlterRangeSplit: "ALTER RANGE SPLIT", } func (k exprKind) String() string { diff --git a/pkg/sql/opt/optbuilder/testdata/alter_range b/pkg/sql/opt/optbuilder/testdata/alter_range index 270718f35972..338cc122ad69 100644 --- a/pkg/sql/opt/optbuilder/testdata/alter_range +++ b/pkg/sql/opt/optbuilder/testdata/alter_range @@ -6,10 +6,10 @@ CREATE TABLE abc (a INT PRIMARY KEY, b INT, c STRING, INDEX b (b), UNIQUE INDEX build ALTER RANGE 1 RELOCATE FROM 1+2 TO 3+4 ---- -alter-range-relocate &{VOTERS [3 4 5] []} - ├── columns: range_id:3 pretty:4 result:5 +alter-range-relocate &{VOTERS [1 2 3] []} + ├── columns: range_id:1 pretty:2 result:3 ├── values - │ ├── columns: column1:6!null + │ ├── columns: column1:4!null │ └── (1,) ├── 3 + 4 └── 1 + 2 @@ -17,12 +17,12 @@ alter-range-relocate &{VOTERS [3 4 5] []} build ALTER RANGE RELOCATE FROM 1+2 TO 3+4 FOR SELECT a FROM abc ---- -alter-range-relocate &{VOTERS [3 4 5] []} - ├── columns: range_id:3 pretty:4 result:5 +alter-range-relocate &{VOTERS [1 2 3] []} + ├── columns: range_id:1 pretty:2 result:3 ├── project - │ ├── columns: a:6!null + │ ├── columns: a:4!null │ └── scan abc - │ └── columns: a:6!null b:7 c:8 crdb_internal_mvcc_timestamp:9 tableoid:10 + │ └── 
columns: a:4!null b:5 c:6 crdb_internal_mvcc_timestamp:7 tableoid:8 ├── 3 + 4 └── 1 + 2 @@ -39,10 +39,10 @@ error (42601): RELOCATE VOTERS data column 1 (range ids) must be of type int, no build ALTER RANGE 1 RELOCATE NONVOTERS FROM 1+2 TO 3+4 ---- -alter-range-relocate &{NONVOTERS [3 4 5] []} - ├── columns: range_id:3 pretty:4 result:5 +alter-range-relocate &{NONVOTERS [1 2 3] []} + ├── columns: range_id:1 pretty:2 result:3 ├── values - │ ├── columns: column1:6!null + │ ├── columns: column1:4!null │ └── (1,) ├── 3 + 4 └── 1 + 2 @@ -50,12 +50,12 @@ alter-range-relocate &{NONVOTERS [3 4 5] []} build ALTER RANGE RELOCATE NONVOTERS FROM 1+2 TO 3+4 FOR SELECT a FROM abc ---- -alter-range-relocate &{NONVOTERS [3 4 5] []} - ├── columns: range_id:3 pretty:4 result:5 +alter-range-relocate &{NONVOTERS [1 2 3] []} + ├── columns: range_id:1 pretty:2 result:3 ├── project - │ ├── columns: a:6!null + │ ├── columns: a:4!null │ └── scan abc - │ └── columns: a:6!null b:7 c:8 crdb_internal_mvcc_timestamp:9 tableoid:10 + │ └── columns: a:4!null b:5 c:6 crdb_internal_mvcc_timestamp:7 tableoid:8 ├── 3 + 4 └── 1 + 2 @@ -63,10 +63,10 @@ alter-range-relocate &{NONVOTERS [3 4 5] []} build ALTER RANGE 1 RELOCATE LEASE TO 1+2 ---- -alter-range-relocate &{LEASE [3 4 5] []} - ├── columns: range_id:3 pretty:4 result:5 +alter-range-relocate &{LEASE [1 2 3] []} + ├── columns: range_id:1 pretty:2 result:3 ├── values - │ ├── columns: column1:6!null + │ ├── columns: column1:4!null │ └── (1,) ├── 1 + 2 └── NULL::INT8 @@ -74,12 +74,12 @@ alter-range-relocate &{LEASE [3 4 5] []} build ALTER RANGE RELOCATE LEASE TO 1+2 FOR SELECT a FROM abc ---- -alter-range-relocate &{LEASE [3 4 5] []} - ├── columns: range_id:3 pretty:4 result:5 +alter-range-relocate &{LEASE [1 2 3] []} + ├── columns: range_id:1 pretty:2 result:3 ├── project - │ ├── columns: a:6!null + │ ├── columns: a:4!null │ └── scan abc - │ └── columns: a:6!null b:7 c:8 crdb_internal_mvcc_timestamp:9 tableoid:10 + │ └── columns: a:4!null b:5 c:6 
crdb_internal_mvcc_timestamp:7 tableoid:8 ├── 1 + 2 └── NULL::INT8 @@ -87,3 +87,49 @@ build ALTER RANGE RELOCATE LEASE TO 1+2 FOR SELECT c FROM abc ---- error (42601): RELOCATE LEASE data column 1 (range ids) must be of type int, not type string + +# Tests for ALTER RANGE SPLIT. +build +ALTER RANGE 1 SPLIT +---- +alter-range-split + ├── columns: range_id:1 pretty:2 result:3 + ├── values + │ ├── columns: column1:4!null + │ └── (1,) + └── CAST(NULL AS STRING) + +build +ALTER RANGE 1 SPLIT WITH EXPIRATION '1 day' +---- +alter-range-split + ├── columns: range_id:1 pretty:2 result:3 + ├── values + │ ├── columns: column1:4!null + │ └── (1,) + └── '1 day' + +build +ALTER RANGE SPLIT FOR VALUES (1) +---- +alter-range-split + ├── columns: range_id:1 pretty:2 result:3 + ├── values + │ ├── columns: column1:4!null + │ └── (1,) + └── CAST(NULL AS STRING) + +build +ALTER RANGE SPLIT FOR VALUES (1) WITH EXPIRATION '1 day' +---- +alter-range-split + ├── columns: range_id:1 pretty:2 result:3 + ├── values + │ ├── columns: column1:4!null + │ └── (1,) + └── '1 day' + +build +ALTER RANGE SPLIT FOR SELECT c FROM abc +---- +error (42601): SPLIT data column 1 (range ids) must be of type int, not type string diff --git a/pkg/sql/opt/ordering/ordering.go b/pkg/sql/opt/ordering/ordering.go index 99ad41e8e0c5..e990f7647547 100644 --- a/pkg/sql/opt/ordering/ordering.go +++ b/pkg/sql/opt/ordering/ordering.go @@ -266,6 +266,11 @@ func init() { buildChildReqOrdering: alterRangeRelocateBuildChildReqOrdering, buildProvidedOrdering: noProvidedOrdering, } + funcMap[opt.AlterRangeSplitOp] = funcs{ + canProvideOrdering: canNeverProvideOrdering, + buildChildReqOrdering: alterRangeSplitBuildChildReqOrdering, + buildProvidedOrdering: noProvidedOrdering, + } funcMap[opt.ControlJobsOp] = funcs{ canProvideOrdering: canNeverProvideOrdering, buildChildReqOrdering: controlJobsBuildChildReqOrdering, diff --git a/pkg/sql/opt/ordering/statement.go b/pkg/sql/opt/ordering/statement.go index 
aad679d5977e..230a0fbb66d5 100644 --- a/pkg/sql/opt/ordering/statement.go +++ b/pkg/sql/opt/ordering/statement.go @@ -83,6 +83,15 @@ func alterRangeRelocateBuildChildReqOrdering( ) } +func alterRangeSplitBuildChildReqOrdering( + parent memo.RelExpr, required *props.OrderingChoice, childIdx int, +) props.OrderingChoice { + if childIdx != 0 { + return props.OrderingChoice{} + } + return parent.(*memo.AlterRangeSplitExpr).Props.Ordering +} + func controlJobsBuildChildReqOrdering( parent memo.RelExpr, required *props.OrderingChoice, childIdx int, ) props.OrderingChoice { diff --git a/pkg/sql/opt/xform/physical_props.go b/pkg/sql/opt/xform/physical_props.go index c987130eaf2f..a745411d4873 100644 --- a/pkg/sql/opt/xform/physical_props.go +++ b/pkg/sql/opt/xform/physical_props.go @@ -75,6 +75,8 @@ func BuildChildPhysicalProps( childProps.Presentation = parent.(*memo.AlterTableRelocateExpr).Props.Presentation case opt.AlterRangeRelocateOp: childProps.Presentation = parent.(*memo.AlterRangeRelocateExpr).Props.Presentation + case opt.AlterRangeSplitOp: + childProps.Presentation = parent.(*memo.AlterRangeSplitExpr).Props.Presentation case opt.ControlJobsOp: childProps.Presentation = parent.(*memo.ControlJobsExpr).Props.Presentation case opt.CancelQueriesOp: diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 70fcc249b7b3..fa5c3a54b392 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -2046,6 +2046,25 @@ func (ef *execFactory) ConstructAlterRangeRelocate( }, nil } +// ConstructAlterRangeSplit is part of the exec.Factory interface. 
+func (ef *execFactory) ConstructAlterRangeSplit( + input exec.Node, expiration tree.TypedExpr, +) (exec.Node, error) { + if !ef.planner.ExecCfg().Codec.ForSystemTenant() { + return nil, errorutil.UnsupportedWithMultiTenancy(54250) + } + + expirationTime, err := parseExpirationTime(ef.planner.EvalContext(), expiration) + if err != nil { + return nil, err + } + + return &splitRange{ + rows: input.(planNode), + expirationTime: expirationTime, + }, nil +} + // ConstructControlJobs is part of the exec.Factory interface. func (ef *execFactory) ConstructControlJobs( command tree.JobCommand, input exec.Node, reason tree.TypedExpr, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 319f268aafe7..72a960c09b3a 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -1008,6 +1008,7 @@ func (u *sqlSymUnion) functionObjs() tree.FuncObjs { // ALTER RANGE %type alter_zone_range_stmt %type alter_range_relocate_stmt +%type alter_range_split_load_stmt // ALTER TABLE %type alter_onetable_stmt @@ -2124,6 +2125,7 @@ alter_database_set_zone_config_extension_stmt: alter_range_stmt: alter_zone_range_stmt | alter_range_relocate_stmt +| alter_range_split_load_stmt | ALTER RANGE error // SHOW HELP: ALTER RANGE // %Help: ALTER INDEX - change the definition of an index @@ -2380,6 +2382,40 @@ alter_range_relocate_stmt: } } +alter_range_split_load_stmt: +ALTER RANGE SPLIT FOR select_stmt + { + $$.val = &tree.SplitRange{ + Rows: $5.slct(), + ExpireExpr: tree.Expr(nil), + } + } +| ALTER RANGE SPLIT FOR select_stmt WITH EXPIRATION a_expr + { + $$.val = &tree.SplitRange{ + Rows: $5.slct(), + ExpireExpr: $8.expr(), + } + } +| ALTER RANGE a_expr SPLIT + { + $$.val = &tree.SplitRange{ + Rows: &tree.Select{ + Select: &tree.ValuesClause{Rows: []tree.Exprs{tree.Exprs{$3.expr()}}}, + }, + ExpireExpr: tree.Expr(nil), + } + } +| ALTER RANGE a_expr SPLIT WITH EXPIRATION a_expr + { + $$.val = &tree.SplitRange{ + Rows: &tree.Select{ + Select: &tree.ValuesClause{Rows: 
[]tree.Exprs{tree.Exprs{$3.expr()}}}, + }, + ExpireExpr: $7.expr(), + } + } + set_zone_config: CONFIGURE ZONE to_or_eq a_expr { diff --git a/pkg/sql/parser/testdata/alter_range b/pkg/sql/parser/testdata/alter_range index 0fb8e649cd7b..46505c5b232d 100644 --- a/pkg/sql/parser/testdata/alter_range +++ b/pkg/sql/parser/testdata/alter_range @@ -150,3 +150,35 @@ ALTER RANGE RELOCATE NONVOTERS FROM 1 + 2 TO 1 + 1 FOR SELECT range_id FROM foo ALTER RANGE RELOCATE NONVOTERS FROM ((1) + (2)) TO ((1) + (1)) FOR SELECT (range_id) FROM foo -- fully parenthesized ALTER RANGE RELOCATE NONVOTERS FROM _ + _ TO _ + _ FOR SELECT range_id FROM foo -- literals removed ALTER RANGE RELOCATE NONVOTERS FROM 1 + 2 TO 1 + 1 FOR SELECT _ FROM _ -- identifiers removed + +parse +ALTER RANGE 1+3 SPLIT +---- +ALTER RANGE SPLIT FOR VALUES (1 + 3) -- normalized! +ALTER RANGE SPLIT FOR VALUES (((1) + (3))) -- fully parenthesized +ALTER RANGE SPLIT FOR VALUES (_ + _) -- literals removed +ALTER RANGE SPLIT FOR VALUES (1 + 3) -- identifiers removed + +parse +ALTER RANGE 1+3 SPLIT WITH EXPIRATION '1 day' +---- +ALTER RANGE SPLIT FOR VALUES (1 + 3) WITH EXPIRATION '1 day' -- normalized! 
+ALTER RANGE SPLIT FOR VALUES (((1) + (3))) WITH EXPIRATION ('1 day') -- fully parenthesized +ALTER RANGE SPLIT FOR VALUES (_ + _) WITH EXPIRATION '_' -- literals removed +ALTER RANGE SPLIT FOR VALUES (1 + 3) WITH EXPIRATION '1 day' -- identifiers removed + +parse +ALTER RANGE SPLIT FOR SELECT range_id FROM foo +---- +ALTER RANGE SPLIT FOR SELECT range_id FROM foo +ALTER RANGE SPLIT FOR SELECT (range_id) FROM foo -- fully parenthesized +ALTER RANGE SPLIT FOR SELECT range_id FROM foo -- literals removed +ALTER RANGE SPLIT FOR SELECT _ FROM _ -- identifiers removed + +parse +ALTER RANGE SPLIT FOR SELECT range_id FROM foo WITH EXPIRATION '1 day' +---- +ALTER RANGE SPLIT FOR SELECT range_id FROM foo WITH EXPIRATION '1 day' +ALTER RANGE SPLIT FOR SELECT (range_id) FROM foo WITH EXPIRATION ('1 day') -- fully parenthesized +ALTER RANGE SPLIT FOR SELECT range_id FROM foo WITH EXPIRATION '_' -- literals removed +ALTER RANGE SPLIT FOR SELECT _ FROM _ WITH EXPIRATION '1 day' -- identifiers removed diff --git a/pkg/sql/plan_columns.go b/pkg/sql/plan_columns.go index 195e42036d36..8c97ca1a11e7 100644 --- a/pkg/sql/plan_columns.go +++ b/pkg/sql/plan_columns.go @@ -110,6 +110,8 @@ func getPlanColumns(plan planNode, mut bool) colinfo.ResultColumns { return n.getColumns(mut, colinfo.AlterTableRelocateColumns) case *relocateRange: return n.getColumns(mut, colinfo.AlterRangeRelocateColumns) + case *splitRange: + return n.getColumns(mut, colinfo.AlterRangeSplitColumns) case *scatterNode: return n.getColumns(mut, colinfo.AlterTableScatterColumns) case *showFingerprintsNode: diff --git a/pkg/sql/sem/tree/alter_range.go b/pkg/sql/sem/tree/alter_range.go index b353d3b5fa83..d59ca1998470 100644 --- a/pkg/sql/sem/tree/alter_range.go +++ b/pkg/sql/sem/tree/alter_range.go @@ -65,3 +65,22 @@ func (n *RelocateRange) Format(ctx *FmtCtx) { ctx.WriteString(" FOR ") ctx.FormatNode(n.Rows) } + +// SplitRange represents an `ALTER RANGE .. SPLIT ..` +// statement. 
+type SplitRange struct { + Rows *Select + // Splits can last a specified amount of time before becoming eligible for + // automatic merging. + ExpireExpr Expr +} + +// Format implements the NodeFormatter interface. +func (node *SplitRange) Format(ctx *FmtCtx) { + ctx.WriteString("ALTER RANGE SPLIT FOR ") + ctx.FormatNode(node.Rows) + if node.ExpireExpr != nil { + ctx.WriteString(" WITH EXPIRATION ") + ctx.FormatNode(node.ExpireExpr) + } +} diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index 8948121de5eb..96caf9eb50f5 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -133,7 +133,7 @@ func CanWriteData(stmt Statement) bool { case *CopyFrom, *Import, *Restore: return true // CockroachDB extensions. - case *Split, *Unsplit, *Relocate, *RelocateRange, *Scatter: + case *Split, *Unsplit, *Relocate, *RelocateRange, *Scatter, *SplitRange: return true } return false @@ -1840,6 +1840,17 @@ func (*Split) StatementType() StatementType { return TypeDML } // StatementTag returns a short string identifying the type of statement. func (*Split) StatementTag() string { return "SPLIT" } +// StatementReturnType implements the Statement interface. +func (*SplitRange) StatementReturnType() StatementReturnType { return Rows } + +// StatementType implements the Statement interface. +func (*SplitRange) StatementType() StatementType { return TypeDML } + +// StatementTag returns a short string identifying the type of statement. +func (n *SplitRange) StatementTag() string { + return "SPLIT" +} + // StatementReturnType implements the Statement interface. 
func (*StreamIngestion) StatementReturnType() StatementReturnType { return Rows } @@ -2165,6 +2176,7 @@ func (n *ShowFingerprints) String() string { return AsString( func (n *ShowDefaultPrivileges) String() string { return AsString(n) } func (n *ShowCompletions) String() string { return AsString(n) } func (n *Split) String() string { return AsString(n) } +func (n *SplitRange) String() string { return AsString(n) } func (n *StreamIngestion) String() string { return AsString(n) } func (n *Unsplit) String() string { return AsString(n) } func (n *Truncate) String() string { return AsString(n) } diff --git a/pkg/sql/split_range.go b/pkg/sql/split_range.go new file mode 100644 index 000000000000..501d2a7d205f --- /dev/null +++ b/pkg/sql/split_range.go @@ -0,0 +1,97 @@ +// Copyright 2017 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +type splitRange struct { + optColumnsSlot + + rows planNode + expirationTime hlc.Timestamp + run splitRangeState +} + +// splitRangeState contains the run-time state of +// splitRange during local execution. 
+type splitRangeState struct { + lastRangeID roachpb.RangeID + lastRangeDesc *roachpb.RangeDescriptor + lastErr error +} + +func (n *splitRange) startExec(runParams) error { + return nil +} + +func (n *splitRange) Next(params runParams) (bool, error) { + if ok, err := n.rows.Next(params); err != nil || !ok { + return ok, err + } + datum := n.rows.Values()[0] + if datum == tree.DNull { + return true, nil + } + rangeID := roachpb.RangeID(tree.MustBeDInt(datum)) + + rangeDesc, err := split(params, rangeID, n.expirationTime) + + // record the results of the split request, so we can output it. + n.run = splitRangeState{ + lastRangeID: rangeID, + lastRangeDesc: rangeDesc, + lastErr: err, + } + return true, nil +} + +func (n *splitRange) Values() tree.Datums { + // TODO(lunevalex): figure out how not to hard code this and instead + // get the value from kv/kvserver/split/finder. + result := "requested, may take 10s to take effect" + if n.run.lastErr != nil { + result = n.run.lastErr.Error() + } + pretty := "" + if n.run.lastRangeDesc != nil { + pretty = keys.PrettyPrint(nil /* valDirs */, n.run.lastRangeDesc.StartKey.AsRawKey()) + } + return tree.Datums{ + tree.NewDInt(tree.DInt(n.run.lastRangeID)), + tree.NewDString(pretty), + tree.NewDString(result), + } +} + +func (n *splitRange) Close(ctx context.Context) { + n.rows.Close(ctx) +} + +func split( + params runParams, rangeID roachpb.RangeID, expirationTime hlc.Timestamp, +) (*roachpb.RangeDescriptor, error) { + rangeDesc, err := lookupRangeDescriptorByRangeID(params.ctx, params.extendedEvalCtx.ExecCfg.DB, rangeID) + if err != nil { + return nil, errors.Wrapf(err, "error looking up range descriptor") + } + if err := params.ExecCfg().DB.AdminLoadBasedSplit(params.ctx, rangeDesc.StartKey, expirationTime); err != nil { + return rangeDesc, err + } + return rangeDesc, nil +} diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go index 2a57fe543325..ed258ae5ded5 100644 --- a/pkg/sql/walk.go +++ b/pkg/sql/walk.go @@ -188,6 +188,9 @@ 
func (v *planVisitor) visitInternal(plan planNode, name string) { case *relocateRange: n.rows = v.visit(n.rows) + case *splitRange: + n.rows = v.visit(n.rows) + case *insertNode, *insertFastPathNode: if ins, ok := n.(*insertNode); ok { ins.source = v.visit(ins.source) @@ -475,4 +478,5 @@ var planNodeNames = map[reflect.Type]string{ reflect.TypeOf(&zeroNode{}): "norows", reflect.TypeOf(&zigzagJoinNode{}): "zigzag join", reflect.TypeOf(&schemaChangePlanNode{}): "schema change", + reflect.TypeOf(&splitRange{}): "split range", }