Skip to content

Commit

Permalink
Merge pull request #6459 from planetscale/sysset-reserveconn
Browse files Browse the repository at this point in the history
System Variables support using Reserved Connections
  • Loading branch information
harshit-gangal authored Jul 24, 2020
2 parents b9b5e0b + e63dcb7 commit 68f6713
Show file tree
Hide file tree
Showing 25 changed files with 748 additions and 171 deletions.
7 changes: 3 additions & 4 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,11 +560,10 @@ var CharacterSetMap = map[string]uint8{

// IsNum returns true if a MySQL type is a numeric value.
// It is the same as IS_NUM defined in mysql.h.
//
// FIXME(alainjobart) This needs to use the constants in
// replication/constants.go, so we are using numerical values here.
func IsNum(typ uint8) bool {
return ((typ <= 9 /* MYSQL_TYPE_INT24 */ && typ != 7 /* MYSQL_TYPE_TIMESTAMP */) || typ == 13 /* MYSQL_TYPE_YEAR */ || typ == 246 /* MYSQL_TYPE_NEWDECIMAL */)
return (typ <= TypeInt24 && typ != TypeTimestamp) ||
typ == TypeYear ||
typ == TypeNewDecimal
}

// IsConnErr returns true if the error is a connection error.
Expand Down
9 changes: 9 additions & 0 deletions go/test/endtoend/vtgate/setstatement/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"os"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
Expand Down Expand Up @@ -131,3 +133,10 @@ func exec(t *testing.T, conn *mysql.Conn, query string) (*sqltypes.Result, error
t.Helper()
return conn.ExecuteFetch(query, 1000, true)
}

func checkedExec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
require.NoError(t, err)
return qr
}
201 changes: 154 additions & 47 deletions go/test/endtoend/vtgate/setstatement/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,11 @@ func TestCharsetIntro(t *testing.T) {
require.NoError(t, err)
defer conn.Close()

_, err = exec(t, conn, "delete from test")
require.NoError(t, err)
_, err = exec(t, conn, "insert into test (id,val1) values (666, _binary'abc')")
require.NoError(t, err)
_, err = exec(t, conn, "update test set val1 = _latin1'xyz' where id = 666")
require.NoError(t, err)
_, err = exec(t, conn, "delete from test where val1 = _utf8'xyz'")
require.NoError(t, err)
qr, err := exec(t, conn, "select id from test where val1 = _utf8mb4'xyz'")
require.NoError(t, err)
checkedExec(t, conn, "delete from test")
checkedExec(t, conn, "insert into test (id,val1) values (666, _binary'abc')")
checkedExec(t, conn, "update test set val1 = _latin1'xyz' where id = 666")
checkedExec(t, conn, "delete from test where val1 = _utf8'xyz'")
qr := checkedExec(t, conn, "select id from test where val1 = _utf8mb4'xyz'")
require.EqualValues(t, 0, qr.RowsAffected)
}

Expand All @@ -60,57 +55,169 @@ func TestSetSysVar(t *testing.T) {
Port: clusterInstance.VtgateMySQLPort,
}
type queriesWithExpectations struct {
query string
expectedRows string
rowsAffected int
errMsg string
expectedWarning string
name, expr, expected string
}

queries := []queriesWithExpectations{{
query: `set @@default_storage_engine = INNODB`,
expectedRows: ``, rowsAffected: 0,
expectedWarning: "[[VARCHAR(\"Warning\") UINT16(1235) VARCHAR(\"Ignored inapplicable SET default_storage_engine = INNODB\")]]",
name: "default_storage_engine",
expr: "INNODB",
expected: `[[VARCHAR("InnoDB")]]`,
}, {
query: `set @@sql_mode = @@sql_mode`,
expectedRows: ``, rowsAffected: 0,
name: "sql_mode",
expr: "''",
expected: `[[VARCHAR("")]]`,
}, {
query: `set @@sql_mode = concat(@@sql_mode,"")`,
expectedRows: ``, rowsAffected: 0,
name: "sql_mode",
expr: `concat(@@sql_mode,"NO_ZERO_DATE")`,
expected: `[[VARCHAR("NO_ZERO_DATE")]]`,
}, {
query: `set @@sql_mode = concat(@@sql_mode,"ALLOW_INVALID_DATES")`,
expectedWarning: "[[VARCHAR(\"Warning\") UINT16(1235) VARCHAR(\"Modification not allowed using set construct for: sql_mode\")]]",
name: "sql_mode",
expr: "@@sql_mode",
expected: `[[VARCHAR("NO_ZERO_DATE")]]`,
}, {
query: `set @@SQL_SAFE_UPDATES = 1`,
expectedRows: ``, rowsAffected: 0,
name: "SQL_SAFE_UPDATES",
expr: "1",
expected: "[[INT64(1)]]",
}}

conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

for i, q := range queries {
t.Run(fmt.Sprintf("%d-%s", i, q.query), func(t *testing.T) {
qr, err := exec(t, conn, q.query)
if q.errMsg != "" {
require.Contains(t, err.Error(), q.errMsg)
} else {
require.NoError(t, err)
require.Equal(t, uint64(q.rowsAffected), qr.RowsAffected, "rows affected wrong for query: %s", q.query)
if q.expectedRows != "" {
result := fmt.Sprintf("%v", qr.Rows)
if diff := cmp.Diff(q.expectedRows, result); diff != "" {
t.Errorf("%s\nfor query: %s", diff, q.query)
}
}
if q.expectedWarning != "" {
qr, err := exec(t, conn, "show warnings")
require.NoError(t, err)
if got, want := fmt.Sprintf("%v", qr.Rows), q.expectedWarning; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
}
}
}
query := fmt.Sprintf("set %s = %s", q.name, q.expr)
t.Run(fmt.Sprintf("%d-%s", i, query), func(t *testing.T) {
_, err := exec(t, conn, query)
require.NoError(t, err)
assertMatches(t, conn, fmt.Sprintf("select @@%s", q.name), q.expected)
})
}
}

func TestSetSystemVariable(t *testing.T) {
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}
conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()

checkedExec(t, conn, "set @@sql_mode = 'NO_ZERO_DATE'")
q := `select str_to_date('00/00/0000', '%m/%d/%Y')`
assertMatches(t, conn, q, `[[NULL]]`)

assertMatches(t, conn, "select @@sql_mode", `[[VARCHAR("NO_ZERO_DATE")]]`)
checkedExec(t, conn, "set @@sql_mode = ''")

assertMatches(t, conn, q, `[[DATE("0000-00-00")]]`)

checkedExec(t, conn, "SET @@SESSION.sql_mode = CONCAT(CONCAT(@@sql_mode, ',STRICT_ALL_TABLES'), ',NO_AUTO_VALUE_ON_ZERO'), @@SESSION.sql_auto_is_null = 0, @@SESSION.wait_timeout = 2147483")
assertMatches(t, conn, "select @@sql_mode", `[[VARCHAR("NO_AUTO_VALUE_ON_ZERO,STRICT_ALL_TABLES")]]`)
}

func TestSetSystemVarWithTxFailure(t *testing.T) {
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}

conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()

checkedExec(t, conn, "insert into test (id, val1) values (80, null)")

// before changing any settings, let's confirm sql_safe_updates value
assertMatches(t, conn, `select @@sql_safe_updates from test where id = 80`, `[[INT64(0)]]`)

checkedExec(t, conn, "set sql_safe_updates = 1")
checkedExec(t, conn, "begin")

qr := checkedExec(t, conn, "select connection_id() from test where id = 80")

// kill the mysql connection shard which has transaction open.
vttablet1 := clusterInstance.Keyspaces[0].Shards[0].MasterTablet() // 80-
vttablet1.VttabletProcess.QueryTablet(fmt.Sprintf("kill %s", qr.Rows[0][0].ToString()), keyspaceName, false)

// transaction fails on commit - we should no longer be in a transaction
_, err = conn.ExecuteFetch("commit", 1, true)
require.Error(t, err)

// we still want to have our system setting applied
assertMatches(t, conn, `select @@sql_safe_updates`, `[[INT64(1)]]`)
}

func TestSetSystemVarWithConnectionFailure(t *testing.T) {
t.Skip("failing at the moment")
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}

conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
checkedExec(t, conn, "delete from test")

checkedExec(t, conn, "insert into test (id, val1) values (80, null)")
checkedExec(t, conn, "set sql_safe_updates = 1")
qr := checkedExec(t, conn, "select connection_id() from test where id = 80")

// kill the mysql connection shard which has transaction open.
vttablet1 := clusterInstance.Keyspaces[0].Shards[0].MasterTablet() // 80-
vttablet1.VttabletProcess.QueryTablet(fmt.Sprintf("kill %s", qr.Rows[0][0].ToString()), keyspaceName, false)

// we still want to have our system setting applied
_, err = exec(t, conn, `select @@sql_safe_updates from test where id = 80`)
require.NoError(t, err)
}

func TestSetSystemVariableAndThenSuccessfulTx(t *testing.T) {
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}

conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
checkedExec(t, conn, "delete from test")

checkedExec(t, conn, "set sql_safe_updates = 1")
checkedExec(t, conn, "begin")
checkedExec(t, conn, "insert into test (id, val1) values (80, null)")
checkedExec(t, conn, "commit")
assertMatches(t, conn, "select id, val1 from test", "[[INT64(80) NULL]]")
assertMatches(t, conn, "select @@sql_safe_updates", "[[INT64(1)]]")
}

func TestStartTxAndSetSystemVariableAndThenSuccessfulCommit(t *testing.T) {
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}

conn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
defer conn.Close()
checkedExec(t, conn, "delete from test")

checkedExec(t, conn, "begin")
checkedExec(t, conn, "set sql_safe_updates = 1")
checkedExec(t, conn, "insert into test (id, val1) values (54, null)")
checkedExec(t, conn, "commit")
assertMatches(t, conn, "select id, val1 from test", "[[INT64(54) NULL]]")
assertMatches(t, conn, "select @@sql_safe_updates", "[[INT64(1)]]")
}

func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr, err := exec(t, conn, query)
require.NoError(t, err)
got := fmt.Sprintf("%v", qr.Rows)
diff := cmp.Diff(expected, got)
if diff != "" {
t.Errorf("Query: %s (-want +got):\n%s", query, diff)
}
}
3 changes: 3 additions & 0 deletions go/vt/sqlparser/normalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ func newNormalizer(stmt Statement, bindVars map[string]*querypb.BindVariable, pr
// where variables are deduped.
func (nz *normalizer) WalkStatement(node SQLNode) (bool, error) {
switch node := node.(type) {
// no need to normalize the statement types
case *Set, *Show, *Begin, *Commit, *Rollback, *Savepoint, *SetTransaction, *DDL, *SRollback, *Release, *OtherAdmin, *OtherRead:
return false, nil
case *Select:
_ = Walk(nz.WalkSelect, node)
// Don't continue
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func (t noopVCursor) SetSysVar(name string, expr string) {
//panic("implement me")
}

func (t noopVCursor) InReservedConn() bool {
panic("implement me")
}

func (t noopVCursor) ShardSession() []*srvtopo.ResolvedShard {
panic("implement me")
}

func (t noopVCursor) ExecuteVSchema(keyspace string, vschemaDDL *sqlparser.DDL) error {
panic("implement me")
}
Expand All @@ -76,6 +84,7 @@ func (t noopVCursor) Session() SessionActions {
func (t noopVCursor) SetTarget(target string) error {
panic("implement me")
}

func (t noopVCursor) Context() context.Context {
if t.ctx == nil {
return context.Background()
Expand Down Expand Up @@ -132,6 +141,7 @@ func (t noopVCursor) ResolveDestinations(keyspace string, ids []*querypb.Value,
}

var _ VCursor = (*loggingVCursor)(nil)

var _ SessionActions = (*loggingVCursor)(nil)

// loggingVCursor logs requests and allows you to verify
Expand Down Expand Up @@ -171,6 +181,14 @@ func (f *loggingVCursor) SetSysVar(name string, expr string) {
func (f *loggingVCursor) NeedsReservedConn() {
}

func (f *loggingVCursor) InReservedConn() bool {
panic("implement me")
}

func (f *loggingVCursor) ShardSession() []*srvtopo.ResolvedShard {
return nil
}

func (f *loggingVCursor) ExecuteVSchema(keyspace string, vschemaDDL *sqlparser.DDL) error {
panic("implement me")
}
Expand Down
8 changes: 7 additions & 1 deletion go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,14 @@ type (

SetSysVar(name string, expr string)

// NeedsReservedConn marks this session as needing a dedicated connection to mysql
// NeedsReservedConn marks this session as needing a dedicated connection to underlying database
NeedsReservedConn()

// InReservedConn provides whether this session is using reserved connection
InReservedConn() bool

// ShardSession returns shard info about open connections
ShardSession() []*srvtopo.ResolvedShard
}

// Plan represents the execution strategy for a given query.
Expand Down
Loading

0 comments on commit 68f6713

Please sign in to comment.