sql: add ALTER RANGE x SPLIT syntax #77972

Closed · wants to merge 1 commit
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/BUILD.bazel
@@ -36,6 +36,7 @@ FILES = [
"alter_primary_key",
"alter_range_relocate_stmt",
"alter_range_stmt",
"alter_range_split_load_stmt",
"alter_rename_view_stmt",
"alter_role_stmt",
"alter_scatter_index_stmt",
5 changes: 5 additions & 0 deletions docs/generated/sql/bnf/alter_range_split_load_stmt.bnf
@@ -0,0 +1,5 @@
alter_range_split_load_stmt ::=
'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt
| 'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt 'WITH' 'EXPIRATION' a_expr
| 'ALTER' 'RANGE' a_expr 'SPLIT'
| 'ALTER' 'RANGE' a_expr 'SPLIT' 'WITH' 'EXPIRATION' a_expr
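
For illustration, a short Go sketch of how these productions might be exercised over database/sql. The connection string, driver choice, range ID, and expiration expression are all hypothetical; only the plain ALTER RANGE $1 SPLIT form is exercised by the test later in this PR.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 'ALTER' 'RANGE' a_expr 'SPLIT': force a load-based split of range 42
	// (the range ID is hypothetical).
	if _, err := db.Exec("ALTER RANGE $1 SPLIT", 42); err != nil {
		log.Fatal(err)
	}
	// 'ALTER' 'RANGE' a_expr 'SPLIT' 'WITH' 'EXPIRATION' a_expr: the same,
	// but the split becomes eligible for automatic merging after one hour.
	if _, err := db.Exec(
		"ALTER RANGE $1 SPLIT WITH EXPIRATION now() + INTERVAL '1 hour'", 42,
	); err != nil {
		log.Fatal(err)
	}
}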
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/alter_range_stmt.bnf
@@ -1,3 +1,4 @@
alter_range_stmt ::=
alter_zone_range_stmt
| alter_range_relocate_stmt
| alter_range_split_load_stmt
7 changes: 7 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -1568,6 +1568,7 @@ alter_database_stmt ::=
alter_range_stmt ::=
alter_zone_range_stmt
| alter_range_relocate_stmt
| alter_range_split_load_stmt

alter_partition_stmt ::=
alter_zone_partition_stmt
@@ -2168,6 +2169,12 @@ alter_range_relocate_stmt ::=
| 'ALTER' 'RANGE' relocate_kw relocate_subject_nonlease 'FROM' a_expr 'TO' a_expr 'FOR' select_stmt
| 'ALTER' 'RANGE' a_expr relocate_kw relocate_subject_nonlease 'FROM' a_expr 'TO' a_expr

alter_range_split_load_stmt ::=
'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt
| 'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt 'WITH' 'EXPIRATION' a_expr
| 'ALTER' 'RANGE' a_expr 'SPLIT'
| 'ALTER' 'RANGE' a_expr 'SPLIT' 'WITH' 'EXPIRATION' a_expr

alter_zone_partition_stmt ::=
'ALTER' 'PARTITION' partition_name 'OF' 'TABLE' table_name set_zone_config
| 'ALTER' 'PARTITION' partition_name 'OF' 'INDEX' table_index_name set_zone_config
1 change: 1 addition & 0 deletions pkg/gen/bnf.bzl
@@ -35,6 +35,7 @@ BNF_SRCS = [
"//docs/generated/sql/bnf:alter_partition_stmt.bnf",
"//docs/generated/sql/bnf:alter_primary_key.bnf",
"//docs/generated/sql/bnf:alter_range_relocate_stmt.bnf",
"//docs/generated/sql/bnf:alter_range_split_load_stmt.bnf",
"//docs/generated/sql/bnf:alter_range_stmt.bnf",
"//docs/generated/sql/bnf:alter_rename_view_stmt.bnf",
"//docs/generated/sql/bnf:alter_role_stmt.bnf",
1 change: 1 addition & 0 deletions pkg/gen/diagrams.bzl
@@ -37,6 +37,7 @@ DIAGRAMS_SRCS = [
"//docs/generated/sql/bnf:alter_primary_key.html",
"//docs/generated/sql/bnf:alter_range.html",
"//docs/generated/sql/bnf:alter_range_relocate.html",
"//docs/generated/sql/bnf:alter_range_split_load.html",
"//docs/generated/sql/bnf:alter_rename_view.html",
"//docs/generated/sql/bnf:alter_role.html",
"//docs/generated/sql/bnf:alter_scatter.html",
1 change: 1 addition & 0 deletions pkg/gen/docs.bzl
@@ -47,6 +47,7 @@ DOCS_SRCS = [
"//docs/generated/sql/bnf:alter_partition_stmt.bnf",
"//docs/generated/sql/bnf:alter_primary_key.bnf",
"//docs/generated/sql/bnf:alter_range_relocate_stmt.bnf",
"//docs/generated/sql/bnf:alter_range_split_load_stmt.bnf",
"//docs/generated/sql/bnf:alter_range_stmt.bnf",
"//docs/generated/sql/bnf:alter_rename_view_stmt.bnf",
"//docs/generated/sql/bnf:alter_role_stmt.bnf",
17 changes: 17 additions & 0 deletions pkg/kv/batch.go
@@ -719,6 +719,23 @@ func (b *Batch) adminSplit(
b.initResult(1, 0, notRaw, nil)
}

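// adminLoadBasedSplit queues an AdminSplit request asking for a load-based
// split of the range containing startKeyIn; see DB.AdminLoadBasedSplit for
// the expirationTime semantics.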
func (b *Batch) adminLoadBasedSplit(startKeyIn interface{}, expirationTime hlc.Timestamp) {
startKey, err := marshalKey(startKeyIn)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.AdminSplitRequest{
RequestHeader: roachpb.RequestHeader{
Key: startKey,
},
ExpirationTime: expirationTime,
LoadBased: true,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) adminUnsplit(splitKeyIn interface{}) {
splitKey, err := marshalKey(splitKeyIn)
if err != nil {
18 changes: 18 additions & 0 deletions pkg/kv/db.go
@@ -597,6 +597,24 @@ func (db *DB) AdminSplit(
return getOneErr(db.Run(ctx, b), b)
}

// AdminLoadBasedSplit splits the range at a load-based key, as determined
// by a 10s snapshot of the observed query load.
//
// startKey is the start key of the range to target for the load-based split.
//
// expirationTime is the timestamp when the split expires and is eligible for
// automatic merging by the merge queue. To specify that a split should
// immediately be eligible for automatic merging, set expirationTime to
// hlc.Timestamp{} (i.e. the zero timestamp). To specify that a split should
// never be eligible, set expirationTime to hlc.MaxTimestamp.
func (db *DB) AdminLoadBasedSplit(
ctx context.Context, startKey interface{}, expirationTime hlc.Timestamp,
) error {
b := &Batch{}
b.adminLoadBasedSplit(startKey, expirationTime)
return getOneErr(db.Run(ctx, b), b)
}

// AdminScatter scatters the range containing the specified key.
//
// maxSize greater than non-zero specified a maximum size of the range above
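
A minimal client-side sketch of the three expirationTime choices documented above; the function name and key are hypothetical, and the assumed imports are listed in its comment.

// forceSplitExample is a sketch (not part of this PR) exercising the three
// expirationTime choices. Assumed imports: context, time, pkg/kv,
// pkg/roachpb, pkg/util/hlc, pkg/util/timeutil.
func forceSplitExample(ctx context.Context, db *kv.DB, key roachpb.Key) error {
	// Split becomes eligible for automatic merging one hour from now.
	exp := hlc.Timestamp{WallTime: timeutil.Now().Add(time.Hour).UnixNano()}
	if err := db.AdminLoadBasedSplit(ctx, key, exp); err != nil {
		return err
	}
	// Zero timestamp: immediately eligible for automatic merging.
	if err := db.AdminLoadBasedSplit(ctx, key, hlc.Timestamp{}); err != nil {
		return err
	}
	// hlc.MaxTimestamp: never eligible for automatic merging.
	return db.AdminLoadBasedSplit(ctx, key, hlc.MaxTimestamp)
}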
76 changes: 76 additions & 0 deletions pkg/kv/kvserver/client_split_test.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
@@ -3649,3 +3650,78 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
repl = store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
}

// TestAlterRangeSplit verifies that the ALTER RANGE ... SPLIT statement works as expected.
func TestAlterRangeSplit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

settings := cluster.MakeTestingClusterSettings()
sv := &settings.SV
ctx := context.Background()
const numStores = 3
tc := testcluster.StartTestCluster(t, numStores,
base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Lower the record duration threshold from the default 10s to 500ms
// to speed up the test.
LoadBasedSplitRecordDurationThreshold: 500 * time.Millisecond,
},
},
},
},
)
defer tc.Stopper().Stop(ctx)
require.NoError(t, tc.WaitForFullReplication())

// Set the QPS threshold artificially high so that the automatic split
// process never splits the range on its own.
kvserver.SplitByLoadQPSThreshold.Override(ctx, sv, 10000)

_, rhsDesc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin())
// The range under test starts out replicated on stores (1,2,3).
db := tc.ServerConn(0)

startKey := rhsDesc.StartKey
endKey := rhsDesc.EndKey
// Put a consistent workload on the range so the load-based splitter can pick a split point.
if err := tc.Stopper().RunAsyncTask(ctx, "workload", func(_ context.Context) {
store := tc.GetFirstStoreFromServer(t, 0)
key := rhsDesc.StartKey
keys := make([]roachpb.RKey, 10)
for i := 0; i < 10; i++ {
keys[i] = key
key = key.Next()
}
for {
select {
case <-tc.Stopper().ShouldQuiesce():
return // All done.
default:
// Keep going.
}
if err := store.DB().Put(ctx, keys[rand.Intn(10)], "foo"); err != nil {
// t.Fatal must not be called from a goroutine other than the test's.
t.Error(err)
return
}
key = key.Next()
time.Sleep(10 * time.Millisecond)
}
}); err != nil {
t.Fatal(err)
}

_, err := db.Exec("ALTER RANGE $1 SPLIT", rhsDesc.RangeID)
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
repl := tc.GetFirstStoreFromServer(t, 0).LookupReplica(startKey)
require.NotNil(t, repl)
if repl.Desc().EndKey.Equal(endKey) {
return errors.Errorf("No split yet, since the end key is the same")
}
return nil
})
}
19 changes: 17 additions & 2 deletions pkg/kv/kvserver/replica_command.go
@@ -71,8 +71,11 @@ var sendSnapshotTimeout = envutil.EnvOrDefaultDuration(
func (r *Replica) AdminSplit(
ctx context.Context, args roachpb.AdminSplitRequest, reason string,
) (reply roachpb.AdminSplitResponse, _ *roachpb.Error) {
if len(args.SplitKey) == 0 {
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided")
if len(args.SplitKey) == 0 && !args.LoadBased {
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided or load based flag set")
}
if len(args.SplitKey) != 0 && args.LoadBased {
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot do a load based split with the provided key")
}

err := r.executeAdminCommandWithDescriptor(ctx, func(desc *roachpb.RangeDescriptor) error {
@@ -317,6 +320,18 @@ func (r *Replica) adminSplitWithDescriptor(
return reply, err
}

// A load-based split is carried out by the split queue, so here we only
// set the flag that forces the split to happen rather than performing it
// directly. An effective load-based split requires per-key load data,
// which is only collected once the splitter is engaged. Forcing the
// loadBasedSplitter to start collecting data will therefore quickly
// trigger a split regardless of the QPS threshold, assuming the range is
// still receiving traffic.
if args.LoadBased {
r.loadBasedSplitter.ForceSplitDecision()
return reply, nil
}

// Determine split key if not provided with args. This scan is
// allowed to be relatively slow because admin commands don't block
// other commands.
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_init.go
@@ -98,6 +98,11 @@ func newUnloadedReplica(
return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
}, func() time.Duration {
return kvserverbase.SplitByLoadMergeDelay.Get(&store.cfg.Settings.SV)
}, func() time.Duration {
if store.cfg.TestingKnobs.LoadBasedSplitRecordDurationThreshold != 0 {
return store.cfg.TestingKnobs.LoadBasedSplitRecordDurationThreshold
}
return split.DefaultRecordDurationThreshold
})
r.mu.proposals = map[kvserverbase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]*replicaChecksum{}
29 changes: 24 additions & 5 deletions pkg/kv/kvserver/split/decider.go
@@ -49,9 +49,10 @@ const minQueriesPerSecondSampleDuration = time.Second
// have consistently remained below a certain QPS threshold for a sufficiently
// long period of time.
type Decider struct {
intn func(n int) int // supplied to Init
qpsThreshold func() float64 // supplied to Init
qpsRetention func() time.Duration // supplied to Init
intn func(n int) int // supplied to Init
qpsThreshold func() float64 // supplied to Init
qpsRetention func() time.Duration // supplied to Init
recordDurationThreshold func() time.Duration // supplied to Init

mu struct {
syncutil.Mutex
@@ -67,6 +68,11 @@ type Decider struct {
// Fields tracking split key suggestions.
splitFinder *Finder // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split

// forceSplitDecision, when set to true, instructs the Decider to ignore
// the QPS threshold and force a split once enough data has been
// collected.
forceSplitDecision bool
}
}

@@ -79,10 +85,22 @@ func Init(
intn func(n int) int,
qpsThreshold func() float64,
qpsRetention func() time.Duration,
recordDurationThreshold func() time.Duration,
) {
lbs.intn = intn
lbs.qpsThreshold = qpsThreshold
lbs.qpsRetention = qpsRetention
lbs.recordDurationThreshold = recordDurationThreshold
}

// ForceSplitDecision forces the Decider to enable tracking of individual keys
// to split on, which will result in a split happening once enough
// data is collected.
func (d *Decider) ForceSplitDecision() {
d.mu.Lock()
defer d.mu.Unlock()

d.mu.forceSplitDecision = true
}

// Record notifies the Decider that 'n' operations are being carried out which
@@ -123,9 +141,9 @@ func (d *Decider) recordLocked(now time.Time, n int, span func() roachpb.Span) b
// begin to Record requests so it can find a split point. If a
// splitFinder already exists, we check if a split point is ready
// to be used.
if d.mu.lastQPS >= d.qpsThreshold() {
if d.mu.lastQPS >= d.qpsThreshold() || d.mu.forceSplitDecision {
if d.mu.splitFinder == nil {
d.mu.splitFinder = NewFinder(now)
d.mu.splitFinder = NewFinder(now, d.recordDurationThreshold())
}
} else {
d.mu.splitFinder = nil
@@ -240,6 +258,7 @@ func (d *Decider) Reset(now time.Time) {
d.mu.maxQPS.reset(now, d.qpsRetention())
d.mu.splitFinder = nil
d.mu.lastSplitSuggestion = time.Time{}
d.mu.forceSplitDecision = false
}

// maxQPSTracker collects a series of queries-per-second measurement samples and
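
To trace the new control flow end to end, a test-style sketch of the forced-decision path; it assumes the Decider's existing Record and MaybeSplitKey methods (not shown in this diff), and the QPS and duration values are illustrative.

// Sketch of the forced-decision path; Init mirrors the updated test calls
// below. Assumed imports: fmt, math/rand, time, pkg/roachpb,
// pkg/util/timeutil.
var d Decider
Init(&d, rand.New(rand.NewSource(1)).Intn,
	func() float64 { return 1000.0 },                 // QPS threshold, ignored once forced
	func() time.Duration { return 10 * time.Second }, // QPS retention
	func() time.Duration { return DefaultRecordDurationThreshold })

d.ForceSplitDecision()

// Even at ~20 QPS, far below the 1000 QPS threshold, recordLocked now
// instantiates the split finder and collects per-key samples.
now := timeutil.Now()
for i := 0; i < 200; i++ {
	i := i
	d.Record(now, 1, func() roachpb.Span {
		return roachpb.Span{Key: roachpb.Key(fmt.Sprintf("key-%03d", i))}
	})
	now = now.Add(50 * time.Millisecond)
}
// After roughly DefaultRecordDurationThreshold worth of samples over
// distinct keys, a split key may be suggested.
splitKey := d.MaybeSplitKey(now)
_ = splitKey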
16 changes: 12 additions & 4 deletions pkg/kv/kvserver/split/decider_test.go
@@ -38,7 +38,9 @@ func TestDecider(t *testing.T) {
intn := rand.New(rand.NewSource(12)).Intn

var d Decider
Init(&d, intn, func() float64 { return 10.0 }, func() time.Duration { return 2 * time.Second })
Init(&d, intn, func() float64 { return 10.0 },
func() time.Duration { return 2 * time.Second },
func() time.Duration { return DefaultRecordDurationThreshold })

op := func(s string) func() roachpb.Span {
return func() roachpb.Span { return roachpb.Span{Key: roachpb.Key(s)} }
@@ -194,7 +196,9 @@ func TestDecider_MaxQPS(t *testing.T) {
intn := rand.New(rand.NewSource(11)).Intn

var d Decider
Init(&d, intn, func() float64 { return 100.0 }, func() time.Duration { return 10 * time.Second })
Init(&d, intn, func() float64 { return 100.0 },
func() time.Duration { return 10 * time.Second },
func() time.Duration { return DefaultRecordDurationThreshold })

assertMaxQPS := func(i int, expMaxQPS float64, expOK bool) {
t.Helper()
@@ -237,7 +241,9 @@ func TestDeciderCallsEnsureSafeSplitKey(t *testing.T) {
intn := rand.New(rand.NewSource(11)).Intn

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second })
Init(&d, intn, func() float64 { return 1.0 },
func() time.Duration { return time.Second },
func() time.Duration { return DefaultRecordDurationThreshold })

baseKey := keys.SystemSQLCodec.TablePrefix(51)
for i := 0; i < 4; i++ {
@@ -270,7 +276,9 @@ func TestDeciderIgnoresEnsureSafeSplitKeyOnError(t *testing.T) {
intn := rand.New(rand.NewSource(11)).Intn

var d Decider
Init(&d, intn, func() float64 { return 1.0 }, func() time.Duration { return time.Second })
Init(&d, intn, func() float64 { return 1.0 },
func() time.Duration { return time.Second },
func() time.Duration { return DefaultRecordDurationThreshold })

baseKey := keys.SystemSQLCodec.TablePrefix(51)
for i := 0; i < 4; i++ {