From 093c12d9525281f9185d2e9376d3344d13425192 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Thu, 25 Aug 2022 00:51:11 -0400 Subject: [PATCH 1/2] tabletserver stream replace schema name bindvar Tabletserver Execute() replaces bindVars[BvSchemaName] with the tablet database name. In practice this means that queries like: SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = 'data'; are rewritten to queries like: SELECT * FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = 'vt_data'; Tabletserver Stream() and StreamExecute() do not replicate this behavior, breaking queries like the one above when workload is OLAP. This PR applies the schema-name-replace logic to Stream() and StreamExecute(). Signed-off-by: Max Englander --- go/mysql/fakesqldb/server.go | 8 ++ go/vt/vttablet/tabletserver/query_executor.go | 7 ++ .../tabletserver/query_executor_test.go | 75 +++++++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index 1f1e071826b..a4275d14207 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -198,6 +198,14 @@ func New(t testing.TB) *DB { return db } +// Name returns the name of the DB. +func (db *DB) Name() string { + db.mu.Lock() + defer db.mu.Unlock() + + return db.name +} + // SetName sets the name of the DB. to differentiate them in tests if needed. func (db *DB) SetName(name string) *DB { db.mu.Lock() diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index f0f1297e790..11cccfa5e0e 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -286,6 +286,13 @@ func (qre *QueryExecutor) Stream(callback StreamCallback) error { return err } + switch qre.plan.PlanID { + case p.PlanSelectStream: + if qre.bindVars[sqltypes.BvReplaceSchemaName] != nil { + qre.bindVars[sqltypes.BvSchemaName] = sqltypes.StringBindVariable(qre.tsv.config.DB.DBName) + } + } + sql, sqlWithoutComments, err := qre.generateFinalSQL(qre.plan.FullQuery, qre.bindVars) if err != nil { return err diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index da31a570715..98b74faaaa3 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1203,6 +1203,64 @@ func TestQueryExecutorDenyListQRRetry(t *testing.T) { } } +func TestReplaceSchemaName(t *testing.T) { + db := setUpQueryExecutorTest(t) + defer db.Close() + + queryFmt := "select * from information_schema.schema_name where schema_name = %s" + inQuery := fmt.Sprintf(queryFmt, ":"+sqltypes.BvSchemaName) + wantQuery := fmt.Sprintf(queryFmt, fmt.Sprintf( + "'%s' limit %d", + db.Name(), + 10001, + )) + wantQueryStream := fmt.Sprintf(queryFmt, fmt.Sprintf( + "'%s'", + db.Name(), + )) + + ctx := context.Background() + tsv := newTestTabletServer(ctx, noFlags, db) + defer tsv.StopService() + + db.AddQuery(wantQuery, &sqltypes.Result{ + Fields: getTestTableFields(), + }) + + db.AddQuery(wantQueryStream, &sqltypes.Result{ + Fields: getTestTableFields(), + }) + + // Test non streaming execute. + { + qre := newTestQueryExecutor(ctx, tsv, inQuery, 0) + assert.Equal(t, planbuilder.PlanSelect, qre.plan.PlanID) + // Any value other than nil should cause QueryExecutor to replace the + // schema name. + qre.bindVars[sqltypes.BvReplaceSchemaName] = sqltypes.NullBindVariable + _, err := qre.Execute() + require.NoError(t, err) + _, ok := qre.bindVars[sqltypes.BvSchemaName] + require.True(t, ok) + } + + // Test streaming execute. + { + qre := newTestQueryExecutorStreaming(ctx, tsv, inQuery, 0) + // Stream only replaces schema name when plan is PlanSelectStream. + assert.Equal(t, planbuilder.PlanSelectStream, qre.plan.PlanID) + // Any value other than nil should cause QueryExecutor to replace the + // schema name. + qre.bindVars[sqltypes.BvReplaceSchemaName] = sqltypes.NullBindVariable + err := qre.Stream(func(_ *sqltypes.Result) error { + _, ok := qre.bindVars[sqltypes.BvSchemaName] + require.True(t, ok) + return nil + }) + require.NoError(t, err) + } +} + type executorFlags int64 const ( @@ -1288,6 +1346,23 @@ func newTestQueryExecutor(ctx context.Context, tsv *TabletServer, sql string, tx } } +func newTestQueryExecutorStreaming(ctx context.Context, tsv *TabletServer, sql string, txID int64) *QueryExecutor { + logStats := tabletenv.NewLogStats(ctx, "TestQueryExecutorStreaming") + plan, err := tsv.qe.GetStreamPlan(sql, false) + if err != nil { + panic(err) + } + return &QueryExecutor{ + ctx: ctx, + query: sql, + bindVars: make(map[string]*querypb.BindVariable), + connID: txID, + plan: plan, + logStats: logStats, + tsv: tsv, + } +} + func setUpQueryExecutorTest(t *testing.T) *fakesqldb.DB { db := fakesqldb.New(t) initQueryExecutorTestDB(db) From ee74889775a2eac9d0768f2eabcec8640ac95fe3 Mon Sep 17 00:00:00 2001 From: Max Englander Date: Tue, 6 Sep 2022 09:34:38 -0400 Subject: [PATCH 2/2] tabletserver stream replace schema name bindvar: add e2e test Signed-off-by: Max Englander --- go/test/endtoend/vtgate/gen4/system_schema_test.go | 10 ++++++++++ go/vt/vttablet/tabletserver/query_executor_test.go | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vtgate/gen4/system_schema_test.go b/go/test/endtoend/vtgate/gen4/system_schema_test.go index a935bbcb7be..5f9bec3287f 100644 --- a/go/test/endtoend/vtgate/gen4/system_schema_test.go +++ b/go/test/endtoend/vtgate/gen4/system_schema_test.go @@ -37,11 +37,21 @@ func TestDbNameOverride(t *testing.T) { conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) defer conn.Close() + + // Test query in OLTP workload (default). qr, err := conn.ExecuteFetch("SELECT distinct database() FROM information_schema.tables WHERE table_schema = database()", 1000, true) require.Nil(t, err) assert.Equal(t, 1, len(qr.Rows), "did not get enough rows back") assert.Equal(t, "vt_ks", qr.Rows[0][0].ToString()) + + // Test again in OLAP workload (default). + utils.Exec(t, conn, "SET workload=OLAP") + qr, err = conn.ExecuteFetch("SELECT distinct database() FROM information_schema.tables WHERE table_schema = database()", 1000, true) + + require.Nil(t, err) + assert.Equal(t, 1, len(qr.Rows), "did not get enough rows back") + assert.Equal(t, "vt_ks", qr.Rows[0][0].ToString()) } func TestInformationSchemaQuery(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 98b74faaaa3..526419c4f49 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -1348,7 +1348,7 @@ func newTestQueryExecutor(ctx context.Context, tsv *TabletServer, sql string, tx func newTestQueryExecutorStreaming(ctx context.Context, tsv *TabletServer, sql string, txID int64) *QueryExecutor { logStats := tabletenv.NewLogStats(ctx, "TestQueryExecutorStreaming") - plan, err := tsv.qe.GetStreamPlan(sql, false) + plan, err := tsv.qe.GetStreamPlan(sql) if err != nil { panic(err) }