Skip to content

Commit

Permalink
Get cached vschema from executor
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed May 20, 2020
1 parent 778227b commit 905b74c
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 25 deletions.
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,7 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession
bindVars = make(map[string]*querypb.BindVariable)
}
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.resolver.resolver)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver)

switch stmtType {
case sqlparser.StmtStream:
Expand Down Expand Up @@ -1528,7 +1528,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st
func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, destKeyspace string, destTabletType topodatapb.TabletType, logStats *LogStats) ([]*querypb.Field, error) {
// V3 mode.
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.resolver.resolver)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver)
plan, err := e.getPlan(
vcursor,
query,
Expand Down
16 changes: 8 additions & 8 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1913,8 +1913,8 @@ func TestVSchemaStats(t *testing.T) {

func TestGetPlanUnnormalized(t *testing.T) {
r, _, _, _ := createExecutorEnv()
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)

logStats1 := NewLogStats(context.Background(), "Test", "", nil)
query1 := "select * from music_user_map where id = 1"
Expand Down Expand Up @@ -1972,7 +1972,7 @@ func TestGetPlanUnnormalized(t *testing.T) {

func TestGetPlanCacheUnnormalized(t *testing.T) {
r, _, _, _ := createExecutorEnv()
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)
query1 := "select * from music_user_map where id = 1"
logStats1 := NewLogStats(context.Background(), "Test", "", nil)
_, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1)
Expand All @@ -1997,7 +1997,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {

// Skip cache using directive
r, _, _, _ = createExecutorEnv()
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)

query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
logStats1 = NewLogStats(context.Background(), "Test", "", nil)
Expand All @@ -2018,7 +2018,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
func TestGetPlanCacheNormalized(t *testing.T) {
r, _, _, _ := createExecutorEnv()
r.normalize = true
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)
query1 := "select * from music_user_map where id = 1"
logStats1 := NewLogStats(context.Background(), "Test", "", nil)
_, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1)
Expand All @@ -2043,7 +2043,7 @@ func TestGetPlanCacheNormalized(t *testing.T) {
// Skip cache using directive
r, _, _, _ = createExecutorEnv()
r.normalize = true
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)

query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
logStats1 = NewLogStats(context.Background(), "Test", "", nil)
Expand All @@ -2064,8 +2064,8 @@ func TestGetPlanCacheNormalized(t *testing.T) {
func TestGetPlanNormalized(t *testing.T) {
r, _, _, _ := createExecutorEnv()
r.normalize = true
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)

query1 := "select * from music_user_map where id = 1"
query2 := "select * from music_user_map where id = 2"
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (e *planExecute) execute(ctx context.Context, safeSession *SafeSession, sql
}

query, comments := sqlparser.SplitMarginComments(sql)
vcursor, err := newVCursorImpl(ctx, safeSession, comments, e.e, logStats, e.e.vm, e.e.resolver.resolver)
vcursor, err := newVCursorImpl(ctx, safeSession, comments, e.e, logStats, e.e.vm, e.e.VSchema(), e.e.resolver.resolver)
if err != nil {
return 0, nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions go/vt/vtgate/plan_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,8 +1234,8 @@ func TestPlanVSchemaStats(t *testing.T) {

func TestPlanGetPlanUnnormalized(t *testing.T) {
r, _, _, _ := createExecutorEnvUsing(planAllTheThings)
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)

logStats1 := NewLogStats(context.Background(), "Test", "", nil)
query1 := "select * from music_user_map where id = 1"
Expand Down Expand Up @@ -1293,7 +1293,7 @@ func TestPlanGetPlanUnnormalized(t *testing.T) {

func TestPlanGetPlanCacheUnnormalized(t *testing.T) {
r, _, _, _ := createExecutorEnvUsing(planAllTheThings)
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)
query1 := "select * from music_user_map where id = 1"
logStats1 := NewLogStats(context.Background(), "Test", "", nil)
_, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1)
Expand All @@ -1318,7 +1318,7 @@ func TestPlanGetPlanCacheUnnormalized(t *testing.T) {

// Skip cache using directive
r, _, _, _ = createExecutorEnvUsing(planAllTheThings)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)

query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
logStats1 = NewLogStats(context.Background(), "Test", "", nil)
Expand All @@ -1339,7 +1339,7 @@ func TestPlanGetPlanCacheUnnormalized(t *testing.T) {
func TestPlanGetPlanCacheNormalized(t *testing.T) {
r, _, _, _ := createExecutorEnvUsing(planAllTheThings)
r.normalize = true
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)
query1 := "select * from music_user_map where id = 1"
logStats1 := NewLogStats(context.Background(), "Test", "", nil)
_, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1)
Expand All @@ -1364,7 +1364,7 @@ func TestPlanGetPlanCacheNormalized(t *testing.T) {
// Skip cache using directive
r, _, _, _ = createExecutorEnvUsing(planAllTheThings)
r.normalize = true
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver)
unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver)

query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
logStats1 = NewLogStats(context.Background(), "Test", "", nil)
Expand Down Expand Up @@ -1476,7 +1476,7 @@ func TestPlanGetPlanNormalized(t *testing.T) {
}

func createVcursorImpl(r *Executor, targetString string) (*vcursorImpl, error) {
return newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: targetString}), makeComments(""), r, nil, &fakeVSchemaOperator{vschema: r.VSchema()}, r.resolver.resolver)
return newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: targetString}), makeComments(""), r, nil, &fakeVSchemaOperator{vschema: r.VSchema()}, r.VSchema(), r.resolver.resolver)
}

func TestPlanParseEmptyTargetSingleKeyspace(t *testing.T) {
Expand Down
6 changes: 1 addition & 5 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,7 @@ func (vc *vcursorImpl) ExecuteVSchema(keyspace string, vschemaDDL *sqlparser.DDL
// the query and supply it here. Trailing comments are typically sent by the application for various reasons,
// including as identifying markers. So, they have to be added back to all queries that are executed
// on behalf of the original query.
func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComments sqlparser.MarginComments, executor *Executor, logStats *LogStats, vm VSchemaOperator, resolver *srvtopo.Resolver) (*vcursorImpl, error) {
vschema, err := vm.GetCurrentVschema()
if err != nil {
return nil, err
}
func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComments sqlparser.MarginComments, executor *Executor, logStats *LogStats, vm VSchemaOperator, vschema *vindexes.VSchema, resolver *srvtopo.Resolver) (*vcursorImpl, error) {
keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/vcursor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestDestinationKeyspace(t *testing.T) {
}}

for _, tc := range tests {
impl, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, nil)
impl, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil)
impl.vschema = tc.vschema
dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier)
if tc.expectedError == "" {
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestSetTarget(t *testing.T) {

for i, tc := range tests {
t.Run(string(i)+"#"+tc.targetString, func(t *testing.T) {
vc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, nil)
vc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil)
vc.vschema = tc.vschema
err := vc.SetTarget(tc.targetString)
if tc.expectedError == "" {
Expand Down

0 comments on commit 905b74c

Please sign in to comment.