Skip to content

Commit

Permalink
Merge pull request #8480 from planetscale/gen4-dba-tables
Browse files Browse the repository at this point in the history
Gen4: extracting routing information from predicates for system tables
  • Loading branch information
harshit-gangal authored Jul 20, 2021
2 parents ede1c3b + 08fc0be commit b472ac6
Show file tree
Hide file tree
Showing 24 changed files with 1,365 additions and 810 deletions.
20 changes: 20 additions & 0 deletions go/test/endtoend/vtgate/gen4/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ create table t2(
}
}
}`

routingRules = `
{"rules": [
{
"from_table": "ks.t1000",
"to_tables": ["ks.t1"]
}
]}
`
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -104,6 +113,17 @@ func TestMain(m *testing.M) {
return 1
}

// apply routing rules
err = clusterInstance.VtctlclientProcess.ApplyRoutingRules(routingRules)
if err != nil {
return 1
}

err = clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildVSchemaGraph")
if err != nil {
return 1
}

// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"-planner_version", "Gen4"} // enable Gen4 planner.
err = clusterInstance.StartVtgate()
Expand Down
202 changes: 202 additions & 0 deletions go/test/endtoend/vtgate/gen4/system_schema_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
Copyright 2020 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vtgate

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
)

func TestDbNameOverride(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
defer conn.Close()
qr, err := conn.ExecuteFetch("SELECT distinct database() FROM information_schema.tables WHERE table_schema = database()", 1000, true)

require.Nil(t, err)
assert.Equal(t, 1, len(qr.Rows), "did not get enough rows back")
assert.Equal(t, "vt_ks", qr.Rows[0][0].ToString())
}

func TestInformationSchemaQuery(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

assertSingleRowIsReturned(t, conn, "table_schema = 'ks'", "vt_ks")
assertSingleRowIsReturned(t, conn, "table_schema = 'vt_ks'", "vt_ks")
assertResultIsEmpty(t, conn, "table_schema = 'NONE'")
assertSingleRowIsReturned(t, conn, "table_schema = 'performance_schema'", "performance_schema")
assertResultIsEmpty(t, conn, "table_schema = 'PERFORMANCE_SCHEMA'")
assertSingleRowIsReturned(t, conn, "table_schema = 'performance_schema' and table_name = 'users'", "performance_schema")
assertResultIsEmpty(t, conn, "table_schema = 'performance_schema' and table_name = 'foo'")
assertSingleRowIsReturned(t, conn, "table_schema = 'vt_ks' and table_name = 't1'", "vt_ks")
assertSingleRowIsReturned(t, conn, "table_schema = 'ks' and table_name = 't1'", "vt_ks")
}

func assertResultIsEmpty(t *testing.T, conn *mysql.Conn, pre string) {
t.Run(pre, func(t *testing.T) {
qr, err := conn.ExecuteFetch("SELECT distinct table_schema FROM information_schema.tables WHERE "+pre, 1000, true)
require.NoError(t, err)
assert.Empty(t, qr.Rows)
})
}

func assertSingleRowIsReturned(t *testing.T, conn *mysql.Conn, predicate string, expectedKs string) {
t.Run(predicate, func(t *testing.T) {
qr, err := conn.ExecuteFetch("SELECT distinct table_schema FROM information_schema.tables WHERE "+predicate, 1000, true)
require.NoError(t, err)
assert.Equal(t, 1, len(qr.Rows), "did not get enough rows back")
assert.Equal(t, expectedKs, qr.Rows[0][0].ToString())
})
}

func TestInformationSchemaWithSubquery(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

result := checkedExec(t, conn, "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = (SELECT SCHEMA()) AND TABLE_NAME = 'not_exists'")
assert.Empty(t, result.Rows)
}

func TestInformationSchemaQueryGetsRoutedToTheRightTableAndKeyspace(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

_ = checkedExec(t, conn, "SELECT id FROM t1000") // test that the routed table is available to us
result := checkedExec(t, conn, "SELECT * FROM information_schema.tables WHERE table_schema = database() and table_name='t1000'")
assert.NotEmpty(t, result.Rows)
}

func TestFKConstraintUsingInformationSchema(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

checkedExec(t, conn, "create table t7_xxhash(uid varchar(50),phone bigint,msg varchar(100),primary key(uid)) Engine=InnoDB")
checkedExec(t, conn, "create table t7_fk(id bigint,t7_uid varchar(50),primary key(id),CONSTRAINT t7_fk_ibfk_1 foreign key (t7_uid) references t7_xxhash(uid) on delete set null on update cascade) Engine=InnoDB;")
defer checkedExec(t, conn, "drop table t7_fk, t7_xxhash")

query := "select fk.referenced_table_name as to_table, fk.referenced_column_name as primary_key, fk.column_name as `column`, fk.constraint_name as name, rc.update_rule as on_update, rc.delete_rule as on_delete from information_schema.referential_constraints as rc join information_schema.key_column_usage as fk using (constraint_schema, constraint_name) where fk.referenced_column_name is not null and fk.table_schema = database() and fk.table_name = 't7_fk' and rc.constraint_schema = database() and rc.table_name = 't7_fk'"
assertMatches(t, conn, query, `[[VARCHAR("t7_xxhash") VARCHAR("uid") VARCHAR("t7_uid") VARCHAR("t7_fk_ibfk_1") VARCHAR("CASCADE") VARCHAR("SET NULL")]]`)
}

func TestConnectWithSystemSchema(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
for _, dbname := range []string{"information_schema", "mysql", "performance_schema", "sys"} {
connParams := vtParams
connParams.DbName = dbname
conn, err := mysql.Connect(ctx, &connParams)
require.NoError(t, err)
checkedExec(t, conn, `select @@max_allowed_packet from dual`)
conn.Close()
}
}

func TestUseSystemSchema(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()
for _, dbname := range []string{"information_schema", "mysql", "performance_schema", "sys"} {
checkedExec(t, conn, fmt.Sprintf("use %s", dbname))
checkedExec(t, conn, `select @@max_allowed_packet from dual`)
}
}

func TestSystemSchemaQueryWithoutQualifier(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

queryWithQualifier := fmt.Sprintf("select t.table_schema,t.table_name,c.column_name,c.column_type "+
"from information_schema.tables t "+
"join information_schema.columns c "+
"on c.table_schema = t.table_schema and c.table_name = t.table_name "+
"where t.table_schema = '%s' and c.table_schema = '%s' "+
"order by t.table_schema,t.table_name,c.column_name", KeyspaceName, KeyspaceName)
qr1 := checkedExec(t, conn, queryWithQualifier)

checkedExec(t, conn, "use information_schema")
queryWithoutQualifier := fmt.Sprintf("select t.table_schema,t.table_name,c.column_name,c.column_type "+
"from tables t "+
"join columns c "+
"on c.table_schema = t.table_schema and c.table_name = t.table_name "+
"where t.table_schema = '%s' and c.table_schema = '%s' "+
"order by t.table_schema,t.table_name,c.column_name", KeyspaceName, KeyspaceName)
qr2 := checkedExec(t, conn, queryWithoutQualifier)
require.Equal(t, qr1, qr2)

connParams := vtParams
connParams.DbName = "information_schema"
conn2, err := mysql.Connect(ctx, &connParams)
require.NoError(t, err)
defer conn2.Close()

qr3 := checkedExec(t, conn2, queryWithoutQualifier)
require.Equal(t, qr2, qr3)
}

func TestMultipleSchemaPredicates(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

query := fmt.Sprintf("select t.table_schema,t.table_name,c.column_name,c.column_type "+
"from information_schema.tables t "+
"join information_schema.columns c "+
"on c.table_schema = t.table_schema and c.table_name = t.table_name "+
"where t.table_schema = '%s' and c.table_schema = '%s' and c.table_schema = '%s' and c.table_schema = '%s'", KeyspaceName, KeyspaceName, KeyspaceName, KeyspaceName)
qr1 := checkedExec(t, conn, query)
require.EqualValues(t, 4, len(qr1.Fields))

// test a query with two keyspace names
query = fmt.Sprintf("select t.table_schema,t.table_name,c.column_name,c.column_type "+
"from information_schema.tables t "+
"join information_schema.columns c "+
"on c.table_schema = t.table_schema and c.table_name = t.table_name "+
"where t.table_schema = '%s' and c.table_schema = '%s' and c.table_schema = '%s'", KeyspaceName, KeyspaceName, "a")
_, err = conn.ExecuteFetch(query, 1000, true)
require.Error(t, err)
require.Contains(t, err.Error(), "specifying two different database in the query is not supported")
}
32 changes: 32 additions & 0 deletions go/test/endtoend/vtgate/mysql80/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package vtgate

import (
"context"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -71,6 +74,35 @@ ts12 TIMESTAMP DEFAULT LOCALTIME()
exec(t, conn, "drop table function_default")
}

// TestCheckConstraint test check constraints on CREATE TABLE
// This feature is supported from MySQL 8.0.16 and MariaDB 10.2.1.
func TestCheckConstraint(t *testing.T) {
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()

query := `CREATE TABLE t7 (CHECK (c1 <> c2), c1 INT CHECK (c1 > 10), c2 INT CONSTRAINT c2_positive CHECK (c2 > 0), c3 INT CHECK (c3 < 100), CONSTRAINT c1_nonzero CHECK (c1 <> 0), CHECK (c1 > c3));`
exec(t, conn, query)

checkQuery := `SELECT CONSTRAINT_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE TABLE_NAME = 't7' order by CONSTRAINT_NAME;`
expected := `[[VARCHAR("c1_nonzero")] [VARCHAR("c2_positive")] [VARCHAR("t7_chk_1")] [VARCHAR("t7_chk_2")] [VARCHAR("t7_chk_3")] [VARCHAR("t7_chk_4")]]`

assertMatches(t, conn, checkQuery, expected)

cleanup := `DROP TABLE t7`
exec(t, conn, cleanup)
}

func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := exec(t, conn, query)
got := fmt.Sprintf("%v", qr.Rows)
diff := cmp.Diff(expected, got)
if diff != "" {
t.Errorf("Query: %s (-want +got):\n%s", query, diff)
}
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand Down
30 changes: 5 additions & 25 deletions go/test/endtoend/vtgate/system_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,6 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
)

// TestCheckConstraint test check constraints on CREATE TABLE
// This feature is supported from MySQL 8.0.16 and MariaDB 10.2.1.
func TestCheckConstraint(t *testing.T) {
// Skipping as tests are run against MySQL 5.7
t.Skip()

conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()

query := `CREATE TABLE t7 (CHECK (c1 <> c2), c1 INT CHECK (c1 > 10), c2 INT CONSTRAINT c2_positive CHECK (c2 > 0), c3 INT CHECK (c3 < 100), CONSTRAINT c1_nonzero CHECK (c1 <> 0), CHECK (c1 > c3));`
exec(t, conn, query)

checkQuery := `SELECT CONSTRAINT_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE TABLE_NAME = 't7';`
expected := `[[VARCHAR("t7_chk_1")] [VARCHAR("t7_chk_2")] [VARCHAR("c2_positive")] [VARCHAR("t7_chk_3")] [VARCHAR("c1_nonzero")] [VARCHAR("t7_chk_4")]]`

assertMatches(t, conn, checkQuery, expected)

cleanup := `DROP TABLE t7`
exec(t, conn, cleanup)
}

func TestDbNameOverride(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
Expand Down Expand Up @@ -169,15 +147,17 @@ func TestSystemSchemaQueryWithoutQualifier(t *testing.T) {
"from information_schema.tables t "+
"join information_schema.columns c "+
"on c.table_schema = t.table_schema and c.table_name = t.table_name "+
"where t.table_schema = '%s' and c.table_schema = '%s'", KeyspaceName, KeyspaceName)
"where t.table_schema = '%s' and c.table_schema = '%s' "+
"order by t.table_schema,t.table_name,c.column_name", KeyspaceName, KeyspaceName)
qr1 := exec(t, conn, queryWithQualifier)

exec(t, conn, "use information_schema")
queryWithoutQualifier := fmt.Sprintf("select t.table_schema,t.table_name,c.column_name,c.column_type "+
"from tables t "+
"join columns c "+
"on c.table_schema = t.table_schema and c.table_name = t.table_name "+
"where t.table_schema = '%s' and c.table_schema = '%s'", KeyspaceName, KeyspaceName)
exec(t, conn, "use information_schema")
"where t.table_schema = '%s' and c.table_schema = '%s' "+
"order by t.table_schema,t.table_name,c.column_name", KeyspaceName, KeyspaceName)
qr2 := exec(t, conn, queryWithoutQualifier)
require.Equal(t, qr1, qr2)

Expand Down
24 changes: 17 additions & 7 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/fuzz_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func execRouteSelectDBA(query, field, tablename, schema, shards string) {
Query: query,
FieldQuery: field,
SysTableTableSchema: stringToExpr(schema),
SysTableTableName: stringToExpr(tablename),
SysTableTableName: map[string]evalengine.Expr{"table_name": evalengine.NewLiteralString([]byte(tablename))},
}
vc := &loggingVCursor{
shards: []string{shards},
Expand Down
Loading

0 comments on commit b472ac6

Please sign in to comment.