Skip to content

Commit

Permalink
Add automatic read only handling for queries
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed May 22, 2022
1 parent 4ee3a6f commit a03de1f
Show file tree
Hide file tree
Showing 23 changed files with 97 additions and 64 deletions.
6 changes: 3 additions & 3 deletions config/init_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
# these commands. Note that disabling it does NOT disable read_only.
# We save the current value so that we only re-enable it at the end if it was
# enabled before.
SET @original_super_read_only=@@global.super_read_only;
SET GLOBAL super_read_only=OFF;
SET @original_super_read_only=IF(@@global.super_read_only=1, 'ON', 'OFF');
SET GLOBAL super_read_only='OFF';

# Changes during the init db should not make it to the binlog.
# They could potentially create errant transactions on replicas.
Expand Down Expand Up @@ -109,4 +109,4 @@ RESET SLAVE ALL;
RESET MASTER;

# We need to set super_read_only back to what it was before
SET @@global.super_read_only=@original_super_read_only;
SET GLOBAL super_read_only=IFNULL(@original_super_read_only, 'OFF');
8 changes: 4 additions & 4 deletions go/mysql/endtoend/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func doTestMultiResult(t *testing.T, disableClientDeprecateEOF bool) {
expectNoError(t, err)
defer conn.Close()

qr, more, err := conn.ExecuteFetchMulti("select 1 from dual; set autocommit=1; select 1 from dual", 10, true)
qr, more, err := conn.ExecuteFetchMulti("select 1 from dual; set autocommit=1; select 1 from dual", 10, true, false)
expectNoError(t, err)
expectFlag(t, "ExecuteMultiFetch(multi result)", more, true)
assert.EqualValues(t, 1, len(qr.Rows))
Expand All @@ -176,12 +176,12 @@ func doTestMultiResult(t *testing.T, disableClientDeprecateEOF bool) {
expectFlag(t, "ReadQueryResult(2)", more, false)
assert.EqualValues(t, 1, len(qr.Rows))

qr, more, err = conn.ExecuteFetchMulti("select 1 from dual", 10, true)
qr, more, err = conn.ExecuteFetchMulti("select 1 from dual", 10, true, false)
expectNoError(t, err)
expectFlag(t, "ExecuteMultiFetch(single result)", more, false)
assert.EqualValues(t, 1, len(qr.Rows))

qr, more, err = conn.ExecuteFetchMulti("set autocommit=1", 10, true)
qr, more, err = conn.ExecuteFetchMulti("set autocommit=1", 10, true, false)
expectNoError(t, err)
expectFlag(t, "ExecuteMultiFetch(no result)", more, false)
assert.EqualValues(t, 0, len(qr.Rows))
Expand All @@ -208,7 +208,7 @@ func doTestMultiResult(t *testing.T, disableClientDeprecateEOF bool) {
assert.EqualValues(t, 1, result.RowsAffected, "insert into returned RowsAffected")
}

qr, more, err = conn.ExecuteFetchMulti("update a set name = concat(name, ' updated'); select * from a; select count(*) from a", 300, true)
qr, more, err = conn.ExecuteFetchMulti("update a set name = concat(name, ' updated'); select * from a; select count(*) from a", 300, true, true)
expectNoError(t, err)
expectFlag(t, "ExecuteMultiFetch(multi result)", more, true)
assert.EqualValues(t, 255, qr.RowsAffected)
Expand Down
8 changes: 4 additions & 4 deletions go/mysql/endtoend/query_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func BenchmarkSetVarsMultipleSetsInSameStmt(b *testing.B) {
for _, sleepDuration := range []time.Duration{0, 1 * time.Millisecond} {
b.Run(fmt.Sprintf("Sleep %d ms", sleepDuration/time.Millisecond), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_, _, err := conn.ExecuteFetchMulti(fmt.Sprintf("set sql_mode = '', sql_safe_updates = 0 ; insert into t(id) values (%d)", i), 1, false)
_, _, err := conn.ExecuteFetchMulti(fmt.Sprintf("set sql_mode = '', sql_safe_updates = 0 ; insert into t(id) values (%d)", i), 1, false, true)
if err != nil {
b.Fatal(err)
}
Expand All @@ -252,7 +252,7 @@ func BenchmarkSetVarsMultipleSetsInSameStmt(b *testing.B) {
b.Fatal(err)
}

_, _, err = conn.ExecuteFetchMulti(fmt.Sprintf("set sql_mode = '', sql_safe_updates = 0 ; select * from t where id = %d", i), 1, false)
_, _, err = conn.ExecuteFetchMulti(fmt.Sprintf("set sql_mode = '', sql_safe_updates = 0 ; select * from t where id = %d", i), 1, false, false)
if err != nil {
b.Fatal(err)
}
Expand All @@ -261,7 +261,7 @@ func BenchmarkSetVarsMultipleSetsInSameStmt(b *testing.B) {
b.Fatal(err)
}

_, _, err = conn.ExecuteFetchMulti(fmt.Sprintf("set sql_mode = '', sql_safe_updates = 0 ; update t set name = 'foo' where id = %d", i), 1, false)
_, _, err = conn.ExecuteFetchMulti(fmt.Sprintf("set sql_mode = '', sql_safe_updates = 0 ; update t set name = 'foo' where id = %d", i), 1, false, true)
if err != nil {
b.Fatal(err)
}
Expand All @@ -270,7 +270,7 @@ func BenchmarkSetVarsMultipleSetsInSameStmt(b *testing.B) {
b.Fatal(err)
}

_, _, err = conn.ExecuteFetchMulti(fmt.Sprintf("set sql_mode = '', sql_safe_updates = 0 ; delete from t where id = %d", i), 1, false)
_, _, err = conn.ExecuteFetchMulti(fmt.Sprintf("set sql_mode = '', sql_safe_updates = 0 ; delete from t where id = %d", i), 1, false, true)
if err != nil {
b.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions go/mysql/endtoend/schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func TestChangeSchemaIsNoticed(t *testing.T) {
require.NoError(t, err)
defer conn.Close()

_, err = conn.ExecuteFetch(createDb, 1000, true)
_, err = conn.ExecuteFetchWithReadOnlyHandling(createDb, 1000, true)
require.NoError(t, err)
_, err = conn.ExecuteFetch(mysql.CreateSchemaCopyTable, 1000, true)
_, err = conn.ExecuteFetchWithReadOnlyHandling(mysql.CreateSchemaCopyTable, 1000, true)
require.NoError(t, err)

tests := []struct {
Expand Down
34 changes: 32 additions & 2 deletions go/mysql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,26 @@ func (c *Conn) parseRow(data []byte, fields []*querypb.Field, reader func([]byte
// 2. if the server closes the connection when a command is in flight,
// readComQueryResponse will fail, and we'll return CRServerLost(2013).
func (c *Conn) ExecuteFetch(query string, maxrows int, wantfields bool) (result *sqltypes.Result, err error) {
result, _, err = c.ExecuteFetchMulti(query, maxrows, wantfields)
result, _, err = c.ExecuteFetchMulti(query, maxrows, wantfields, false)
return result, err
}

// ExecuteFetchWithReadOnlyHandling should be used if you are executing a write query
// on a tablet that may NOT be a primary and you want to execute it regardless of
// tablet type. This function will temporarily make the mysql instance read-write and
// re-enable read-only mode after the query is executed if needed.
func (c *Conn) ExecuteFetchWithReadOnlyHandling(query string, maxrows int, wantfields bool) (result *sqltypes.Result, err error) {
result, _, err = c.ExecuteFetchMulti(query, maxrows, wantfields, true)
return result, err
}

// ExecuteFetchMulti is for fetching multiple results from a multi-statement result.
// It returns an additional 'more' flag. If it is set, you must fetch the additional
// results using ReadQueryResult.
func (c *Conn) ExecuteFetchMulti(query string, maxrows int, wantfields bool) (result *sqltypes.Result, more bool, err error) {
//
// Pass disableReadOnly as true if you are executing a write on a tablet/connection
// that may NOT be a primary and you want to execute it regardless of tablet type.
func (c *Conn) ExecuteFetchMulti(query string, maxrows int, wantfields bool, disableReadOnly bool) (result *sqltypes.Result, more bool, err error) {
defer func() {
if err != nil {
if sqlerr, ok := err.(*SQLError); ok {
Expand All @@ -316,6 +328,24 @@ func (c *Conn) ExecuteFetchMulti(query string, maxrows int, wantfields bool) (re
}
}()

// Note: MariaDB does not have super_read_only but support for it is EOL in v14.0+
if disableReadOnly && !c.IsMariaDB() {
var res *sqltypes.Result
if err = c.WriteComQuery("SELECT @@global.super_read_only"); err != nil {
return nil, false, err
}
res, _, _, err := c.ReadQueryResult(maxrows, wantfields)
if err == nil && len(res.Rows) == 1 {
sro := res.Rows[0][0].ToString()
if sro == "1" || sro == "ON" {
defer c.WriteComQuery("SET @@global.super_read_only='ON'")
if err = c.WriteComQuery("SET @@global.super_read_only='OFF'"); err != nil {
return nil, false, err
}
}
}
}

// Send the query as a COM_QUERY packet.
if err = c.WriteComQuery(query); err != nil {
return nil, false, err
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func executeQuery(dbConn *mysql.Conn, query string) (*sqltypes.Result, error) {
retryDelay := 1 * time.Second
for i := 1; i <= retries; i++ {
log.Infof("Executing query %s on %s (attempt %d of %d)", query, i, retries)
result, err = dbConn.ExecuteFetch(query, 10000, true)
result, err = dbConn.ExecuteFetchWithReadOnlyHandling(query, 10000, true)
if err == nil {
break
}
Expand Down
2 changes: 0 additions & 2 deletions go/test/endtoend/recovery/pitr/shardedpitr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,11 @@ func initializeCluster(t *testing.T) {
}

queryCmds := []string{
"SET GLOBAL super_read_only=OFF;",
fmt.Sprintf("CREATE USER '%s'@'%%' IDENTIFIED BY '%s';", mysqlUserName, mysqlPassword),
fmt.Sprintf("GRANT ALL ON *.* TO '%s'@'%%';", mysqlUserName),
fmt.Sprintf("GRANT GRANT OPTION ON *.* TO '%s'@'%%';", mysqlUserName),
fmt.Sprintf("create database %s;", "vt_ks"),
"FLUSH PRIVILEGES;",
"SET GLOBAL super_read_only=ON;",
}

for _, tablet := range []*cluster.Vttablet{primary, replica, shard0Primary, shard0Replica, shard1Primary, shard1Replica} {
Expand Down
2 changes: 0 additions & 2 deletions go/test/endtoend/recovery/pitrtls/shardedpitr_tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,11 @@ func initializeCluster(t *testing.T) {
}

queryCmds := []string{
"SET GLOBAL super_read_only=OFF;",
fmt.Sprintf("CREATE USER '%s'@'%%' IDENTIFIED BY '%s';", mysqlUserName, mysqlPassword),
fmt.Sprintf("GRANT ALL ON *.* TO '%s'@'%%';", mysqlUserName),
fmt.Sprintf("GRANT GRANT OPTION ON *.* TO '%s'@'%%';", mysqlUserName),
fmt.Sprintf("create database %s;", "vt_ks"),
"FLUSH PRIVILEGES;",
"SET GLOBAL super_read_only=ON;",
}

for _, tablet := range []*cluster.Vttablet{primary, replica, shard0Primary, shard0Replica, shard1Primary, shard1Replica} {
Expand Down
13 changes: 3 additions & 10 deletions go/test/endtoend/tabletmanager/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
Expand All @@ -42,17 +41,11 @@ func TestEnsureDB(t *testing.T) {
err = clusterInstance.StartVttablet(tablet, "NOT_SERVING", false, cell, "dbtest", hostname, "0")
require.NoError(t, err)

// Make it the primary.
// Make it the primary, which should create the DB
err = clusterInstance.VtctlclientProcess.ExecuteCommand("TabletExternallyReparented", tablet.Alias)
require.EqualError(t, err, "exit status 1")

// It is still NOT_SERVING because the db is read-only.
assert.Equal(t, "NOT_SERVING", tablet.VttabletProcess.GetTabletStatus())
status := tablet.VttabletProcess.GetStatusDetails()
assert.Contains(t, status, "read-only")
require.NoError(t, err)

// Switch to read-write and verify that that we go serving.
_ = clusterInstance.VtctlclientProcess.ExecuteCommand("SetReadWrite", tablet.Alias)
// verify that that we are serving
err = tablet.VttabletProcess.WaitForTabletStatus("SERVING")
require.NoError(t, err)
killTablets(t, tablet)
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true)
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true, true)
res = append(res, qr)
require.NoError(t, err)
for more == true {
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestTopoDownServingQuery(t *testing.T) {
func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true)
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true, true)
res = append(res, qr)
require.NoError(t, err)
for more == true {
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/topotest/zk2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestTopoDownServingQuery(t *testing.T) {
func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true)
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true, true)
res = append(res, qr)
require.NoError(t, err)
for more == true {
Expand Down
7 changes: 6 additions & 1 deletion go/test/endtoend/utils/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ func createInitSQLFile(mysqlDir, ksName string) (string, error) {
}
defer f.Close()

_, err = f.WriteString(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s;", ksName))
sql := fmt.Sprintf(`
SET @original_super_read_only=IF(@@global.super_read_only=1, 'ON', 'OFF');
SET GLOBAL super_read_only='OFF';
CREATE DATABASE IF NOT EXISTS %s;
SET GLOBAL super_read_only=IFNULL(@original_super_read_only, 'OFF');`, ksName)
_, err = f.WriteString(sql)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func AssertResultIsEmpty(t *testing.T, conn *mysql.Conn, pre string) {
// The test fails if the query produces an error.
func Exec(t testing.TB, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
qr, err := conn.ExecuteFetchWithReadOnlyHandling(query, 1000, true)
require.NoError(t, err, "for query: "+query)
return qr
}
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vtgate/reservedconn/udv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestMysqlDumpInitialLog(t *testing.T) {

for _, query := range queries {
t.Run(query, func(t *testing.T) {
_, more, err := conn.ExecuteFetchMulti(query, 1000, true)
_, more, err := conn.ExecuteFetchMulti(query, 1000, true, true)
require.NoError(t, err)
require.False(t, more)
})
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vtgate/unsharded/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func TestRowCountExceeded(t *testing.T) {
func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true)
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true, true)
res = append(res, qr)
require.NoError(t, err)
for more == true {
Expand Down
18 changes: 9 additions & 9 deletions go/vt/mysqlctl/metadata_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (m *MetadataManager) UpsertLocalMetadata(mysqld MysqlDaemon, localMetadata
}

func createMetadataTables(conn *dbconnpool.DBConnection, dbName string) error {
if _, err := conn.ExecuteFetch("CREATE DATABASE IF NOT EXISTS _vt", 0, false); err != nil {
if _, err := conn.ExecuteFetchWithReadOnlyHandling("CREATE DATABASE IF NOT EXISTS _vt", 0, false); err != nil {
return err
}

Expand All @@ -147,12 +147,12 @@ func createMetadataTables(conn *dbconnpool.DBConnection, dbName string) error {
}

func createLocalMetadataTable(conn *dbconnpool.DBConnection, dbName string) error {
if _, err := conn.ExecuteFetch(sqlCreateLocalMetadataTable, 0, false); err != nil {
if _, err := conn.ExecuteFetchWithReadOnlyHandling(sqlCreateLocalMetadataTable, 0, false); err != nil {
return err
}

for _, sql := range sqlAlterLocalMetadataTable {
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
if _, err := conn.ExecuteFetchWithReadOnlyHandling(sql, 0, false); err != nil {
// Ignore "Duplicate column name 'db_name'" errors which can happen on every restart.
if merr, ok := err.(*mysql.SQLError); !ok || merr.Num != mysql.ERDupFieldName {
log.Errorf("Error executing %v: %v", sql, err)
Expand All @@ -162,20 +162,20 @@ func createLocalMetadataTable(conn *dbconnpool.DBConnection, dbName string) erro
}

sql := fmt.Sprintf(sqlUpdateLocalMetadataTable, dbName)
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
if _, err := conn.ExecuteFetchWithReadOnlyHandling(sql, 0, false); err != nil {
log.Errorf("Error executing %v: %v, continuing. Please check the data in _vt.local_metadata and take corrective action.", sql, err)
}

return nil
}

func createShardMetadataTable(conn *dbconnpool.DBConnection, dbName string) error {
if _, err := conn.ExecuteFetch(sqlCreateShardMetadataTable, 0, false); err != nil {
if _, err := conn.ExecuteFetchWithReadOnlyHandling(sqlCreateShardMetadataTable, 0, false); err != nil {
return err
}

for _, sql := range sqlAlterShardMetadataTable {
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
if _, err := conn.ExecuteFetchWithReadOnlyHandling(sql, 0, false); err != nil {
// Ignore "Duplicate column name 'db_name'" errors which can happen on every restart.
if merr, ok := err.(*mysql.SQLError); !ok || merr.Num != mysql.ERDupFieldName {
log.Errorf("Error executing %v: %v", sql, err)
Expand All @@ -185,7 +185,7 @@ func createShardMetadataTable(conn *dbconnpool.DBConnection, dbName string) erro
}

sql := fmt.Sprintf(sqlUpdateShardMetadataTable, dbName)
if _, err := conn.ExecuteFetch(sql, 0, false); err != nil {
if _, err := conn.ExecuteFetchWithReadOnlyHandling(sql, 0, false); err != nil {
log.Errorf("Error executing %v: %v, continuing. Please check the data in _vt.shard_metadata and take corrective action.", sql, err)
}

Expand Down Expand Up @@ -221,12 +221,12 @@ func upsertLocalMetadata(conn *dbconnpool.DBConnection, localMetadata map[string
queryBuf.WriteString(") ON DUPLICATE KEY UPDATE value = ")
valValue.EncodeSQL(&queryBuf)

if _, err := conn.ExecuteFetch(queryBuf.String(), 0, false); err != nil {
if _, err := conn.ExecuteFetchWithReadOnlyHandling(queryBuf.String(), 0, false); err != nil {
return err
}
}

if _, err := conn.ExecuteFetch("COMMIT", 0, false); err != nil {
if _, err := conn.ExecuteFetchWithReadOnlyHandling("COMMIT", 0, false); err != nil {
return err
}

Expand Down
Loading

0 comments on commit a03de1f

Please sign in to comment.