From a247fb13c49fe433f386dc51f643c97aa5d353f4 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 20 May 2020 11:00:10 +0530 Subject: [PATCH] Get cached vschema from executor Signed-off-by: Harshit Gangal (cherry picked from commit 905b74c66d6f8db6eb9b8669e1ed516668fd90a8) --- go/vt/vtgate/executor.go | 4 ++-- go/vt/vtgate/executor_test.go | 16 ++++++++-------- go/vt/vtgate/plan_execute.go | 2 +- go/vt/vtgate/plan_executor_test.go | 14 +++++++------- go/vt/vtgate/vcursor_impl.go | 6 +----- go/vt/vtgate/vcursor_impl_test.go | 4 ++-- 6 files changed, 21 insertions(+), 25 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 1fe0ba8916b..7feb16a8bf4 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -1044,7 +1044,7 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession bindVars = make(map[string]*querypb.BindVariable) } query, comments := sqlparser.SplitMarginComments(sql) - vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.resolver.resolver) + vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver) switch stmtType { case sqlparser.StmtStream: @@ -1505,7 +1505,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, destKeyspace string, destTabletType topodatapb.TabletType, logStats *LogStats) ([]*querypb.Field, error) { // V3 mode. query, comments := sqlparser.SplitMarginComments(sql) - vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.resolver.resolver) + vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver) plan, err := e.getPlan( vcursor, query, diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index 650f5a5d9a7..284d6f618b1 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1881,8 +1881,8 @@ func TestVSchemaStats(t *testing.T) { func TestGetPlanUnnormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() - emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) - unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) logStats1 := NewLogStats(context.Background(), "Test", "", nil) query1 := "select * from music_user_map where id = 1" @@ -1940,7 +1940,7 @@ func TestGetPlanUnnormalized(t *testing.T) { func TestGetPlanCacheUnnormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() - emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) query1 := "select * from music_user_map where id = 1" logStats1 := NewLogStats(context.Background(), "Test", "", nil) _, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1) @@ -1965,7 +1965,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { // Skip cache using directive r, _, _, _ = createExecutorEnv() - unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" logStats1 = NewLogStats(context.Background(), "Test", "", nil) @@ -1986,7 +1986,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { func TestGetPlanCacheNormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() r.normalize = true - emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) query1 := "select * from music_user_map where id = 1" logStats1 := NewLogStats(context.Background(), "Test", "", nil) _, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1) @@ -2011,7 +2011,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { // Skip cache using directive r, _, _, _ = createExecutorEnv() r.normalize = true - unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" logStats1 = NewLogStats(context.Background(), "Test", "", nil) @@ -2032,8 +2032,8 @@ func TestGetPlanCacheNormalized(t *testing.T) { func TestGetPlanNormalized(t *testing.T) { r, _, _, _ := createExecutorEnv() r.normalize = true - emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) - unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) query1 := "select * from music_user_map where id = 1" query2 := "select * from music_user_map where id = 2" diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index dddff3542a1..b1c93d7747d 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -95,7 +95,7 @@ func (e *planExecute) execute(ctx context.Context, safeSession *SafeSession, sql } query, comments := sqlparser.SplitMarginComments(sql) - vcursor, err := newVCursorImpl(ctx, safeSession, comments, e.e, logStats, e.e.vm, e.e.resolver.resolver) + vcursor, err := newVCursorImpl(ctx, safeSession, comments, e.e, logStats, e.e.vm, e.e.VSchema(), e.e.resolver.resolver) if err != nil { return nil, err } diff --git a/go/vt/vtgate/plan_executor_test.go b/go/vt/vtgate/plan_executor_test.go index a3af07077dd..304ef208cd7 100644 --- a/go/vt/vtgate/plan_executor_test.go +++ b/go/vt/vtgate/plan_executor_test.go @@ -1222,8 +1222,8 @@ func TestPlanVSchemaStats(t *testing.T) { func TestPlanGetPlanUnnormalized(t *testing.T) { r, _, _, _ := createExecutorEnvUsing(planAllTheThings) - emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) - unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) logStats1 := NewLogStats(context.Background(), "Test", "", nil) query1 := "select * from music_user_map where id = 1" @@ -1281,7 +1281,7 @@ func TestPlanGetPlanUnnormalized(t *testing.T) { func TestPlanGetPlanCacheUnnormalized(t *testing.T) { r, _, _, _ := createExecutorEnvUsing(planAllTheThings) - emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) query1 := "select * from music_user_map where id = 1" logStats1 := NewLogStats(context.Background(), "Test", "", nil) _, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1) @@ -1306,7 +1306,7 @@ func TestPlanGetPlanCacheUnnormalized(t *testing.T) { // Skip cache using directive r, _, _, _ = createExecutorEnvUsing(planAllTheThings) - unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" logStats1 = NewLogStats(context.Background(), "Test", "", nil) @@ -1327,7 +1327,7 @@ func TestPlanGetPlanCacheUnnormalized(t *testing.T) { func TestPlanGetPlanCacheNormalized(t *testing.T) { r, _, _, _ := createExecutorEnvUsing(planAllTheThings) r.normalize = true - emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + emptyvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) query1 := "select * from music_user_map where id = 1" logStats1 := NewLogStats(context.Background(), "Test", "", nil) _, err := r.getPlan(emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */, logStats1) @@ -1352,7 +1352,7 @@ func TestPlanGetPlanCacheNormalized(t *testing.T) { // Skip cache using directive r, _, _, _ = createExecutorEnvUsing(planAllTheThings) r.normalize = true - unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.resolver.resolver) + unshardedvc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver) query1 = "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" logStats1 = NewLogStats(context.Background(), "Test", "", nil) @@ -1464,7 +1464,7 @@ func TestPlanGetPlanNormalized(t *testing.T) { } func createVcursorImpl(r *Executor, targetString string) (*vcursorImpl, error) { - return newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: targetString}), makeComments(""), r, nil, &fakeVSchemaOperator{vschema: r.VSchema()}, r.resolver.resolver) + return newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: targetString}), makeComments(""), r, nil, &fakeVSchemaOperator{vschema: r.VSchema()}, r.VSchema(), r.resolver.resolver) } func TestPlanParseEmptyTargetSingleKeyspace(t *testing.T) { diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index ab0c45e8f26..c5871fa077f 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -129,11 +129,7 @@ func (vc *vcursorImpl) ExecuteVSchema(keyspace string, vschemaDDL *sqlparser.DDL // the query and supply it here. Trailing comments are typically sent by the application for various reasons, // including as identifying markers. So, they have to be added back to all queries that are executed // on behalf of the original query. -func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComments sqlparser.MarginComments, executor *Executor, logStats *LogStats, vm VSchemaOperator, resolver *srvtopo.Resolver) (*vcursorImpl, error) { - vschema, err := vm.GetCurrentVschema() - if err != nil { - return nil, err - } +func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComments sqlparser.MarginComments, executor *Executor, logStats *LogStats, vm VSchemaOperator, vschema *vindexes.VSchema, resolver *srvtopo.Resolver) (*vcursorImpl, error) { keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema) if err != nil { return nil, err diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go index f6b36c68fa8..b4d59d50906 100644 --- a/go/vt/vtgate/vcursor_impl_test.go +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -138,7 +138,7 @@ func TestDestinationKeyspace(t *testing.T) { }} for _, tc := range tests { - impl, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, nil) + impl, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil) impl.vschema = tc.vschema dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier) if tc.expectedError == "" { @@ -206,7 +206,7 @@ func TestSetTarget(t *testing.T) { for i, tc := range tests { t.Run(string(i)+"#"+tc.targetString, func(t *testing.T) { - vc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, nil) + vc, _ := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, nil, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil) vc.vschema = tc.vschema err := vc.SetTarget(tc.targetString) if tc.expectedError == "" {