Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Enhanced key range support
Browse files Browse the repository at this point in the history
* The following is the first towards enhanced shard targeting as described in
github vitessio#3666.
* The following adds support for targeting a keyrange using the following
syntaxt: keyspace[range]@tablet_type.
* TODO: key range targeting within a statement.
* Renames handleRange to shardRangeExec
* Be consistent on how stats are calculated

Address PR comments

* Be more DRY and consolidate some code repitition.
* Rename variables for consistency.
* Oops - update to not use result, err var and just rss

Signed-off-by: Rafael Chacon <[email protected]>
  • Loading branch information
rafael committed Mar 5, 2018
1 parent fdcb52f commit d84fceb
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 41 deletions.
22 changes: 22 additions & 0 deletions go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,28 @@ func TestAutocommitDirectTarget(t *testing.T) {
testCommitCount(t, "sbclookup", sbclookup, 1)
}

// TestAutocommitDirectRangeTarget: no instant-commit.
func TestAutocommitDirectRangeTarget(t *testing.T) {
executor, sbc1, _, _ := createExecutorEnv()

session := &vtgatepb.Session{
TargetString: "TestExecutor[-]@master",
Autocommit: true,
TransactionMode: vtgatepb.TransactionMode_MULTI,
}
sql := "DELETE FROM sharded_user_msgs LIMIT 1000"

if _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}); err != nil {
t.Error(err)
}
testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{
Sql: sql + "/* vtgate:: filtered_replication_unfriendly */",
BindVariables: map[string]*querypb.BindVariable{},
}})
testAsTransactionCount(t, "sbc1", sbc1, 0)
testCommitCount(t, "sbc1", sbc1, 1)
}

func autocommitExec(executor *Executor, sql string) (*sqltypes.Result, error) {
session := &vtgatepb.Session{
TargetString: "@master",
Expand Down
111 changes: 80 additions & 31 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import (
)

var (
errNoKeyspace = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no keyspace in database name specified. Supported database name format: keyspace[:shard][@type]")
errNoKeyspace = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no keyspace in database name specified. Supported database name format (items in <> are optional): keyspace<:shard><@type> or keyspace<[range]><@type>")
defaultTabletType topodatapb.TabletType
)

Expand Down Expand Up @@ -213,12 +213,32 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st
}

func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, logStats *LogStats) (*sqltypes.Result, error) {
if target.Shard != "" {
// V1 mode or V3 mode with a forced shard target
keyRange, err := parseRange(safeSession.TargetString)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "could not parse target %s (%s)", safeSession.TargetString, err.Error())
}

if keyRange != nil || target.Shard != "" {
// V1 mode or V3 mode with a forced shard or range target
// TODO(sougou): change this flow to go through V3 functions
// which will allow us to benefit from the autocommitable flag.
sql = sqlannotation.AnnotateIfDML(sql, nil)
if target.Keyspace == "" {
return nil, errNoKeyspace
}

var destination key.Destination
if keyRange != nil {
stmtType := sqlparser.Preview(sql)
if stmtType == sqlparser.StmtInsert {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "range queries not supported for inserts: %s", safeSession.TargetString)
}
destination = key.DestinationExactKeyRange{KeyRange: keyRange}
} else {
destination = key.DestinationShard(target.Shard)
}

execStart := time.Now()
sql = sqlannotation.AnnotateIfDML(sql, nil)
if e.normalize {
query, comments := sqlparser.SplitTrailingComments(sql)
stmt, err := sqlparser.Parse(query)
Expand All @@ -229,18 +249,11 @@ func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql
normalized := sqlparser.String(stmt)
sql = normalized + comments
}

logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.SQL = sql
logStats.BindVariables = bindVars

execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)

result, err := e.shardExec(ctx, safeSession, sql, bindVars, target, logStats)
result, err := e.destinationExec(ctx, safeSession, sql, bindVars, target, destination, logStats)
logStats.ExecuteTime = time.Now().Sub(execStart)

logStats.ShardQueries = 1

return result, err
}

Expand Down Expand Up @@ -284,36 +297,40 @@ func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql
return qr, err
}

func (e *Executor) shardExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, logStats *LogStats) (*sqltypes.Result, error) {
func (e *Executor) destinationExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, destination key.Destination, logStats *LogStats) (*sqltypes.Result, error) {
f := func() ([]*srvtopo.ResolvedShard, error) {
return e.resolver.resolver.ResolveDestination(ctx, target.Keyspace, target.TabletType, key.DestinationShard(target.Shard))
rss, err := e.resolver.resolver.ResolveDestination(ctx, target.Keyspace, target.TabletType, destination)
if err != nil {
return nil, err
}
logStats.ShardQueries = uint32(len(rss))
return rss, nil
}
return e.resolver.Execute(ctx, sql, bindVars, target.Keyspace, target.TabletType, safeSession.Session, f, false /* notInTransaction */, safeSession.Options, logStats)
return e.resolver.Execute(ctx, sql, bindVars, target.TabletType, safeSession.Session, f, false /* notInTransaction */, safeSession.Options, logStats)
}

func (e *Executor) handleDDL(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, logStats *LogStats) (*sqltypes.Result, error) {
if target.Keyspace == "" {
return nil, errNoKeyspace
}

f := func() ([]*srvtopo.ResolvedShard, error) {
var destination key.Destination
if target.Shard == "" {
destination = key.DestinationAllShards{}
} else {
destination = key.DestinationShard(target.Shard)
}
rss, err := e.resolver.resolver.ResolveDestination(ctx, target.Keyspace, target.TabletType, destination)
if err != nil {
return nil, err
}
logStats.ShardQueries = uint32(len(rss))
return rss, nil
keyRange, err := parseRange(safeSession.TargetString)
if err != nil {
return nil, errNoKeyspace
}

var destination key.Destination
if keyRange != nil {
destination = key.DestinationExactKeyRange{KeyRange: keyRange}
} else if target.Shard != "" {
destination = key.DestinationShard(target.Shard)
} else {
destination = key.DestinationAllShards{}
}

execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
result, err := e.resolver.Execute(ctx, sql, bindVars, target.Keyspace, target.TabletType, safeSession.Session, f, false /* notInTransaction */, safeSession.Options, logStats)
result, err := e.destinationExec(ctx, safeSession, sql, bindVars, target, destination, logStats)
logStats.ExecuteTime = time.Since(execStart)
return result, err
}
Expand Down Expand Up @@ -708,6 +725,11 @@ func (e *Executor) handleUse(ctx context.Context, safeSession *SafeSession, sql
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unrecognized USE statement: %v", sql)
}
target := e.ParseTarget(use.DBName.String())

if _, ok := e.VSchema().Keyspaces[target.Keyspace]; target.Keyspace != "" && !ok {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid keyspace provided: %s", target.Keyspace)
}

if safeSession.InTransaction() && target.TabletType != topodatapb.TabletType_MASTER {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot change to a non-master type in the middle of a transaction: %v", target.TabletType)
}
Expand All @@ -731,7 +753,7 @@ func (e *Executor) handleOther(ctx context.Context, safeSession *SafeSession, sq
target.Keyspace, target.Shard = rss[0].Target.Keyspace, rss[0].Target.Shard
}
execStart := time.Now()
result, err := e.shardExec(ctx, safeSession, sql, bindVars, target, logStats)
result, err := e.destinationExec(ctx, safeSession, sql, bindVars, target, key.DestinationShard(target.Shard), logStats)
logStats.ExecuteTime = time.Since(execStart)
return result, err
}
Expand Down Expand Up @@ -1064,6 +1086,11 @@ func (e *Executor) ParseTarget(targetString string) querypb.Target {
target.Shard = targetString[last+1:]
targetString = targetString[:last]
}
// Remove range query from string if present
last = strings.LastIndexAny(targetString, "[")
if last != -1 {
targetString = targetString[:last]
}
if targetString == "" && len(e.VSchema().Keyspaces) == 1 {
// Loop to extract the only keyspace name.
for k := range e.VSchema().Keyspaces {
Expand All @@ -1074,6 +1101,28 @@ func (e *Executor) ParseTarget(targetString string) querypb.Target {
return target
}

// parseRange parses range from target string.
func parseRange(targetString string) (*topodatapb.KeyRange, error) {
last := strings.LastIndexAny(targetString, "[")
if last != -1 {
rangeEnd := strings.LastIndexAny(targetString, "]")
if rangeEnd == -1 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid key range provided. Couldn't find range end ']'")

}
rangeString := targetString[last+1 : rangeEnd]
keyRange, err := key.ParseShardingSpec(rangeString)
if err != nil {
return nil, err
}
if len(keyRange) != 1 {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "single keyrange expected in %s", rangeString)
}
return keyRange[0], nil
}
return nil, nil
}

// getPlan computes the plan for the given query. If one is in
// the cache, it reuses it.
func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments string, bindVars map[string]*querypb.BindVariable, skipQueryPlanCache bool, logStats *LogStats) (*engine.Plan, error) {
Expand Down
100 changes: 100 additions & 0 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,3 +1513,103 @@ func TestMultiInsertGeneratorSparse(t *testing.T) {
t.Errorf("result: %+v, want %+v", result, &wantResult)
}
}

func TestKeyRangeQuery(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()
// it works in a single shard key range
masterSession.TargetString = "TestExecutor[40-60]"

_, err := executorExec(executor, "DELETE FROM sharded_user_msgs LIMIT 1000", nil)
if err != nil {
t.Error(err)
}
sql := "DELETE FROM sharded_user_msgs LIMIT 1000"
wantQueries := []*querypb.BoundQuery{{
Sql: sql + "/* vtgate:: filtered_replication_unfriendly */",
BindVariables: map[string]*querypb.BindVariable{},
}}

if len(sbc1.Queries) != 0 {
t.Errorf("sbc1.Queries: %+v, want %+v\n", sbc1.Queries, []*querypb.BoundQuery{})
}
testQueries(t, "sbc2", sbc2, wantQueries)

sbc1.Queries = nil
sbc2.Queries = nil

// it works with keyrange spanning two shards
masterSession.TargetString = "TestExecutor[-60]"

_, err = executorExec(executor, sql, nil)
if err != nil {
t.Error(err)
}
testQueries(t, "sbc1", sbc1, wantQueries)
testQueries(t, "sbc1", sbc2, wantQueries)

sbc1.Queries = nil
sbc2.Queries = nil

// it works with open ended key range
masterSession.TargetString = "TestExecutor[-]"

_, err = executorExec(executor, sql, nil)
if err != nil {
t.Error(err)
}

testQueries(t, "sbc1", sbc1, wantQueries)
testQueries(t, "sbc1", sbc2, wantQueries)

sbc1.Queries = nil
sbc2.Queries = nil

// it works for select
sql = "SELECT * FROM sharded_user_msgs LIMIT 1"
wantQueries = []*querypb.BoundQuery{{
Sql: sql,
BindVariables: map[string]*querypb.BindVariable{},
}}

_, err = executorExec(executor, sql, nil)
if err != nil {
t.Error(err)
}

testQueries(t, "sbc1", sbc1, wantQueries)
testQueries(t, "sbc2", sbc2, wantQueries)

sbc1.Queries = nil
sbc2.Queries = nil

// it works for updates
sql = "UPDATE sharded_user_msgs set message='test' LIMIT 1"

wantQueries = []*querypb.BoundQuery{{
Sql: sql + "/* vtgate:: filtered_replication_unfriendly */",
BindVariables: map[string]*querypb.BindVariable{},
}}

_, err = executorExec(executor, sql, nil)
if err != nil {
t.Error(err)
}

testQueries(t, "sbc1", sbc1, wantQueries)
testQueries(t, "sbc2", sbc2, wantQueries)

sbc1.Queries = nil
sbc2.Queries = nil

// it does not work for inserts
_, err = executorExec(executor, "INSERT INTO sharded_user_msgs(message) VALUE('test')", nil)

want := "range queries not supported for inserts: TestExecutor[-]"
if err == nil || err.Error() != want {
t.Errorf("got: %v, want %s", err, want)
}

sbc1.Queries = nil
sbc2.Queries = nil
masterSession.TargetString = ""
}
Loading

0 comments on commit d84fceb

Please sign in to comment.