Skip to content

Commit

Permalink
sql: add ALTER RANGE x SPLIT syntax
Browse files Browse the repository at this point in the history
Fixes #55116

Extend the ALTER RANGE syntax to support splitting a range.
ALTER RANGE x SPLIT will split a range at the best available
load based key. The new command will not perform a split directly
but rather advise the load based split decider to ignore the QPS
threshold and perform a load based split based on the lastest
available information. The decider has a 10s timeout built in between
deciding to split and determing the split key, so the actual split will
occur approximately 10s after the commnad is run.

To support all of this we changed AdminSplitRequest and added a new parameter
LoadBased. When set this will trigger a load based split on the range
through the split_queue.

Release note(sql change): Enhance the ALTER RANGE syntax to
allow an operator to manually perform a load based split of
a range.
  • Loading branch information
lunevalex committed Mar 16, 2022
1 parent 51cbdce commit 002d094
Show file tree
Hide file tree
Showing 32 changed files with 433 additions and 32 deletions.
1 change: 1 addition & 0 deletions docs/generated/sql/bnf/alter_range_stmt.bnf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
alter_range_stmt ::=
alter_zone_range_stmt
| alter_range_relocate_stmt
| alter_range_split_load_stmt
7 changes: 7 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,7 @@ alter_database_stmt ::=
alter_range_stmt ::=
alter_zone_range_stmt
| alter_range_relocate_stmt
| alter_range_split_load_stmt

alter_partition_stmt ::=
alter_zone_partition_stmt
Expand Down Expand Up @@ -2000,6 +2001,12 @@ alter_range_relocate_stmt ::=
| 'ALTER' 'RANGE' relocate_kw relocate_subject_nonlease 'FROM' a_expr 'TO' a_expr 'FOR' select_stmt
| 'ALTER' 'RANGE' a_expr relocate_kw relocate_subject_nonlease 'FROM' a_expr 'TO' a_expr

alter_range_split_load_stmt ::=
'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt
| 'ALTER' 'RANGE' 'SPLIT' 'FOR' select_stmt 'WITH' 'EXPIRATION' a_expr
| 'ALTER' 'RANGE' iconst64 'SPLIT'
| 'ALTER' 'RANGE' iconst64 'SPLIT' 'WITH' 'EXPIRATION' a_expr

alter_zone_partition_stmt ::=
'ALTER' 'PARTITION' partition_name 'OF' 'TABLE' table_name set_zone_config
| 'ALTER' 'PARTITION' partition_name 'OF' 'INDEX' table_index_name set_zone_config
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,7 +1211,7 @@ can also be specified (e.g. .25).`,

DemoWorkloadMaxQPS = FlagInfo{
Name: "workload-max-qps",
Description: "The maximum QPS when a workload is running.",
Description: "The maximum QPS when a workload is running.",
}

DemoNodeLocality = FlagInfo{
Expand Down
19 changes: 19 additions & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,25 @@ func (b *Batch) adminSplit(
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) adminLoadBasedSplit(
startKeyIn interface{}, expirationTime hlc.Timestamp,
) {
startKey, err := marshalKey(startKeyIn)
if err != nil {
b.initResult(0, 0, notRaw, err)
return
}
req := &roachpb.AdminSplitRequest{
RequestHeader: roachpb.RequestHeader{
Key: startKey,
},
ExpirationTime: expirationTime,
LoadBased: true,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}

func (b *Batch) adminUnsplit(splitKeyIn interface{}) {
splitKey, err := marshalKey(splitKeyIn)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,25 @@ func (db *DB) AdminSplit(
return getOneErr(db.Run(ctx, b), b)
}

// AdminLoadBasedSplit splits the range at the load based key as determined by the
// range stats.
//
// expirationTime is the timestamp when the split expires and is eligible for
// automatic merging by the merge queue. To specify that a split should
// immediately be eligible for automatic merging, set expirationTime to
// hlc.Timestamp{} (I.E. the zero timestamp). To specify that a split should
// never be eligible, set expirationTime to hlc.MaxTimestamp.
//
func (db *DB) AdminLoadBasedSplit(
ctx context.Context,
startKey interface{},
expirationTime hlc.Timestamp,
) error {
b := &Batch{}
b.adminLoadBasedSplit(startKey, expirationTime)
return getOneErr(db.Run(ctx, b), b)
}

// SplitAndScatterResult carries wraps information about the SplitAndScatter
// call, including how long each step took or stats for range scattered.
type SplitAndScatterResult struct {
Expand Down
19 changes: 17 additions & 2 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ var sendSnapshotTimeout = envutil.EnvOrDefaultDuration(
func (r *Replica) AdminSplit(
ctx context.Context, args roachpb.AdminSplitRequest, reason string,
) (reply roachpb.AdminSplitResponse, _ *roachpb.Error) {
if len(args.SplitKey) == 0 {
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided")
if len(args.SplitKey) == 0 && !args.LoadBased {
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided or load based flag set")
}
if len(args.SplitKey) != 0 && args.LoadBased {
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot do a load based split with the provided key")
}

err := r.executeAdminCommandWithDescriptor(ctx, func(desc *roachpb.RangeDescriptor) error {
Expand Down Expand Up @@ -321,6 +324,18 @@ func (r *Replica) adminSplitWithDescriptor(
return reply, err
}

// A load based split is forced through the split queue, so this just sets
// the flags to force it to happen, rather than actually perform it.
// To perform an effective load based split we need to collect data on the
// keys to use and that data is only collected once we are ready to split.
// Thus forcing the loadBasedSplitter to start collecting data, will quickly
// force a split regardless of QPS threshold, assuming there is still
// traffic to this range.
if args.LoadBased {
r.loadBasedSplitter.ForceSplitDecision()
return reply, nil
}

// Determine split key if not provided with args. This scan is
// allowed to be relatively slow because admin commands don't block
// other commands.
Expand Down
18 changes: 17 additions & 1 deletion pkg/kv/kvserver/split/decider.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ type Decider struct {
// Fields tracking split key suggestions.
splitFinder *Finder // populated when engaged or decided
lastSplitSuggestion time.Time // last stipulation to client to carry out split

// Setting to force a load based split. When forceLoadSplit is set to true
// collector will ignore the QPS threshold and force a split once enough data
// is collected.
forceSplitDecision bool
}
}

Expand All @@ -85,6 +90,16 @@ func Init(
lbs.qpsRetention = qpsRetention
}

// ForceSplitDecision forces the Decider to enable tracking of individual keys
// to split on, which will result in a actual split happening once enough
// data is collected. .
func (d *Decider) ForceSplitDecision() {
d.mu.Lock()
defer d.mu.Unlock()

d.mu.forceSplitDecision = true
}

// Record notifies the Decider that 'n' operations are being carried out which
// operate on the span returned by the supplied method. The closure will only
// be called when necessary, that is, when the Decider is considering a split
Expand Down Expand Up @@ -123,7 +138,7 @@ func (d *Decider) recordLocked(now time.Time, n int, span func() roachpb.Span) b
// begin to Record requests so it can find a split point. If a
// splitFinder already exists, we check if a split point is ready
// to be used.
if d.mu.lastQPS >= d.qpsThreshold() {
if d.mu.lastQPS >= d.qpsThreshold() || d.mu.forceSplitDecision {
if d.mu.splitFinder == nil {
d.mu.splitFinder = NewFinder(now)
}
Expand Down Expand Up @@ -240,6 +255,7 @@ func (d *Decider) Reset(now time.Time) {
d.mu.maxQPS.reset(now, d.qpsRetention())
d.mu.splitFinder = nil
d.mu.lastSplitSuggestion = time.Time{}
d.mu.forceSplitDecision = false
}

// maxQPSTracker collects a series of queries-per-second measurement samples and
Expand Down
4 changes: 4 additions & 0 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,10 @@ message AdminSplitRequest {
// cause the split to be rejected. This can be used by a caller to effectively
// send a "conditional split" request, i.e. a split if not already split.
repeated bytes predicate_keys = 5 [(gogoproto.casttype) = "Key"];

// If set indicates that the system should perform a load based split. If this
// option is used the splitKey should be null.
bool load_based = 6;
}

// An AdminSplitResponse is the return value from the AdminSplit()
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ go_library(
"show_zone_config.go",
"sort.go",
"split.go",
"split_range.go",
"spool.go",
"sql_cursor.go",
"statement.go",
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/catalog/colinfo/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ var AlterRangeRelocateColumns = ResultColumns{
{Name: "result", Typ: types.String},
}

// AlterRangeSplitColumns are the result columns of an
// ALTER RANGE .. SPLIT statement.
var AlterRangeSplitColumns = ResultColumns{
{Name: "range_id", Typ: types.Int},
{Name: "pretty", Typ: types.String},
{Name: "result", Typ: types.String},
}

// ScrubColumns are the result columns of a SCRUB statement.
var ScrubColumns = ResultColumns{
{Name: "job_uuid", Typ: types.Uuid},
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,13 @@ func (e *distSQLSpecExecFactory) ConstructAlterRangeRelocate(
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: alter range relocate")
}

func (e *distSQLSpecExecFactory) ConstructAlterRangeSplit(
input exec.Node,
expiration tree.TypedExpr,
) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: alter range relocate")
}

func (e *distSQLSpecExecFactory) ConstructBuffer(input exec.Node, label string) (exec.Node, error) {
return nil, unimplemented.NewWithIssue(47473, "experimental opt-driven distsql planning: buffer")
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,10 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) {
case *memo.AlterRangeRelocateExpr:
ep, err = b.buildAlterRangeRelocate(t)

case *memo.ControlJobsExpr:
case *memo.AlterRangeSplitExpr:
ep, err = b.buildAlterRangeSplit(t)

case *memo.ControlJobsExpr:
ep, err = b.buildControlJobs(t)

case *memo.ControlSchedulesExpr:
Expand Down
17 changes: 17 additions & 0 deletions pkg/sql/opt/exec/execbuilder/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,23 @@ func (b *Builder) buildAlterRangeRelocate(relocate *memo.AlterRangeRelocateExpr)
return planWithColumns(node, relocate.Columns), nil
}

func (b *Builder) buildAlterRangeSplit(split *memo.AlterRangeSplitExpr) (execPlan, error) {
input, err := b.buildRelational(split.Input)
if err != nil {
return execPlan{}, err
}
scalarCtx := buildScalarCtx{}
expiration, err := b.buildScalar(&scalarCtx, split.Expiration)
if err != nil {
return execPlan{}, err
}
node, err := b.factory.ConstructAlterRangeSplit(input.root, expiration)
if err != nil {
return execPlan{}, err
}
return planWithColumns(node, split.Columns), nil
}

func (b *Builder) buildControlJobs(ctl *memo.ControlJobsExpr) (execPlan, error) {
input, err := b.buildRelational(ctl.Input)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/opt/exec/explain/emit.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func (e *emitter) nodeName(n *Node) (string, error) {

var nodeNames = [...]string{
alterRangeRelocateOp: "relocate range",
alterRangeSplitOp: "split range",
alterTableRelocateOp: "relocate table",
alterTableSplitOp: "split",
alterTableUnsplitAllOp: "unsplit all",
Expand Down Expand Up @@ -853,6 +854,10 @@ func (e *emitter) emitNodeAttributes(n *Node) error {
ob.Expr("from", a.fromStoreID, nil /* columns */)
}

case alterRangeSplitOp:
a := n.args.(*alterRangeSplitArgs)
ob.Expr("expiration", a.Expiration, nil /* columns */)

case simpleProjectOp,
serializingProjectOp,
ordinalityOp,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/exec/explain/plan_gist_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func init() {
if numOperators != 58 {
if numOperators != 59 {
// If this error occurs please make sure the new op is the last one in order
// to not invalidate existing plan gists/hashes. If we are just adding an
// operator at the end there's no need to update version below and we can
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/opt/exec/explain/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ func getResultColumns(
case alterRangeRelocateOp:
return colinfo.AlterRangeRelocateColumns, nil

case alterRangeSplitOp:
return colinfo.AlterTableSplitColumns, nil

case exportOp:
return colinfo.ExportColumns, nil

Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt/exec/factory.opt
Original file line number Diff line number Diff line change
Expand Up @@ -740,3 +740,9 @@ define AlterRangeRelocate {
toStoreID tree.TypedExpr
fromStoreID tree.TypedExpr
}

# AlterRangeSplit implements ALTER RANGE SPLIT.
define AlterRangeSplit {
input exec.Node
Expiration tree.TypedExpr
}
6 changes: 6 additions & 0 deletions pkg/sql/opt/memo/logical_props_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,12 @@ func (b *logicalPropsBuilder) buildAlterRangeRelocateProps(
b.buildBasicProps(relocate, relocate.Columns, rel)
}

func (b *logicalPropsBuilder) buildAlterRangeSplitProps(
split *AlterRangeSplitExpr, rel *props.Relational,
) {
b.buildBasicProps(split, split.Columns, rel)
}

func (b *logicalPropsBuilder) buildControlJobsProps(ctl *ControlJobsExpr, rel *props.Relational) {
b.buildBasicProps(ctl, opt.ColList{}, rel)
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/sql/opt/ops/statement.opt
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,26 @@ define AlterRangeRelocatePrivate {
# Props stores the required physical properties for the input expression.
Props PhysProps
}

# AlterRangeSplit represents an `ALTER RANGE .. SPLIT ..` statement.
[Relational, Mutation]
define AlterRangeSplit {
# The input expression provides range IDs as integers.
Input RelExpr

# Expiration is a string scalar that indicates a timestamp after which the
# ranges are eligible for automatic merging (or Null if there is no
# expiration).
Expiration ScalarExpr

_ AlterRangeSplitPrivate
}

[Private]
define AlterRangeSplitPrivate {
# Columns stores the column IDs for the statement result columns.
Columns ColList

# Props stores the required physical properties for the input expression.
Props PhysProps
}
44 changes: 40 additions & 4 deletions pkg/sql/opt/optbuilder/alter_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,12 @@ func (b *Builder) buildAlterRangeRelocate(
b.DisableMemoReuse = true

outScope = inScope.push()
b.synthesizeResultColumns(outScope, colinfo.AlterTableRelocateColumns)
b.synthesizeResultColumns(outScope, colinfo.AlterRangeRelocateColumns)

cmdName := "RELOCATE " + relocate.SubjectReplicas.String()
colNames := []string{"range ids"}
colTypes := []*types.T{types.Int}

outScope = inScope.push()
b.synthesizeResultColumns(outScope, colinfo.AlterRangeRelocateColumns)

// We don't allow the input statement to reference outer columns, so we
// pass a "blank" scope rather than inScope.
emptyScope := b.allocScope()
Expand Down Expand Up @@ -74,3 +71,42 @@ func (b *Builder) buildAlterRangeRelocate(
)
return outScope
}

// buildAlterRangeSplit builds an ALTER RANGE SPLIT.
func (b *Builder) buildAlterRangeSplit(
split *tree.SplitRange, inScope *scope,
) (outScope *scope) {

if err := b.catalog.RequireAdminRole(b.ctx, "ALTER RANGE SPLIT"); err != nil {
panic(err)
}

// Disable optimizer caching, as we do for other ALTER statements.
b.DisableMemoReuse = true

outScope = inScope.push()
b.synthesizeResultColumns(outScope, colinfo.AlterRangeSplitColumns)

cmdName := "SPLIT"
colNames := []string{"range ids"}
colTypes := []*types.T{types.Int}

// We don't allow the input statement to reference outer columns, so we
// pass a "blank" scope rather than inScope.
emptyScope := b.allocScope()
inputScope := b.buildStmt(split.Rows, colTypes, emptyScope)
checkInputColumns(cmdName, inputScope, colNames, colTypes, 1)

// Build the expiration scalar.
expiration := buildExpirationScalar(split.ExpireExpr, exprKindAlterRangeSplit, b, emptyScope)

outScope.expr = b.factory.ConstructAlterRangeSplit(
inputScope.expr,
expiration,
&memo.AlterRangeSplitPrivate{
Columns: colsToColList(outScope.cols),
Props: physical.MinRequired,
},
)
return outScope
}
Loading

0 comments on commit 002d094

Please sign in to comment.