From d84fceb8720e5e0f8dfb1e13c378e0fe043ef39a Mon Sep 17 00:00:00 2001 From: Rafael Chacon Date: Tue, 27 Feb 2018 15:32:52 -0800 Subject: [PATCH] Enhanced key range support * The following is the first step towards enhanced shard targeting as described in github #3666. * The following adds support for targeting a keyrange using the following syntax: keyspace[range]@tablet_type. * TODO: key range targeting within a statement. * Renames handleRange to shardRangeExec * Be consistent on how stats are calculated Address PR comments * Be more DRY and consolidate some code repetition. * Rename variables for consistency. * Oops - update to not use result, err var and just rss Signed-off-by: Rafael Chacon --- go/vt/vtgate/autocommit_test.go | 22 ++++++ go/vt/vtgate/executor.go | 111 +++++++++++++++++++++--------- go/vt/vtgate/executor_dml_test.go | 100 +++++++++++++++++++++++++++ go/vt/vtgate/executor_test.go | 87 +++++++++++++++++++++-- go/vt/vtgate/resolver.go | 5 +- go/vt/vtgate/vtgate.go | 1 - 6 files changed, 285 insertions(+), 41 deletions(-) diff --git a/go/vt/vtgate/autocommit_test.go b/go/vt/vtgate/autocommit_test.go index 59e19846a37..b76d136c8f1 100644 --- a/go/vt/vtgate/autocommit_test.go +++ b/go/vt/vtgate/autocommit_test.go @@ -311,6 +311,28 @@ func TestAutocommitDirectTarget(t *testing.T) { testCommitCount(t, "sbclookup", sbclookup, 1) } +// TestAutocommitDirectRangeTarget: no instant-commit. 
+func TestAutocommitDirectRangeTarget(t *testing.T) { + executor, sbc1, _, _ := createExecutorEnv() + + session := &vtgatepb.Session{ + TargetString: "TestExecutor[-]@master", + Autocommit: true, + TransactionMode: vtgatepb.TransactionMode_MULTI, + } + sql := "DELETE FROM sharded_user_msgs LIMIT 1000" + + if _, err := executor.Execute(context.Background(), "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{}); err != nil { + t.Error(err) + } + testQueries(t, "sbc1", sbc1, []*querypb.BoundQuery{{ + Sql: sql + "/* vtgate:: filtered_replication_unfriendly */", + BindVariables: map[string]*querypb.BindVariable{}, + }}) + testAsTransactionCount(t, "sbc1", sbc1, 0) + testCommitCount(t, "sbc1", sbc1, 1) +} + func autocommitExec(executor *Executor, sql string) (*sqltypes.Result, error) { session := &vtgatepb.Session{ TargetString: "@master", diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 87394a748a5..0e4d14b72b8 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -53,7 +53,7 @@ import ( ) var ( - errNoKeyspace = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no keyspace in database name specified. Supported database name format: keyspace[:shard][@type]") + errNoKeyspace = vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no keyspace in database name specified. 
Supported database name format (items in <> are optional): keyspace<:shard><@type> or keyspace<[range]><@type>") defaultTabletType topodatapb.TabletType ) @@ -213,12 +213,32 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st } func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, logStats *LogStats) (*sqltypes.Result, error) { - if target.Shard != "" { - // V1 mode or V3 mode with a forced shard target + keyRange, err := parseRange(safeSession.TargetString) + if err != nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "could not parse target %s (%s)", safeSession.TargetString, err.Error()) + } + + if keyRange != nil || target.Shard != "" { + // V1 mode or V3 mode with a forced shard or range target // TODO(sougou): change this flow to go through V3 functions // which will allow us to benefit from the autocommitable flag. - sql = sqlannotation.AnnotateIfDML(sql, nil) + if target.Keyspace == "" { + return nil, errNoKeyspace + } + + var destination key.Destination + if keyRange != nil { + stmtType := sqlparser.Preview(sql) + if stmtType == sqlparser.StmtInsert { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "range queries not supported for inserts: %s", safeSession.TargetString) + } + destination = key.DestinationExactKeyRange{KeyRange: keyRange} + } else { + destination = key.DestinationShard(target.Shard) + } + execStart := time.Now() + sql = sqlannotation.AnnotateIfDML(sql, nil) if e.normalize { query, comments := sqlparser.SplitTrailingComments(sql) stmt, err := sqlparser.Parse(query) @@ -229,18 +249,11 @@ func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql normalized := sqlparser.String(stmt) sql = normalized + comments } - + logStats.PlanTime = execStart.Sub(logStats.StartTime) logStats.SQL = sql logStats.BindVariables = bindVars - - execStart := time.Now() - 
logStats.PlanTime = execStart.Sub(logStats.StartTime) - - result, err := e.shardExec(ctx, safeSession, sql, bindVars, target, logStats) + result, err := e.destinationExec(ctx, safeSession, sql, bindVars, target, destination, logStats) logStats.ExecuteTime = time.Now().Sub(execStart) - - logStats.ShardQueries = 1 - return result, err } @@ -284,11 +297,16 @@ func (e *Executor) handleExec(ctx context.Context, safeSession *SafeSession, sql return qr, err } -func (e *Executor) shardExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, logStats *LogStats) (*sqltypes.Result, error) { +func (e *Executor) destinationExec(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, destination key.Destination, logStats *LogStats) (*sqltypes.Result, error) { f := func() ([]*srvtopo.ResolvedShard, error) { - return e.resolver.resolver.ResolveDestination(ctx, target.Keyspace, target.TabletType, key.DestinationShard(target.Shard)) + rss, err := e.resolver.resolver.ResolveDestination(ctx, target.Keyspace, target.TabletType, destination) + if err != nil { + return nil, err + } + logStats.ShardQueries = uint32(len(rss)) + return rss, nil } - return e.resolver.Execute(ctx, sql, bindVars, target.Keyspace, target.TabletType, safeSession.Session, f, false /* notInTransaction */, safeSession.Options, logStats) + return e.resolver.Execute(ctx, sql, bindVars, target.TabletType, safeSession.Session, f, false /* notInTransaction */, safeSession.Options, logStats) } func (e *Executor) handleDDL(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, target querypb.Target, logStats *LogStats) (*sqltypes.Result, error) { @@ -296,24 +314,23 @@ func (e *Executor) handleDDL(ctx context.Context, safeSession *SafeSession, sql return nil, errNoKeyspace } - f := func() ([]*srvtopo.ResolvedShard, error) { - var 
destination key.Destination - if target.Shard == "" { - destination = key.DestinationAllShards{} - } else { - destination = key.DestinationShard(target.Shard) - } - rss, err := e.resolver.resolver.ResolveDestination(ctx, target.Keyspace, target.TabletType, destination) - if err != nil { - return nil, err - } - logStats.ShardQueries = uint32(len(rss)) - return rss, nil + keyRange, err := parseRange(safeSession.TargetString) + if err != nil { + return nil, errNoKeyspace + } + + var destination key.Destination + if keyRange != nil { + destination = key.DestinationExactKeyRange{KeyRange: keyRange} + } else if target.Shard != "" { + destination = key.DestinationShard(target.Shard) + } else { + destination = key.DestinationAllShards{} } execStart := time.Now() logStats.PlanTime = execStart.Sub(logStats.StartTime) - result, err := e.resolver.Execute(ctx, sql, bindVars, target.Keyspace, target.TabletType, safeSession.Session, f, false /* notInTransaction */, safeSession.Options, logStats) + result, err := e.destinationExec(ctx, safeSession, sql, bindVars, target, destination, logStats) logStats.ExecuteTime = time.Since(execStart) return result, err } @@ -708,6 +725,11 @@ func (e *Executor) handleUse(ctx context.Context, safeSession *SafeSession, sql return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unrecognized USE statement: %v", sql) } target := e.ParseTarget(use.DBName.String()) + + if _, ok := e.VSchema().Keyspaces[target.Keyspace]; target.Keyspace != "" && !ok { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid keyspace provided: %s", target.Keyspace) + } + if safeSession.InTransaction() && target.TabletType != topodatapb.TabletType_MASTER { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot change to a non-master type in the middle of a transaction: %v", target.TabletType) } @@ -731,7 +753,7 @@ func (e *Executor) handleOther(ctx context.Context, safeSession *SafeSession, sq target.Keyspace, target.Shard = rss[0].Target.Keyspace, 
rss[0].Target.Shard } execStart := time.Now() - result, err := e.shardExec(ctx, safeSession, sql, bindVars, target, logStats) + result, err := e.destinationExec(ctx, safeSession, sql, bindVars, target, key.DestinationShard(target.Shard), logStats) logStats.ExecuteTime = time.Since(execStart) return result, err } @@ -1064,6 +1086,11 @@ func (e *Executor) ParseTarget(targetString string) querypb.Target { target.Shard = targetString[last+1:] targetString = targetString[:last] } + // Remove range query from string if present + last = strings.LastIndexAny(targetString, "[") + if last != -1 { + targetString = targetString[:last] + } if targetString == "" && len(e.VSchema().Keyspaces) == 1 { // Loop to extract the only keyspace name. for k := range e.VSchema().Keyspaces { @@ -1074,6 +1101,28 @@ func (e *Executor) ParseTarget(targetString string) querypb.Target { return target } +// parseRange parses range from target string. +func parseRange(targetString string) (*topodatapb.KeyRange, error) { + last := strings.LastIndexAny(targetString, "[") + if last != -1 { + rangeEnd := strings.LastIndexAny(targetString, "]") + if rangeEnd == -1 { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid key range provided. Couldn't find range end ']'") + + } + rangeString := targetString[last+1 : rangeEnd] + keyRange, err := key.ParseShardingSpec(rangeString) + if err != nil { + return nil, err + } + if len(keyRange) != 1 { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "single keyrange expected in %s", rangeString) + } + return keyRange[0], nil + } + return nil, nil +} + // getPlan computes the plan for the given query. If one is in // the cache, it reuses it. 
func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments string, bindVars map[string]*querypb.BindVariable, skipQueryPlanCache bool, logStats *LogStats) (*engine.Plan, error) { diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index ff27351021a..c3affd6aab0 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -1513,3 +1513,103 @@ func TestMultiInsertGeneratorSparse(t *testing.T) { t.Errorf("result: %+v, want %+v", result, &wantResult) } } + +func TestKeyRangeQuery(t *testing.T) { + executor, sbc1, sbc2, _ := createExecutorEnv() + // it works in a single shard key range + masterSession.TargetString = "TestExecutor[40-60]" + + _, err := executorExec(executor, "DELETE FROM sharded_user_msgs LIMIT 1000", nil) + if err != nil { + t.Error(err) + } + sql := "DELETE FROM sharded_user_msgs LIMIT 1000" + wantQueries := []*querypb.BoundQuery{{ + Sql: sql + "/* vtgate:: filtered_replication_unfriendly */", + BindVariables: map[string]*querypb.BindVariable{}, + }} + + if len(sbc1.Queries) != 0 { + t.Errorf("sbc1.Queries: %+v, want %+v\n", sbc1.Queries, []*querypb.BoundQuery{}) + } + testQueries(t, "sbc2", sbc2, wantQueries) + + sbc1.Queries = nil + sbc2.Queries = nil + + // it works with keyrange spanning two shards + masterSession.TargetString = "TestExecutor[-60]" + + _, err = executorExec(executor, sql, nil) + if err != nil { + t.Error(err) + } + testQueries(t, "sbc1", sbc1, wantQueries) + testQueries(t, "sbc1", sbc2, wantQueries) + + sbc1.Queries = nil + sbc2.Queries = nil + + // it works with open ended key range + masterSession.TargetString = "TestExecutor[-]" + + _, err = executorExec(executor, sql, nil) + if err != nil { + t.Error(err) + } + + testQueries(t, "sbc1", sbc1, wantQueries) + testQueries(t, "sbc1", sbc2, wantQueries) + + sbc1.Queries = nil + sbc2.Queries = nil + + // it works for select + sql = "SELECT * FROM sharded_user_msgs LIMIT 1" + wantQueries = []*querypb.BoundQuery{{ + 
Sql: sql, + BindVariables: map[string]*querypb.BindVariable{}, + }} + + _, err = executorExec(executor, sql, nil) + if err != nil { + t.Error(err) + } + + testQueries(t, "sbc1", sbc1, wantQueries) + testQueries(t, "sbc2", sbc2, wantQueries) + + sbc1.Queries = nil + sbc2.Queries = nil + + // it works for updates + sql = "UPDATE sharded_user_msgs set message='test' LIMIT 1" + + wantQueries = []*querypb.BoundQuery{{ + Sql: sql + "/* vtgate:: filtered_replication_unfriendly */", + BindVariables: map[string]*querypb.BindVariable{}, + }} + + _, err = executorExec(executor, sql, nil) + if err != nil { + t.Error(err) + } + + testQueries(t, "sbc1", sbc1, wantQueries) + testQueries(t, "sbc2", sbc2, wantQueries) + + sbc1.Queries = nil + sbc2.Queries = nil + + // it does not work for inserts + _, err = executorExec(executor, "INSERT INTO sharded_user_msgs(message) VALUE('test')", nil) + + want := "range queries not supported for inserts: TestExecutor[-]" + if err == nil || err.Error() != want { + t.Errorf("got: %v, want %s", err, want) + } + + sbc1.Queries = nil + sbc2.Queries = nil + masterSession.TargetString = "" +} diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 3d8fd98a8e9..d39892c6db3 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -18,6 +18,7 @@ package vtgate import ( "bytes" + "encoding/hex" "html/template" "reflect" "strings" @@ -721,12 +722,12 @@ func TestExecutorUse(t *testing.T) { session := NewSafeSession(&vtgatepb.Session{Autocommit: true, TargetString: "@master"}) stmts := []string{ - "use db", - "use `ks:-80@master`", + "use TestExecutor", + "use `TestExecutor:-80@master`", } want := []string{ - "db", - "ks:-80@master", + "TestExecutor", + "TestExecutor:-80@master", } for i, stmt := range stmts { _, err := executor.Execute(context.Background(), "TestExecute", session, stmt, nil) @@ -742,7 +743,13 @@ func TestExecutorUse(t *testing.T) { _, err := executor.Execute(context.Background(), 
"TestExecute", NewSafeSession(&vtgatepb.Session{}), "use 1", nil) wantErr := "syntax error at position 6 near '1'" if err == nil || err.Error() != wantErr { - t.Errorf("use 1: %v, want %v", err, wantErr) + t.Errorf("got: %v, want %v", err, wantErr) + } + + _, err = executor.Execute(context.Background(), "TestExecute", NewSafeSession(&vtgatepb.Session{}), "use UnexistentKeyspace", nil) + wantErr = "invalid keyspace provided: UnexistentKeyspace" + if err == nil || err.Error() != wantErr { + t.Errorf("got: %v, want %v", err, wantErr) } } @@ -1210,7 +1217,6 @@ func TestGetPlanNormalized(t *testing.T) { func TestParseTarget(t *testing.T) { r, _, _, _ := createExecutorEnv() - testcases := []struct { targetString string target querypb.Target @@ -1257,6 +1263,30 @@ func TestParseTarget(t *testing.T) { target: querypb.Target{ TabletType: topodatapb.TabletType_UNKNOWN, }, + }, { + targetString: "ks[10-20]@master", + target: querypb.Target{ + TabletType: topodatapb.TabletType_MASTER, + Keyspace: "ks", + }, + }, { + targetString: "ks[-]@master", + target: querypb.Target{ + TabletType: topodatapb.TabletType_MASTER, + Keyspace: "ks", + }, + }, { + targetString: "ks[10-]@master", + target: querypb.Target{ + TabletType: topodatapb.TabletType_MASTER, + Keyspace: "ks", + }, + }, { + targetString: "ks[-20]@master", + target: querypb.Target{ + TabletType: topodatapb.TabletType_MASTER, + Keyspace: "ks", + }, }} for _, tcase := range testcases { @@ -1266,6 +1296,34 @@ func TestParseTarget(t *testing.T) { } } +func TestParseRange(t *testing.T) { + tenHexBytes, _ := hex.DecodeString("10") + twentyHexBytes, _ := hex.DecodeString("20") + + testcases := []struct { + targetString string + target *topodatapb.KeyRange + }{{ + targetString: "ks[10-20]@master", + target: &topodatapb.KeyRange{Start: tenHexBytes, End: twentyHexBytes}, + }, { + targetString: "ks[-]@master", + target: &topodatapb.KeyRange{}, + }, { + targetString: "ks[10-]@master", + target: &topodatapb.KeyRange{Start: tenHexBytes}, 
+ }, { + targetString: "ks[-20]@master", + target: &topodatapb.KeyRange{End: twentyHexBytes}, + }} + + for _, tcase := range testcases { + if target, _ := parseRange(tcase.targetString); !proto.Equal(target, tcase.target) { + t.Errorf("ParseRange(%s) - got: %v, want %v", tcase.targetString, target, tcase.target) + } + } +} + func TestParseTargetSingleKeyspace(t *testing.T) { r, _, _, _ := createExecutorEnv() altVSchema := &vindexes.VSchema{ @@ -1322,6 +1380,23 @@ func TestPassthroughDDL(t *testing.T) { } sbc2.Queries = nil masterSession.TargetString = "" + + // Use range query + masterSession.TargetString = "TestExecutor[-]" + executor.normalize = true + + _, err = executorExec(executor, "/* leading */ create table passthrough_ddl (col bigint default 123) /* trailing */", nil) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(sbc1.Queries, wantQueries) { + t.Errorf("sbc2.Queries: %+v, want %+v\n", sbc1.Queries, wantQueries) + } + if !reflect.DeepEqual(sbc2.Queries, wantQueries) { + t.Errorf("sbc2.Queries: %+v, want %+v\n", sbc2.Queries, wantQueries) + } + sbc2.Queries = nil + masterSession.TargetString = "" } func TestParseEmptyTargetSingleKeyspace(t *testing.T) { diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go index 83eb2f1ddf5..d86a7ec89d4 100644 --- a/go/vt/vtgate/resolver.go +++ b/go/vt/vtgate/resolver.go @@ -85,7 +85,7 @@ func (res *Resolver) ExecuteKeyspaceIds(ctx context.Context, sql string, bindVar tabletType, key.DestinationKeyspaceIDs(keyspaceIds)) } - return res.Execute(ctx, sql, bindVariables, keyspace, tabletType, session, mapToShards, notInTransaction, options, nil /* LogStats */) + return res.Execute(ctx, sql, bindVariables, tabletType, session, mapToShards, notInTransaction, options, nil /* LogStats */) } // ExecuteKeyRanges executes a non-streaming query based on KeyRanges. 
@@ -98,7 +98,7 @@ func (res *Resolver) ExecuteKeyRanges(ctx context.Context, sql string, bindVaria tabletType, key.DestinationKeyRanges(keyRanges)) } - return res.Execute(ctx, sql, bindVariables, keyspace, tabletType, session, mapToShards, notInTransaction, options, nil) + return res.Execute(ctx, sql, bindVariables, tabletType, session, mapToShards, notInTransaction, options, nil) } // Execute executes a non-streaming query based on shards resolved by given func. @@ -107,7 +107,6 @@ func (res *Resolver) Execute( ctx context.Context, sql string, bindVars map[string]*querypb.BindVariable, - keyspace string, tabletType topodatapb.TabletType, session *vtgatepb.Session, mapToShards func() ([]*srvtopo.ResolvedShard, error), diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 6e306eda460..d33ae2c9027 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -407,7 +407,6 @@ func (vtg *VTGate) ExecuteShards(ctx context.Context, sql string, bindVariables ctx, sql, bindVariables, - keyspace, tabletType, session, func() ([]*srvtopo.ResolvedShard, error) {