diff --git a/config/init_db.sql b/config/init_db.sql index 6bf2ac52023..125359a40e1 100644 --- a/config/init_db.sql +++ b/config/init_db.sql @@ -28,12 +28,14 @@ CREATE DATABASE IF NOT EXISTS _vt; CREATE TABLE IF NOT EXISTS _vt.local_metadata ( name VARCHAR(255) NOT NULL, value VARCHAR(255) NOT NULL, - PRIMARY KEY (name) + db_name VARBINARY(255) NOT NULL, + PRIMARY KEY (db_name, name) ) ENGINE=InnoDB; CREATE TABLE IF NOT EXISTS _vt.shard_metadata ( name VARCHAR(255) NOT NULL, value MEDIUMBLOB NOT NULL, - PRIMARY KEY (name) + db_name VARBINARY(255) NOT NULL, + PRIMARY KEY (db_name, name) ) ENGINE=InnoDB; # Admin user with all privileges. diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index 00e4ee7b65c..d778582f77c 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -503,8 +503,15 @@ func CreateVReplicationTable() []string { transaction_timestamp BIGINT(20) NOT NULL, state VARBINARY(100) NOT NULL, message VARBINARY(1000) DEFAULT NULL, + db_name VARBINARY(255) NOT NULL, PRIMARY KEY (id) -) ENGINE=InnoDB`} +) ENGINE=InnoDB`, + } +} + +// AlterVReplicationTable adds new columns to vreplication table +func AlterVReplicationTable() []string { + return []string{"ALTER TABLE _vt.vreplication ADD COLUMN db_name VARBINARY(255) NOT NULL"} } // SetVReplicationState updates the state in the _vt.vreplication table. @@ -558,19 +565,19 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) { // CreateVReplication returns a statement to populate the first value into // the _vt.vreplication table. -func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64) string { +func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64, dbName string) string { return fmt.Sprintf("insert into _vt.vreplication "+ - "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) "+ - "values (%v, %v, %v, %v, %v, %v, 0, '%v')", - encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag, timeUpdated, BlpRunning) + "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) "+ + "values (%v, %v, %v, %v, %v, %v, 0, '%v', %v)", + encodeString(workflow), encodeString(source.String()), encodeString(position), maxTPS, maxReplicationLag, timeUpdated, BlpRunning, encodeString(dbName)) } // CreateVReplicationState returns a statement to create a stopped vreplication. 
-func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position, state string) string { +func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position, state string, dbName string) string { return fmt.Sprintf("insert into _vt.vreplication "+ - "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) "+ - "values (%v, %v, %v, %v, %v, %v, 0, '%v')", - encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state) + "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) "+ + "values (%v, %v, %v, %v, %v, %v, 0, '%v', %v)", + encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), state, encodeString(dbName)) } // GenerateUpdatePos returns a statement to update a value in the diff --git a/go/vt/binlog/binlogplayer/binlog_player_test.go b/go/vt/binlog/binlogplayer/binlog_player_test.go index 67d06fb300c..99d57f9127d 100644 --- a/go/vt/binlog/binlogplayer/binlog_player_test.go +++ b/go/vt/binlog/binlogplayer/binlog_player_test.go @@ -320,8 +320,8 @@ func applyEvents(blp *BinlogPlayer) func() error { func TestCreateVReplicationKeyRange(t *testing.T) { want := "insert into _vt.vreplication " + - "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) " + - `values ('Resharding', 'keyspace:\"ks\" shard:\"0\" key_range: ', 'MariaDB/0-1-1083', 9223372036854775807, 9223372036854775807, 481823, 0, 'Running')` + "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) " + + `values ('Resharding', 'keyspace:\"ks\" shard:\"0\" key_range: ', 'MariaDB/0-1-1083', 9223372036854775807, 9223372036854775807, 481823, 0, 'Running', 'db')` bls := binlogdatapb.BinlogSource{ Keyspace: "ks", @@ -331,7 +331,7 @@ func TestCreateVReplicationKeyRange(t *testing.T) { }, } - got := CreateVReplication("Resharding", &bls, "MariaDB/0-1-1083", throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, 481823) + got := CreateVReplication("Resharding", &bls, "MariaDB/0-1-1083", throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, 481823, "db") if got != want { t.Errorf("CreateVReplication() =\n%v, want\n%v", got, want) } @@ -339,8 +339,8 @@ func TestCreateVReplicationKeyRange(t *testing.T) { func TestCreateVReplicationTables(t *testing.T) { want := "insert into _vt.vreplication " + - "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state) " + - `values ('Resharding', 'keyspace:\"ks\" shard:\"0\" tables:\"a\" tables:\"b\" ', 'MariaDB/0-1-1083', 9223372036854775807, 9223372036854775807, 481823, 0, 'Running')` + "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name) " + + `values ('Resharding', 'keyspace:\"ks\" shard:\"0\" tables:\"a\" tables:\"b\" ', 'MariaDB/0-1-1083', 9223372036854775807, 9223372036854775807, 481823, 0, 'Running', 'db')` bls := binlogdatapb.BinlogSource{ Keyspace: "ks", @@ -348,7 +348,7 @@ func TestCreateVReplicationTables(t *testing.T) { Tables: []string{"a", "b"}, } - got := CreateVReplication("Resharding", &bls, "MariaDB/0-1-1083", throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, 481823) + got 
:= CreateVReplication("Resharding", &bls, "MariaDB/0-1-1083", throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, 481823, "db") if got != want { t.Errorf("CreateVReplication() =\n%v, want\n%v", got, want) } diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index e0b9206d17e..1b8f498e6f5 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -235,7 +235,7 @@ func Restore( } if !ok { logger.Infof("Auto-restore is enabled, but mysqld already contains data. Assuming vttablet was just restarted.") - if err = PopulateMetadataTables(mysqld, localMetadata); err == nil { + if err = PopulateMetadataTables(mysqld, localMetadata, dbName); err == nil { err = ErrExistingDB } return mysql.Position{}, err @@ -264,7 +264,7 @@ func Restore( err = ErrNoBackup } - if err2 := PopulateMetadataTables(mysqld, localMetadata); err2 == nil { + if err2 := PopulateMetadataTables(mysqld, localMetadata, dbName); err2 == nil { err = ErrNoBackup } return mysql.Position{}, err @@ -300,7 +300,7 @@ func Restore( // Populate local_metadata before starting without --skip-networking, // so it's there before we start announcing ourselves. logger.Infof("Restore: populating local_metadata") - err = PopulateMetadataTables(mysqld, localMetadata) + err = PopulateMetadataTables(mysqld, localMetadata, dbName) if err != nil { return mysql.Position{}, err } diff --git a/go/vt/mysqlctl/metadata_tables.go b/go/vt/mysqlctl/metadata_tables.go index 5fd4aee1134..50e629f4394 100644 --- a/go/vt/mysqlctl/metadata_tables.go +++ b/go/vt/mysqlctl/metadata_tables.go @@ -18,23 +18,40 @@ package mysqlctl import ( "bytes" + "fmt" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" ) // Note that definitions of local_metadata and shard_metadata should be the same // as in testing which is defined in config/init_db.sql. -const sqlCreateLocalMetadataTable = `CREATE TABLE IF NOT EXISTS _vt.local_metadata ( +const ( + sqlCreateLocalMetadataTable = `CREATE TABLE IF NOT EXISTS _vt.local_metadata ( name VARCHAR(255) NOT NULL, value VARCHAR(255) NOT NULL, PRIMARY KEY (name) ) ENGINE=InnoDB` -const sqlCreateShardMetadataTable = `CREATE TABLE IF NOT EXISTS _vt.shard_metadata ( + sqlCreateShardMetadataTable = `CREATE TABLE IF NOT EXISTS _vt.shard_metadata ( name VARCHAR(255) NOT NULL, value MEDIUMBLOB NOT NULL, PRIMARY KEY (name) ) ENGINE=InnoDB` + sqlUpdateLocalMetadataTable = "UPDATE _vt.local_metadata SET db_name='%s' WHERE db_name=''" + sqlUpdateShardMetadataTable = "UPDATE _vt.shard_metadata SET db_name='%s' WHERE db_name=''" +) + +var ( + sqlAlterLocalMetadataTable = []string{ + `ALTER TABLE _vt.local_metadata ADD COLUMN db_name VARBINARY(255) NOT NULL`, + `ALTER TABLE _vt.local_metadata DROP PRIMARY KEY, ADD PRIMARY KEY(name, db_name)`, + } + sqlAlterShardMetadataTable = []string{ + `ALTER TABLE _vt.shard_metadata ADD COLUMN db_name VARBINARY(255) NOT NULL`, + `ALTER TABLE _vt.shard_metadata DROP PRIMARY KEY, ADD PRIMARY KEY(name, db_name)`, + } +) // PopulateMetadataTables creates and fills the _vt.local_metadata table and // creates _vt.shard_metadata table. _vt.local_metadata table is @@ -46,7 +63,7 @@ const sqlCreateShardMetadataTable = `CREATE TABLE IF NOT EXISTS _vt.shard_metada // created here to make it easier to create it on databases that were running // old version of Vitess, or databases that are getting converted to run under // Vitess. 
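// For illustration, assuming a tablet whose database is named vt_test_keyspace (the name is only
// an example), the upgrade statements defined above amount to running, for each metadata table:
//
//   ALTER TABLE _vt.local_metadata ADD COLUMN db_name VARBINARY(255) NOT NULL;
//   ALTER TABLE _vt.local_metadata DROP PRIMARY KEY, ADD PRIMARY KEY(name, db_name);
//   UPDATE _vt.local_metadata SET db_name='vt_test_keyspace' WHERE db_name='';
//
// The duplicate-column error from the ALTER is tolerated on databases that were already upgraded.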
-func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string) error { +func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string, dbName string) error { log.Infof("Populating _vt.local_metadata table...") // Get a non-pooled DBA connection. @@ -69,9 +86,35 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string) if _, err := conn.ExecuteFetch(sqlCreateLocalMetadataTable, 0, false); err != nil { return err } + for _, sql := range sqlAlterLocalMetadataTable { + if _, err := conn.ExecuteFetch(sql, 0, false); err != nil { + if merr, ok := err.(*mysql.SQLError); ok && merr.Num == mysql.ERDupFieldName { + log.Errorf("Expected error executing %v: %v", sql, err) + } else { + log.Errorf("Unexpected error executing %v: %v", sql, err) + return err + } + } + } + if _, err := conn.ExecuteFetch(fmt.Sprintf(sqlUpdateLocalMetadataTable, dbName), 0, false); err != nil { + return err + } if _, err := conn.ExecuteFetch(sqlCreateShardMetadataTable, 0, false); err != nil { return err } + for _, sql := range sqlAlterShardMetadataTable { + if _, err := conn.ExecuteFetch(sql, 0, false); err != nil { + if merr, ok := err.(*mysql.SQLError); ok && merr.Num == mysql.ERDupFieldName { + log.Errorf("Expected error executing %v: %v", sql, err) + } else { + log.Errorf("Unexpected error executing %v: %v", sql, err) + return err + } + } + } + if _, err := conn.ExecuteFetch(fmt.Sprintf(sqlUpdateShardMetadataTable, dbName), 0, false); err != nil { + return err + } // Populate local_metadata from the passed list of values. if _, err := conn.ExecuteFetch("BEGIN", 0, false); err != nil { @@ -80,12 +123,15 @@ func PopulateMetadataTables(mysqld MysqlDaemon, localMetadata map[string]string) for name, val := range localMetadata { nameValue := sqltypes.NewVarChar(name) valValue := sqltypes.NewVarChar(val) + dbNameValue := sqltypes.NewVarBinary(dbName) queryBuf := bytes.Buffer{} - queryBuf.WriteString("INSERT INTO _vt.local_metadata (name,value) VALUES (") + queryBuf.WriteString("INSERT INTO _vt.local_metadata (name,value, db_name) VALUES (") nameValue.EncodeSQL(&queryBuf) queryBuf.WriteByte(',') valValue.EncodeSQL(&queryBuf) + queryBuf.WriteByte(',') + dbNameValue.EncodeSQL(&queryBuf) queryBuf.WriteString(") ON DUPLICATE KEY UPDATE value = ") valValue.EncodeSQL(&queryBuf) diff --git a/go/vt/schemamanager/schemaswap/schema_swap.go b/go/vt/schemamanager/schemaswap/schema_swap.go index b32b0e0c146..d64cebd175c 100644 --- a/go/vt/schemamanager/schemaswap/schema_swap.go +++ b/go/vt/schemamanager/schemaswap/schema_swap.go @@ -39,6 +39,7 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" workflowpb "vitess.io/vitess/go/vt/proto/workflow" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtctl" "vitess.io/vitess/go/vt/vttablet/tmclient" "vitess.io/vitess/go/vt/workflow" @@ -603,8 +604,8 @@ func (shardSwap *shardSchemaSwap) readShardMetadata(metadata *shardSwapMetadata, return } query := fmt.Sprintf( - "SELECT name, value FROM _vt.shard_metadata WHERE name in ('%s', '%s', '%s')", - lastStartedMetadataName, lastFinishedMetadataName, currentSQLMetadataName) + "SELECT name, value FROM _vt.shard_metadata WHERE db_name = '%s' and name in ('%s', '%s', '%s')", + topoproto.TabletDbName(tablet), lastStartedMetadataName, lastFinishedMetadataName, currentSQLMetadataName) queryResult, err := shardSwap.executeAdminQuery(tablet, query, 3 /* maxRows */) if err != nil { metadata.err = err @@ -640,7 +641,9 @@ func (shardSwap 
*shardSchemaSwap) writeStartedSwap() error { return err } queryBuf := bytes.Buffer{} - queryBuf.WriteString("INSERT INTO _vt.shard_metadata (name, value) VALUES ('") + queryBuf.WriteString("INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES ('") + queryBuf.WriteString(topoproto.TabletDbName(tablet)) + queryBuf.WriteString("','") queryBuf.WriteString(currentSQLMetadataName) queryBuf.WriteString("',") sqlValue := sqltypes.NewVarChar(shardSwap.parent.sql) sqlValue.EncodeSQL(&queryBuf) @@ -666,13 +669,13 @@ func (shardSwap *shardSchemaSwap) writeFinishedSwap() error { return err } query := fmt.Sprintf( - "INSERT INTO _vt.shard_metadata (name, value) VALUES ('%s', '%d') ON DUPLICATE KEY UPDATE value = '%d'", - lastFinishedMetadataName, shardSwap.parent.swapID, shardSwap.parent.swapID) + "INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES ('%s', '%s', '%d') ON DUPLICATE KEY UPDATE value = '%d'", + topoproto.TabletDbName(tablet), lastFinishedMetadataName, shardSwap.parent.swapID, shardSwap.parent.swapID) _, err = shardSwap.executeAdminQuery(tablet, query, 0 /* maxRows */) if err != nil { return err } - query = fmt.Sprintf("DELETE FROM _vt.shard_metadata WHERE name = '%s'", currentSQLMetadataName) + query = fmt.Sprintf("DELETE FROM _vt.shard_metadata WHERE db_name = '%s' AND name = '%s'", topoproto.TabletDbName(tablet), currentSQLMetadataName) _, err = shardSwap.executeAdminQuery(tablet, query, 0 /* maxRows */) return err } @@ -896,7 +899,7 @@ func (shardSwap *shardSchemaSwap) executeAdminQuery(tablet *topodatapb.Tablet, q func (shardSwap *shardSchemaSwap) isSwapApplied(tablet *topodatapb.Tablet) (bool, error) { swapIDResult, err := shardSwap.executeAdminQuery( tablet, - fmt.Sprintf("SELECT value FROM _vt.local_metadata WHERE name = '%s'", lastAppliedMetadataName), + fmt.Sprintf("SELECT value FROM _vt.local_metadata WHERE db_name = '%s' AND name = '%s'", topoproto.TabletDbName(tablet), lastAppliedMetadataName), 1 /* maxRows */) if err != nil { return false, err } @@ -1036,8 +1039,8 @@ func (shardSwap *shardSchemaSwap) applySeedSchemaChange() (err error) { return err } updateAppliedSwapQuery := fmt.Sprintf( - "INSERT INTO _vt.local_metadata (name, value) VALUES ('%s', '%d') ON DUPLICATE KEY UPDATE value = '%d'", - lastAppliedMetadataName, shardSwap.parent.swapID, shardSwap.parent.swapID) + "INSERT INTO _vt.local_metadata (db_name, name, value) VALUES ('%s', '%s', '%d') ON DUPLICATE KEY UPDATE value = '%d'", + topoproto.TabletDbName(seedTablet), lastAppliedMetadataName, shardSwap.parent.swapID, shardSwap.parent.swapID) _, err = shardSwap.parent.tabletClient.ExecuteFetchAsDba( shardSwap.parent.ctx, seedTablet, diff --git a/go/vt/vttablet/tabletmanager/action_agent.go b/go/vt/vttablet/tabletmanager/action_agent.go index e55377e7993..98072227b75 100644 --- a/go/vt/vttablet/tabletmanager/action_agent.go +++ b/go/vt/vttablet/tabletmanager/action_agent.go @@ -263,13 +263,6 @@ func NewActionAgent( agent.statsTabletType = stats.NewString("TabletType") agent.statsTabletTypeCount = stats.NewCountersWithSingleLabel("TabletTypeCount", "Number of times the tablet changed to the labeled type", "type") - // The db name will get set by the Start function called below, before - // VREngine gets to invoke the FilteredWithDB call.
- agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient { - return binlogplayer.NewDBClient(agent.DBConfigs.FilteredWithDB()) - }) - servenv.OnTerm(agent.VREngine.Close) - var mysqlHost string var mysqlPort int32 if appConfig := dbcfgs.AppWithDB(); appConfig.Host != "" { @@ -289,6 +282,12 @@ func NewActionAgent( return nil, err } + // The db name is set by the Start function called above + agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient { + return binlogplayer.NewDBClient(agent.DBConfigs.FilteredWithDB()) + }, agent.DBConfigs.FilteredWithDB().DbName) + servenv.OnTerm(agent.VREngine.Close) + // Run a background task to rebuild the SrvKeyspace in our cell/keyspace // if it doesn't exist yet. go agent.maybeRebuildKeyspace(agent.initialTablet.Alias.Cell, agent.initialTablet.Keyspace) @@ -340,6 +339,10 @@ func NewActionAgent( // NewTestActionAgent creates an agent for test purposes. Only a // subset of features are supported now, but we'll add more over time. func NewTestActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias *topodatapb.TabletAlias, vtPort, grpcPort int32, mysqlDaemon mysqlctl.MysqlDaemon, preStart func(*ActionAgent)) *ActionAgent { + ti, err := ts.GetTablet(batchCtx, tabletAlias) + if err != nil { + panic(vterrors.Wrap(err, "failed reading tablet")) + } agent := &ActionAgent{ QueryServiceControl: tabletservermock.NewController(), UpdateStream: binlog.NewUpdateStreamControlMock(), @@ -350,7 +353,7 @@ func NewTestActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias * Cnf: nil, MysqlDaemon: mysqlDaemon, DBConfigs: &dbconfigs.DBConfigs{}, - VREngine: vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient), + VREngine: vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()), History: history.New(historyLength), _healthy: fmt.Errorf("healthcheck not run yet"), } @@ -389,7 +392,7 @@ func NewComboActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias Cnf: nil, MysqlDaemon: mysqlDaemon, DBConfigs: dbcfgs, - VREngine: vreplication.NewEngine(nil, "", nil, nil), + VREngine: vreplication.NewEngine(nil, "", nil, nil, ""), gotMysqlPort: true, History: history.New(historyLength), _healthy: fmt.Errorf("healthcheck not run yet"), diff --git a/go/vt/vttablet/tabletmanager/init_tablet.go b/go/vt/vttablet/tabletmanager/init_tablet.go index ba4ed6e8a53..d224fe96a55 100644 --- a/go/vt/vttablet/tabletmanager/init_tablet.go +++ b/go/vt/vttablet/tabletmanager/init_tablet.go @@ -209,7 +209,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int32) error { if *initPopulateMetadata { agent.setTablet(tablet) localMetadata := agent.getLocalMetadataValues(tablet.Type) - err := mysqlctl.PopulateMetadataTables(agent.MysqlDaemon, localMetadata) + err := mysqlctl.PopulateMetadataTables(agent.MysqlDaemon, localMetadata, topoproto.TabletDbName(tablet)) if err != nil { return vterrors.Wrap(err, "failed to -init_populate_metadata") } diff --git a/go/vt/vttablet/tabletmanager/init_tablet_test.go b/go/vt/vttablet/tabletmanager/init_tablet_test.go index 27c057c9680..b19db6af3a1 100644 --- a/go/vt/vttablet/tabletmanager/init_tablet_test.go +++ b/go/vt/vttablet/tabletmanager/init_tablet_test.go @@ -173,7 +173,7 @@ func TestInitTablet(t *testing.T) { } db := fakesqldb.New(t) defer db.Close() - db.AddQueryPattern(`(SET|CREATE|BEGIN|INSERT|COMMIT)\b.*`, &sqltypes.Result{}) + 
db.AddQueryPattern(`(SET|CREATE|BEGIN|INSERT|COMMIT|ALTER|UPDATE)\b.*`, &sqltypes.Result{}) /* db.AddQuery("SET @@session.sql_log_bin = 0", &sqltypes.Result{}) db.AddQuery("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}) @@ -194,7 +194,7 @@ func TestInitTablet(t *testing.T) { TabletAlias: tabletAlias, MysqlDaemon: mysqlDaemon, DBConfigs: &dbconfigs.DBConfigs{}, - VREngine: vreplication.NewEngine(nil, "", nil, nil), + VREngine: vreplication.NewEngine(nil, "", nil, nil, ""), batchCtx: ctx, History: history.New(historyLength), _healthy: fmt.Errorf("healthcheck not run yet"), diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 3577508abdc..7aa5c542ba8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -61,17 +61,19 @@ type Engine struct { cell string mysqld mysqlctl.MysqlDaemon dbClientFactory func() binlogplayer.DBClient + dbName string } // NewEngine creates a new Engine. // A nil ts means that the Engine is disabled. -func NewEngine(ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, dbClientFactory func() binlogplayer.DBClient) *Engine { +func NewEngine(ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, dbClientFactory func() binlogplayer.DBClient, dbName string) *Engine { vre := &Engine{ controllers: make(map[int]*controller), ts: ts, cell: cell, mysqld: mysqld, dbClientFactory: dbClientFactory, + dbName: dbName, } return vre } @@ -111,18 +113,31 @@ func (vre *Engine) executeFetchMaybeCreateTable(dbClient binlogplayer.DBClient, // If it's a bad table or db, it could be because _vt.vreplication wasn't created. // In that case we can try creating it again. merr, isSQLErr := err.(*mysql.SQLError) - if !isSQLErr || !(merr.Num == mysql.ERNoSuchTable || merr.Num == mysql.ERBadDb) { + if !isSQLErr || !(merr.Num == mysql.ERNoSuchTable || merr.Num == mysql.ERBadDb || merr.Num == mysql.ERBadFieldError) { return qr, err } log.Info("Looks like _vt.vreplication table may not exist. Trying to recreate... ") - for _, query := range binlogplayer.CreateVReplicationTable() { - if _, merr := dbClient.ExecuteFetch(query, 0); merr != nil { - log.Warningf("Failed to ensure _vt.vreplication table exists: %v", merr) - return nil, err + if merr.Num == mysql.ERNoSuchTable || merr.Num == mysql.ERBadDb { + for _, query := range binlogplayer.CreateVReplicationTable() { + if _, merr := dbClient.ExecuteFetch(query, 0); merr != nil { + log.Warningf("Failed to ensure _vt.vreplication table exists: %v", merr) + return nil, err + } + } + } + if merr.Num == mysql.ERBadFieldError { + log.Info("Adding column to table _vt.vreplication") + for _, query := range binlogplayer.AlterVReplicationTable() { + if _, merr := dbClient.ExecuteFetch(query, 0); merr != nil { + merr, isSQLErr := err.(*mysql.SQLError) + if !isSQLErr || !(merr.Num == mysql.ERDupFieldName) { + log.Warningf("Failed to alter _vt.vreplication table: %v", merr) + return nil, err + } + } } } - return dbClient.ExecuteFetch(query, maxrows) } @@ -133,13 +148,18 @@ func (vre *Engine) initAll() error { } defer dbClient.Close() - rows, err := readAllRows(dbClient) + rows, err := readAllRows(dbClient, vre.dbName) if err != nil { // Handle Table not found. if merr, ok := err.(*mysql.SQLError); ok && merr.Num == mysql.ERNoSuchTable { log.Info("_vt.vreplication table not found. 
Will create it later if needed") return nil } + // Handle missing field + if merr, ok := err.(*mysql.SQLError); ok && merr.Num == mysql.ERBadFieldError { + log.Info("_vt.vreplication table found but is missing field db_name. Will add it later if needed") + return nil + } return err } for _, row := range rows { @@ -347,8 +367,8 @@ func (vre *Engine) updateStats() { } } -func readAllRows(dbClient binlogplayer.DBClient) ([]map[string]string, error) { - qr, err := dbClient.ExecuteFetch("select * from _vt.vreplication", 10000) +func readAllRows(dbClient binlogplayer.DBClient, dbName string) ([]map[string]string, error) { + qr, err := dbClient.ExecuteFetch(fmt.Sprintf("select * from _vt.vreplication where db_name=%v", encodeString(dbName)), 10000) if err != nil { return nil, err } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go index d0814296585..a4ac882dd10 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go @@ -41,12 +41,12 @@ func TestEngineOpen(t *testing.T) { // Test Insert - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory) + vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) if vre.IsOpen() { t.Errorf("IsOpen: %v, want false", vre.IsOpen()) } - dbClient.ExpectRequest("select * from _vt.vreplication", sqltypes.MakeTestResult( + dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", sqltypes.MakeTestResult( sqltypes.MakeTestFields( "id|state|source", "int64|varchar|varchar", @@ -89,9 +89,9 @@ func TestEngineExec(t *testing.T) { // Test Insert - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory) + vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) - dbClient.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { t.Fatal(err) } @@ -212,9 +212,9 @@ func TestEngineBadInsert(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory) + vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) - dbClient.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { t.Fatal(err) } @@ -242,9 +242,9 @@ func TestEngineSelect(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory) + vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) - dbClient.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { t.Fatal(err) } @@ -277,9 +277,9 @@ func TestWaitForPos(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} dbClientFactory := func() binlogplayer.DBClient { 
return dbClient } - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory) + vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) - dbClient.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { t.Fatal(err) } @@ -307,7 +307,7 @@ func TestWaitForPosError(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} dbClientFactory := func() binlogplayer.DBClient { return dbClient } - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory) + vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) err := vre.WaitForPos(context.Background(), 1, "MariaDB/0-1-1084") want := `vreplication engine is closed` @@ -315,7 +315,7 @@ func TestWaitForPosError(t *testing.T) { t.Errorf("WaitForPos: %v, want %v", err, want) } - dbClient.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { t.Fatal(err) } @@ -349,9 +349,9 @@ func TestWaitForPosCancel(t *testing.T) { dbClient := binlogplayer.NewMockDBClient(t) mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} dbClientFactory := func() binlogplayer.DBClient { return dbClient } - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory) + vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) - dbClient.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := vre.Open(context.Background()); err != nil { t.Fatal(err) } @@ -396,10 +396,10 @@ func TestCreateDBAndTable(t *testing.T) { // Test Insert - vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory) + vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) tableNotFound := mysql.SQLError{Num: 1146, Message: "table not found"} - dbClient.ExpectRequest("select * from _vt.vreplication", nil, &tableNotFound) + dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", nil, &tableNotFound) if err := vre.Open(context.Background()); err != nil { t.Fatal(err) } @@ -412,7 +412,6 @@ func TestCreateDBAndTable(t *testing.T) { dbClient.ExpectRequest("CREATE DATABASE IF NOT EXISTS _vt", &sqltypes.Result{}, nil) dbClient.ExpectRequest("DROP TABLE IF EXISTS _vt.blp_checkpoint", &sqltypes.Result{}, nil) dbClient.ExpectRequestRE("CREATE TABLE IF NOT EXISTS _vt.vreplication.*", &sqltypes.Result{}, nil) - dbClient.ExpectRequest("use _vt", &sqltypes.Result{}, nil) // Non-recoverable error. 
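The engine change above folds schema maintenance into query execution: if a statement against _vt.vreplication fails because the table or the _vt database is missing, the table is created; if it fails because the new db_name column is missing, the ALTER is applied; the statement is then retried. The following is a minimal sketch of that pattern, assuming only the DBClient interface and the CreateVReplicationTable/AlterVReplicationTable helpers shown in this diff; the function name execWithSchemaRepair and the package name are placeholders rather than code from the change.

package sketch

import (
    "vitess.io/vitess/go/mysql"
    "vitess.io/vitess/go/sqltypes"
    "vitess.io/vitess/go/vt/binlog/binlogplayer"
)

// execWithSchemaRepair runs query through dbClient and, on a schema-related
// error, creates or upgrades _vt.vreplication before retrying once.
func execWithSchemaRepair(dbClient binlogplayer.DBClient, query string, maxrows int) (*sqltypes.Result, error) {
    qr, err := dbClient.ExecuteFetch(query, maxrows)
    if err == nil {
        return qr, nil
    }
    merr, ok := err.(*mysql.SQLError)
    if !ok {
        return nil, err
    }
    switch merr.Num {
    case mysql.ERNoSuchTable, mysql.ERBadDb:
        // The table (or the whole _vt database) does not exist yet: create it.
        for _, ddl := range binlogplayer.CreateVReplicationTable() {
            if _, derr := dbClient.ExecuteFetch(ddl, 0); derr != nil {
                return nil, derr
            }
        }
    case mysql.ERBadFieldError:
        // The table predates the db_name column: add it, ignoring a
        // duplicate-column error if the column was already added.
        for _, ddl := range binlogplayer.AlterVReplicationTable() {
            if _, derr := dbClient.ExecuteFetch(ddl, 0); derr != nil {
                if serr, isSQL := derr.(*mysql.SQLError); !isSQL || serr.Num != mysql.ERDupFieldName {
                    return nil, derr
                }
            }
        }
    default:
        return nil, err
    }
    // Retry the original statement against the repaired schema.
    return dbClient.ExecuteFetch(query, maxrows)
}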
diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 42f10a47b23..a9b5d60d4c8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -96,7 +96,7 @@ func TestMain(m *testing.M) { return 1 } - playerEngine = NewEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory) + playerEngine = NewEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, vrepldb) if err := playerEngine.Open(context.Background()); err != nil { fmt.Fprintf(os.Stderr, "%v", err) return 1 diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 56c3cda402d..35062f79813 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -305,7 +305,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { // 1. Fetch was idle for idleTimeout. // 2. We've been receiving empty events for longer than idleTimeout. // In both cases, now > timeLastSaved. If so, any unsaved GTID should be saved. - if time.Now().Sub(vp.timeLastSaved) >= idleTimeout && vp.unsavedGTID != nil { + if time.Since(vp.timeLastSaved) >= idleTimeout && vp.unsavedGTID != nil { // Although unlikely, we should not save if a transaction is still open. // This can happen if a large transaction is split as multiple events. if !vp.dbClient.InTransaction { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_copy_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_copy_test.go index 8c1b839d7c6..7b2e1b41aa4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_copy_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_copy_test.go @@ -66,7 +66,7 @@ func TestPlayerInitTables(t *testing.T) { Filter: filter, OnDdl: binlogdatapb.OnDDLAction_IGNORE, } - query := binlogplayer.CreateVReplicationState("test", bls, "", binlogplayer.VReplicationInit) + query := binlogplayer.CreateVReplicationState("test", bls, "", binlogplayer.VReplicationInit, playerEngine.dbName) qr, err := playerEngine.Exec(query) if err != nil { t.Fatal(err) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go index 37860a047e8..d806fa2e4ef 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go @@ -682,7 +682,7 @@ func TestPlayerStopPos(t *testing.T) { OnDdl: binlogdatapb.OnDDLAction_IGNORE, } startPos := masterPosition(t) - query := binlogplayer.CreateVReplicationState("test", bls, startPos, binlogplayer.BlpStopped) + query := binlogplayer.CreateVReplicationState("test", bls, startPos, binlogplayer.BlpStopped, vrepldb) qr, err := playerEngine.Exec(query) if err != nil { t.Fatal(err) @@ -792,7 +792,7 @@ func TestPlayerIdleUpdate(t *testing.T) { expectDBClientQueries(t, []string{ "/update _vt.vreplication set pos=", }) - if duration := time.Now().Sub(start); duration < idleTimeout { + if duration := time.Since(start); duration < idleTimeout { t.Errorf("duration: %v, must be at least %v", duration, idleTimeout) } } @@ -1282,7 +1282,7 @@ func startVReplication(t *testing.T, filter *binlogdatapb.Filter, onddl binlogda if pos == "" { pos = masterPosition(t) } - query := binlogplayer.CreateVReplication("test", bls, pos, 9223372036854775807, 9223372036854775807, 0) + query := 
binlogplayer.CreateVReplication("test", bls, pos, 9223372036854775807, 9223372036854775807, 0, vrepldb) qr, err := playerEngine.Exec(query) if err != nil { t.Fatal(err) diff --git a/go/vt/worker/legacy_split_clone.go b/go/vt/worker/legacy_split_clone.go index c7af31b072a..aae581c0e6f 100644 --- a/go/vt/worker/legacy_split_clone.go +++ b/go/vt/worker/legacy_split_clone.go @@ -615,6 +615,9 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error { } for _, si := range scw.destinationShards { + keyspaceAndShard := topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName()) + dbName := scw.destinationDbNames[keyspaceAndShard] + destinationWaitGroup.Add(1) go func(keyspace, shard string, kr *topodatapb.KeyRange) { defer destinationWaitGroup.Done() @@ -627,10 +630,12 @@ func (scw *LegacySplitCloneWorker) copy(ctx context.Context) error { Shard: src.ShardName(), KeyRange: kr, } - qr, err := exc.vreplicationExec(ctx, binlogplayer.CreateVReplication("LegacySplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix())) + qr, err := exc.vreplicationExec(ctx, binlogplayer.CreateVReplication("LegacySplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), dbName)) if err != nil { processError("vreplication queries failed: %v", err) break + } else { + scw.wr.Logger().Infof("Created replication for tablet %v/%v: %v, db: %v, pos: %v, uid: %v", keyspace, shard, bls, dbName, sourcePositions[shardIndex], uint32(qr.InsertID)) } if err := scw.wr.SourceShardAdd(ctx, keyspace, shard, uint32(qr.InsertID), src.Keyspace(), src.ShardName(), src.Shard.KeyRange, nil); err != nil { processError("could not add source shard: %v", err) diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index c3606356306..4ef4f8157f1 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -1262,6 +1262,9 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error { } for _, si := range scw.destinationShards { + keyspaceAndShard := topoproto.KeyspaceShardString(si.Keyspace(), si.ShardName()) + dbName := scw.destinationDbNames[keyspaceAndShard] + wg.Add(1) go func(keyspace, shard string, kr *topodatapb.KeyRange) { defer wg.Done() @@ -1286,12 +1289,13 @@ func (scw *SplitCloneWorker) setUpVReplication(ctx context.Context) error { bls.Tables = scw.tables } // TODO(mberlin): Fill in scw.maxReplicationLag once the adapative throttler is enabled by default. 
- qr, err := exc.vreplicationExec(cancelableCtx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix())) + qr, err := exc.vreplicationExec(cancelableCtx, binlogplayer.CreateVReplication("SplitClone", bls, sourcePositions[shardIndex], scw.maxTPS, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), dbName)) if err != nil { handleError(vterrors.Wrap(err, "vreplication queries failed")) cancel() return } + scw.wr.Logger().Infof("Created replication for tablet %v/%v: %v, db: %v, pos: %v, uid: %v", keyspace, shard, bls, dbName, sourcePositions[shardIndex], uint32(qr.InsertID)) if err := scw.wr.SourceShardAdd(cancelableCtx, keyspace, shard, uint32(qr.InsertID), src.Keyspace(), src.ShardName(), src.Shard.KeyRange, scw.tables); err != nil { handleError(vterrors.Wrap(err, "could not add source shard")) break diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go index 7fd7a40cf73..55b5c084f33 100644 --- a/go/vt/worker/split_diff.go +++ b/go/vt/worker/split_diff.go @@ -315,7 +315,7 @@ func (sdw *SplitDiffWorker) synchronizeReplication(ctx context.Context) error { wrangler.RecordVReplicationAction(sdw.cleaner, masterInfo.Tablet, binlogplayer.StartVReplication(sdw.sourceShard.Uid)) p3qr, err := sdw.wr.TabletManagerClient().VReplicationExec(shortCtx, masterInfo.Tablet, binlogplayer.ReadVReplicationPos(sdw.sourceShard.Uid)) if err != nil { - return vterrors.Wrapf(err, "VReplicationExec(stop) for %v failed", sdw.shardInfo.MasterAlias) + return vterrors.Wrapf(err, "ReadVReplicationPos for %v failed", sdw.shardInfo.MasterAlias) } qr := sqltypes.Proto3ToResult(p3qr) if len(qr.Rows) != 1 || len(qr.Rows[0]) != 1 { diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 451a73a6a68..9ef846f58f4 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -17,6 +17,7 @@ limitations under the License. package wrangler import ( + "bytes" "flag" "fmt" "strings" @@ -135,7 +136,7 @@ func (wr *Wrangler) printShards(ctx context.Context, si []*topo.ShardInfo) error if err != nil { return err } - qr, err := wr.tmc.VReplicationExec(ctx, ti.Tablet, "select * from _vt.vreplication") + qr, err := wr.tmc.VReplicationExec(ctx, ti.Tablet, fmt.Sprintf("select * from _vt.vreplication where db_name=%v", encodeString(ti.DbName()))) if err != nil { return err } @@ -681,6 +682,11 @@ func (wr *Wrangler) setupReverseReplication(ctx context.Context, sourceShards, d // Create reverse replication for each source. 
for i, sourceShard := range sourceShards { + ti, err := wr.ts.GetTablet(ctx, sourceShard.MasterAlias) + if err != nil { + return err + } + dbName := ti.DbName() if len(sourceShard.SourceShards) != 0 { continue } @@ -697,16 +703,15 @@ func (wr *Wrangler) setupReverseReplication(ctx context.Context, sourceShards, d Shard: dest.ShardName(), KeyRange: kr, } - qr, err := wr.VReplicationExec(ctx, sourceShard.MasterAlias, binlogplayer.CreateVReplicationState("ReversedResharding", bls, masterPositions[j], binlogplayer.BlpStopped)) + qr, err := wr.VReplicationExec(ctx, sourceShard.MasterAlias, binlogplayer.CreateVReplicationState("ReversedResharding", bls, masterPositions[j], binlogplayer.BlpStopped, dbName)) if err != nil { return err } uids[j] = uint32(qr.InsertId) - wr.Logger().Infof("Created reverse replication for tablet %v/%v: %v, pos: %v, uid: %v", sourceShard.Keyspace(), sourceShard.ShardName(), bls, masterPositions[j], uids[j]) + wr.Logger().Infof("Created reverse replication for tablet %v/%v: %v, db: %v, pos: %v, uid: %v", sourceShard.Keyspace(), sourceShard.ShardName(), bls, dbName, masterPositions[j], uids[j]) } // Source shards have to be atomically added to ensure idempotence. // If this fails, there's no harm because the unstarted vreplication streams will just be abandoned. - var err error sourceShards[i], err = wr.ts.UpdateShardFields(ctx, sourceShard.Keyspace(), sourceShard.ShardName(), func(si *topo.ShardInfo) error { for j, dest := range destinationShards { si.SourceShards = append(si.SourceShards, &topodatapb.Shard_SourceShard{ @@ -1322,3 +1327,9 @@ func (wr *Wrangler) RemoveKeyspaceCell(ctx context.Context, keyspace, cell strin wr.Logger().Infof("Removing cell %v keyspace %v SrvKeyspace object", cell, keyspace) return wr.ts.DeleteSrvKeyspace(ctx, cell, keyspace) } + +func encodeString(in string) string { + buf := bytes.NewBuffer(nil) + sqltypes.NewVarChar(in).EncodeSQL(buf) + return buf.String() +} diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index 4b6bbb533d4..fd041172797 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -379,17 +379,21 @@ func (wr *Wrangler) copyShardMetadata(ctx context.Context, srcTabletAlias *topod return nil } - sql = "SELECT name, value FROM _vt.shard_metadata" + // TODO: 100 may be too low here for row limit + sql = "SELECT db_name, name, value FROM _vt.shard_metadata" dataProto, err := wr.ExecuteFetchAsDba(ctx, srcTabletAlias, sql, 100, false, false) if err != nil { return fmt.Errorf("ExecuteFetchAsDba(%v, %v, 100, false, false) failed: %v", srcTabletAlias, sql, err) } data := sqltypes.Proto3ToResult(dataProto) for _, row := range data.Rows { - name := row[0] - value := row[1] + dbName := row[0] + name := row[1] + value := row[2] queryBuf := bytes.Buffer{} - queryBuf.WriteString("INSERT INTO _vt.shard_metadata (name, value) VALUES (") + queryBuf.WriteString("INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES (") + dbName.EncodeSQL(&queryBuf) + queryBuf.WriteByte(',') name.EncodeSQL(&queryBuf) queryBuf.WriteByte(',') value.EncodeSQL(&queryBuf) diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index a9abb4497d8..ddd80965452 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -56,6 +56,10 @@ func TestBackupRestore(t *testing.T) { db.AddQueryPattern(`SET @@session\.sql_log_bin = .*`, &sqltypes.Result{}) db.AddQueryPattern(`CREATE TABLE IF NOT EXISTS _vt\.shard_metadata .*`, &sqltypes.Result{}) 
db.AddQueryPattern(`CREATE TABLE IF NOT EXISTS _vt\.local_metadata .*`, &sqltypes.Result{}) + db.AddQueryPattern(`ALTER TABLE _vt\.local_metadata .*`, &sqltypes.Result{}) + db.AddQueryPattern(`ALTER TABLE _vt\.shard_metadata .*`, &sqltypes.Result{}) + db.AddQueryPattern(`UPDATE _vt\.local_metadata SET db_name=.*`, &sqltypes.Result{}) + db.AddQueryPattern(`UPDATE _vt\.shard_metadata SET db_name=.*`, &sqltypes.Result{}) db.AddQueryPattern(`INSERT INTO _vt\.local_metadata .*`, &sqltypes.Result{}) // Initialize our temp dirs diff --git a/go/vt/wrangler/testlib/copy_schema_shard_test.go b/go/vt/wrangler/testlib/copy_schema_shard_test.go index 024a44be75b..ad33097440b 100644 --- a/go/vt/wrangler/testlib/copy_schema_shard_test.go +++ b/go/vt/wrangler/testlib/copy_schema_shard_test.go @@ -115,7 +115,7 @@ func copySchema(t *testing.T, useShardAsSource bool) { " KEY `by_msg` (`msg`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8" selectInformationSchema := "SELECT 1 FROM information_schema.tables WHERE table_schema = '_vt' AND table_name = 'shard_metadata'" - selectShardMetadata := "SELECT name, value FROM _vt.shard_metadata" + selectShardMetadata := "SELECT db_name, name, value FROM _vt.shard_metadata" // The source table is asked about its schema. // It may be the master or the rdonly. diff --git a/go/vt/wrangler/testlib/migrate_served_from_test.go b/go/vt/wrangler/testlib/migrate_served_from_test.go index 7388600aead..19d8f4ef69c 100644 --- a/go/vt/wrangler/testlib/migrate_served_from_test.go +++ b/go/vt/wrangler/testlib/migrate_served_from_test.go @@ -106,8 +106,8 @@ func TestMigrateServedFrom(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient := binlogplayer.NewMockDBClient(t) dbClientFactory := func() binlogplayer.DBClient { return dbClient } - destMaster.Agent.VREngine = vreplication.NewEngine(ts, "", destMaster.FakeMysqlDaemon, dbClientFactory) - dbClient.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + destMaster.Agent.VREngine = vreplication.NewEngine(ts, "", destMaster.FakeMysqlDaemon, dbClientFactory, dbClient.DBName()) + dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := destMaster.Agent.VREngine.Open(context.Background()); err != nil { t.Fatal(err) } diff --git a/go/vt/wrangler/testlib/migrate_served_types_test.go b/go/vt/wrangler/testlib/migrate_served_types_test.go index 5fb5ecf4a11..5cb170c5c8c 100644 --- a/go/vt/wrangler/testlib/migrate_served_types_test.go +++ b/go/vt/wrangler/testlib/migrate_served_types_test.go @@ -153,9 +153,9 @@ func TestMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. 
dbClient1 := binlogplayer.NewMockDBClient(t) dbClientFactory1 := func() binlogplayer.DBClient { return dbClient1 } - dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest1Master.FakeMysqlDaemon, dbClientFactory1) + dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest1Master.FakeMysqlDaemon, dbClientFactory1, dbClient1.DBName()) // select * from _vt.vreplication during Open - dbClient1.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient1.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest1Master.Agent.VREngine.Open(context.Background()); err != nil { t.Fatal(err) } @@ -182,9 +182,9 @@ func TestMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient2 := binlogplayer.NewMockDBClient(t) dbClientFactory2 := func() binlogplayer.DBClient { return dbClient2 } - dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest2Master.FakeMysqlDaemon, dbClientFactory2) + dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest2Master.FakeMysqlDaemon, dbClientFactory2, dbClient2.DBName()) // select * from _vt.vreplication during Open - dbClient2.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient2.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest2Master.Agent.VREngine.Open(context.Background()); err != nil { t.Fatal(err) } @@ -419,9 +419,9 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient1 := binlogplayer.NewMockDBClient(t) dbClientFactory1 := func() binlogplayer.DBClient { return dbClient1 } - dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest1Master.FakeMysqlDaemon, dbClientFactory1) + dest1Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest1Master.FakeMysqlDaemon, dbClientFactory1, "db") // select * from _vt.vreplication during Open - dbClient1.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient1.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest1Master.Agent.VREngine.Open(context.Background()); err != nil { t.Fatal(err) } @@ -437,9 +437,9 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient2 := binlogplayer.NewMockDBClient(t) dbClientFactory2 := func() binlogplayer.DBClient { return dbClient2 } - dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest2Master.FakeMysqlDaemon, dbClientFactory2) + dest2Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest2Master.FakeMysqlDaemon, dbClientFactory2, "db") // select * from _vt.vreplication during Open - dbClient2.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient2.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest2Master.Agent.VREngine.Open(context.Background()); err != nil { t.Fatal(err) } @@ -509,9 +509,9 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. 
dbClient1 = binlogplayer.NewMockDBClient(t) dbClientFactory1 = func() binlogplayer.DBClient { return dbClient1 } - dest3Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest3Master.FakeMysqlDaemon, dbClientFactory1) + dest3Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest3Master.FakeMysqlDaemon, dbClientFactory1, "db") // select * from _vt.vreplication during Open - dbClient1.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient1.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest3Master.Agent.VREngine.Open(context.Background()); err != nil { t.Fatal(err) } @@ -527,9 +527,9 @@ func TestMultiShardMigrateServedTypes(t *testing.T) { // Override with a fake VREngine after Agent is initialized in action loop. dbClient2 = binlogplayer.NewMockDBClient(t) dbClientFactory2 = func() binlogplayer.DBClient { return dbClient2 } - dest4Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest4Master.FakeMysqlDaemon, dbClientFactory2) + dest4Master.Agent.VREngine = vreplication.NewEngine(ts, "", dest4Master.FakeMysqlDaemon, dbClientFactory2, "db") // select * from _vt.vreplication during Open - dbClient2.ExpectRequest("select * from _vt.vreplication", &sqltypes.Result{}, nil) + dbClient2.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) if err := dest4Master.Agent.VREngine.Open(context.Background()); err != nil { t.Fatal(err) } diff --git a/test/base_sharding.py b/test/base_sharding.py index 64fefd28f89..5cdb5398271 100644 --- a/test/base_sharding.py +++ b/test/base_sharding.py @@ -54,7 +54,7 @@ class BaseShardingTest(object): def _insert_value(self, tablet_obj, table, mid, msg, keyspace_id): k = utils.uint64_to_hex(keyspace_id) tablet_obj.mquery( - 'vt_test_keyspace', + tablet_obj.dbname, ['begin', 'insert into %s(parent_id, id, msg, custom_ksid_col) ' 'values(%d, %d, "%s", 0x%x) /* vtgate:: keyspace_id:%s */ ' @@ -84,7 +84,7 @@ def _insert_multi_value(self, tablet_obj, table, mids, msgs, keyspace_ids): querystr += values_str tablet_obj.mquery( - 'vt_test_keyspace', + tablet_obj.dbname, ['begin', querystr, 'commit'], @@ -119,7 +119,7 @@ def _get_value(self, tablet_obj, table, mid): A tuple of results. """ return tablet_obj.mquery( - 'vt_test_keyspace', + tablet_obj.dbname, 'select parent_id, id, msg, custom_ksid_col from %s ' 'where parent_id=%d and id=%d' % (table, fixed_parent_id, mid)) diff --git a/test/config.json b/test/config.json index 36c0cc3ecfe..86c8ba830f3 100644 --- a/test/config.json +++ b/test/config.json @@ -106,6 +106,15 @@ "worker_test" ] }, + "initial_sharding_multi": { + "File": "initial_sharding_multi.py", + "Args": [], + "Command": [], + "Manual": false, + "Shard": 3, + "RetryMax": 0, + "Tags": [] + }, "initial_sharding_bytes": { "File": "initial_sharding_bytes.py", "Args": [], diff --git a/test/initial_sharding_multi.py b/test/initial_sharding_multi.py new file mode 100755 index 00000000000..998864e3f67 --- /dev/null +++ b/test/initial_sharding_multi.py @@ -0,0 +1,801 @@ +#!/usr/bin/env python +# +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This test simulates the first time a database has to be split +in a multi-vttablet-single-mysql environment. + +We have 2 keyspaces. One keyspace is in managing mode. Its vttablets +own the MySQL instances and can reparent, start/stop server, start/stop +replication, etc. The other keyspace is in non-managing mode and cannot do +any of these actions. Only TabletExternallyReparented is allowed, but +resharding should still work. + +For each keyspace: +- we start with a keyspace with a single shard and a single table +- we add and populate the sharding key +- we set the sharding key in the topology +- we clone into 2 instances +- we enable filtered replication +- we move all serving types +- we remove the source tablets +- we remove the original shard +""" + +import json +import logging +import unittest +from vtdb import keyrange_constants + +import MySQLdb + +import base_sharding +import environment +import tablet +import utils + +# initial shard, covers everything +ks1_shard_master = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks1_shard_replica = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks1_shard_rdonly1 = tablet.Tablet(vt_dba_passwd='VtDbaPass') + +# split shards +# range '' - 80 +ks1_shard_0_master = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks1_shard_0_replica = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks1_shard_0_rdonly1 = tablet.Tablet(vt_dba_passwd='VtDbaPass') +# range 80 - '' +ks1_shard_1_master = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks1_shard_1_replica = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks1_shard_1_rdonly1 = tablet.Tablet(vt_dba_passwd='VtDbaPass') + +ks1_tablets = { + '0': {'master':ks1_shard_master, 'replica':ks1_shard_replica, 'rdonly':ks1_shard_rdonly1}, + '-80': {'master':ks1_shard_0_master, 'replica':ks1_shard_0_replica, 'rdonly':ks1_shard_0_rdonly1}, + '80-': {'master':ks1_shard_1_master, 'replica':ks1_shard_1_replica, 'rdonly':ks1_shard_1_rdonly1} +} + +# initial shard, covers everything +ks2_shard_master = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks2_shard_replica = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks2_shard_rdonly1 = tablet.Tablet(vt_dba_passwd='VtDbaPass') + +# split shards +# range '' - 80 +ks2_shard_0_master = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks2_shard_0_replica = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks2_shard_0_rdonly1 = tablet.Tablet(vt_dba_passwd='VtDbaPass') +# range 80 - '' +ks2_shard_1_master = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks2_shard_1_replica = tablet.Tablet(vt_dba_passwd='VtDbaPass') +ks2_shard_1_rdonly1 = tablet.Tablet(vt_dba_passwd='VtDbaPass') + +ks2_tablets = { + '0': {'master':ks2_shard_master, 'replica':ks2_shard_replica, 'rdonly':ks2_shard_rdonly1}, + '-80': {'master':ks2_shard_0_master, 'replica':ks2_shard_0_replica, 'rdonly':ks2_shard_0_rdonly1}, + '80-': {'master':ks2_shard_1_master, 'replica':ks2_shard_1_replica, 'rdonly':ks2_shard_1_rdonly1} +} + +all_mysql_tablets = [ks1_shard_master, ks1_shard_replica, ks1_shard_rdonly1, + ks1_shard_0_master, ks1_shard_0_replica, ks1_shard_0_rdonly1, + ks1_shard_1_master, ks1_shard_1_replica, ks1_shard_1_rdonly1] + +all_other_tablets = [ks2_shard_master,
ks2_shard_replica, ks2_shard_rdonly1, + ks2_shard_0_master, ks2_shard_0_replica, ks2_shard_0_rdonly1, + ks2_shard_1_master, ks2_shard_1_replica, ks2_shard_1_rdonly1] + +def setUpModule(): + global new_init_db, db_credentials_file + + try: + # Determine which column is used for user passwords in this MySQL version. + proc = ks1_shard_master.init_mysql() + utils.wait_procs([proc]) + try: + ks1_shard_master.mquery('mysql', 'select password from mysql.user limit 0', + user='root') + password_col = 'password' + except MySQLdb.DatabaseError: + password_col = 'authentication_string' + utils.wait_procs([ks1_shard_master.teardown_mysql()]) + ks1_shard_master.remove_tree(ignore_options=True) + + # Create a new init_db.sql file that sets up passwords for all users. + # Then we use a db-credentials-file with the passwords. + new_init_db = environment.tmproot + '/init_db_with_passwords.sql' + with open(environment.vttop + '/config/init_db.sql') as fd: + init_db = fd.read() + with open(new_init_db, 'w') as fd: + fd.write(init_db) + fd.write(''' +# Set real passwords for all users. +ALTER USER 'root'@'localhost' IDENTIFIED BY 'RootPass'; +ALTER USER 'vt_dba'@'localhost' IDENTIFIED BY 'VtDbaPass'; +ALTER USER 'vt_app'@'localhost' IDENTIFIED BY 'VtAppPass'; +ALTER USER 'vt_allprivs'@'localhost' IDENTIFIED BY 'VtAllPrivsPass'; +ALTER USER 'vt_repl'@'%' IDENTIFIED BY 'VtReplPass'; +ALTER USER 'vt_filtered'@'localhost' IDENTIFIED BY 'VtFilteredPass'; + +# connecting through a port requires 127.0.0.1 +# --host=localhost will connect through socket +CREATE USER 'vt_dba'@'127.0.0.1' IDENTIFIED BY 'VtDbaPass'; +GRANT ALL ON *.* TO 'vt_dba'@'127.0.0.1'; +GRANT GRANT OPTION ON *.* TO 'vt_dba'@'127.0.0.1'; + +# User for app traffic, with global read-write access. +CREATE USER 'vt_app'@'127.0.0.1' IDENTIFIED BY 'VtAppPass'; +GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE, + REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES, + LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, + SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER + ON *.* TO 'vt_app'@'127.0.0.1'; + +# User for administrative operations that need to be executed as non-SUPER. +# Same permissions as vt_app here. +CREATE USER 'vt_allprivs'@'127.0.0.1' IDENTIFIED BY 'VtAllPrivsPass'; +GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE, + REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES, + LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW, + SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER + ON *.* TO 'vt_allprivs'@'127.0.0.1'; + +# User for Vitess filtered replication (binlog player). +# Same permissions as vt_app. 
+CREATE USER 'vt_filtered'@'127.0.0.1' IDENTIFIED BY 'VtFilteredPass';
+GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
+  REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
+  LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW,
+  SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
+  ON *.* TO 'vt_filtered'@'127.0.0.1';
+
+FLUSH PRIVILEGES;
+''')
+    credentials = {
+        'vt_dba': ['VtDbaPass'],
+        'vt_app': ['VtAppPass'],
+        'vt_allprivs': ['VtAllPrivsPass'],
+        'vt_repl': ['VtReplPass'],
+        'vt_filtered': ['VtFilteredPass'],
+    }
+    db_credentials_file = environment.tmproot+'/db_credentials.json'
+    with open(db_credentials_file, 'w') as fd:
+      fd.write(json.dumps(credentials))
+
+    setup_procs = [t.init_mysql(use_rbr=True, init_db=new_init_db,
+                                extra_args=['-db-credentials-file',
+                                            db_credentials_file]) for t in all_mysql_tablets]
+    utils.wait_procs(setup_procs)
+    for i in range(0, len(all_other_tablets)):
+      all_other_tablets[i].mysql_port = all_mysql_tablets[i].mysql_port
+
+    environment.topo_server().setup()
+
+  except:
+    tearDownModule()
+    raise
+
+
+def tearDownModule():
+  utils.required_teardown()
+  if utils.options.skip_teardown:
+    return
+
+  teardown_procs = [t.teardown_mysql(extra_args=['-db-credentials-file', db_credentials_file]) for t in all_mysql_tablets]
+  utils.wait_procs(teardown_procs, raise_on_error=False)
+  environment.topo_server().teardown()
+  utils.kill_sub_processes()
+  utils.remove_tmp_files()
+  for t in all_mysql_tablets:
+    t.remove_tree()
+  for t in all_other_tablets:
+    t.remove_tree()
+
+
+class TestInitialSharding(unittest.TestCase, base_sharding.BaseShardingTest):
+
+  # create_schema will create the same schema on the keyspace
+  def _create_schema(self, keyspace):
+    # Note that the primary key columns are not defined first on purpose to test
+    # that a reordered column list is correctly used everywhere in vtworker.
+ create_table_template = '''create table %s( +msg varchar(64), +id bigint not null, +parent_id bigint not null, +primary key (parent_id, id), +index by_msg (msg) +) Engine=InnoDB''' + + utils.run_vtctl(['ApplySchema', + '-sql=' + create_table_template % ('resharding1'), + keyspace], + auto_log=True) + + def _add_sharding_key_to_schema(self, keyspace): + if base_sharding.keyspace_id_type == keyrange_constants.KIT_BYTES: + t = 'varbinary(64)' + else: + t = 'bigint(20) unsigned' + sql = 'alter table %s add custom_ksid_col ' + t + utils.run_vtctl(['ApplySchema', + '-sql=' + sql % ('resharding1'), + keyspace], + auto_log=True) + + def _mark_sharding_key_not_null(self, keyspace): + if base_sharding.keyspace_id_type == keyrange_constants.KIT_BYTES: + t = 'varbinary(64)' + else: + t = 'bigint(20) unsigned' + sql = 'alter table %s modify custom_ksid_col ' + t + ' not null' + utils.run_vtctl(['ApplySchema', + '-sql=' + sql % ('resharding1'), + keyspace], + auto_log=True) + + # _insert_startup_value inserts a value in the MySQL database before it + # is sharded + def _insert_startup_value(self, keyspace, tablet_obj, table, mid, msg): + tablet_obj.mquery('vt_' + keyspace, [ + 'begin', + 'insert into %s(parent_id, id, msg) values(%d, %d, "%s")' % + (table, base_sharding.fixed_parent_id, mid, msg), + 'commit' + ], write=True) + + def _insert_startup_values(self, keyspace, master_tablet): + self._insert_startup_value(keyspace, master_tablet, 'resharding1', 1, 'msg1') + self._insert_startup_value(keyspace, master_tablet, 'resharding1', 2, 'msg2') + self._insert_startup_value(keyspace, master_tablet, 'resharding1', 3, 'msg3') + + def _backfill_keyspace_id(self, keyspace, tablet_obj): + tablet_obj.mquery('vt_' + keyspace, [ + 'begin', + 'update resharding1 set custom_ksid_col=0x1000000000000000 where id=1', + 'update resharding1 set custom_ksid_col=0x9000000000000000 where id=2', + 'update resharding1 set custom_ksid_col=0xD000000000000000 where id=3', + 'commit' + ], write=True) + + def _check_startup_values(self, keyspace, tablets): + # check first value is in the left shard + for t in tablets['-80'].values(): + self._check_value(t, 'resharding1', 1, 'msg1', 0x1000000000000000) + for t in tablets['80-'].values(): + self._check_value(t, 'resharding1', 1, 'msg1', + 0x1000000000000000, should_be_here=False) + + # check second value is in the right shard + for t in tablets['-80'].values(): + self._check_value(t, 'resharding1', 2, 'msg2', 0x9000000000000000, + should_be_here=False) + for t in tablets['80-'].values(): + self._check_value(t, 'resharding1', 2, 'msg2', 0x9000000000000000) + + # check third value is in the right shard too + for t in tablets['-80'].values(): + self._check_value(t, 'resharding1', 3, 'msg3', 0xD000000000000000, + should_be_here=False) + for t in tablets['80-'].values(): + self._check_value(t, 'resharding1', 3, 'msg3', 0xD000000000000000) + + def _insert_lots(self, keyspace, master_tablet, count, base=0): + for i in xrange(count): + self._insert_value(master_tablet, 'resharding1', 10000 + base + i, + 'msg-range1-%d' % i, 0xA000000000000000 + base + i) + self._insert_value(master_tablet, 'resharding1', 20000 + base + i, + 'msg-range2-%d' % i, 0xE000000000000000 + base + i) + + # _check_lots returns how many of the values we have, in percents. 
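+  # It scans both inserted ranges (ids 10000+base and 20000+base), so a
+  # complete copy yields 2*count matching rows and a return value of 100.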
+ def _check_lots(self, replica_tablet, count, base=0): + found = 0 + for i in xrange(count): + if self._is_value_present_and_correct(replica_tablet, 'resharding1', + 10000 + base + i, 'msg-range1-%d' % + i, 0xA000000000000000 + base + i): + found += 1 + if self._is_value_present_and_correct(replica_tablet, 'resharding1', + 20000 + base + i, 'msg-range2-%d' % + i, 0xE000000000000000 + base + i): + found += 1 + percent = found * 100 / count / 2 + logging.debug('I have %d%% of the data', percent) + return percent + + def _check_lots_timeout(self, replica_tablet, count, threshold, timeout, base=0): + while True: + value = self._check_lots(replica_tablet, count, base=base) + if value >= threshold: + return value + timeout = utils.wait_step('enough data went through', timeout) + + # _check_lots_not_present makes sure no data is in the wrong shard + def _check_lots_not_present(self, replica_tablet, count, base=0): + for i in xrange(count): + self._check_value(replica_tablet, 'resharding1', 10000 + base + i, + 'msg-range1-%d' % i, 0xA000000000000000 + base + i, + should_be_here=False) + self._check_value(replica_tablet, 'resharding1', 20000 + base + i, + 'msg-range2-%d' % i, 0xE000000000000000 + base + i, + should_be_here=False) + + def _test_resharding(self, keyspace, tablet_map, external_mysql=False): + # create the keyspace with just one shard + shard_master = tablet_map['0']['master'] + shard_replica = tablet_map['0']['replica'] + shard_rdonly = tablet_map['0']['rdonly'] + shard_0_master = tablet_map['-80']['master'] + shard_0_replica = tablet_map['-80']['replica'] + shard_0_rdonly = tablet_map['-80']['rdonly'] + shard_1_master = tablet_map['80-']['master'] + shard_1_replica = tablet_map['80-']['replica'] + shard_1_rdonly = tablet_map['80-']['rdonly'] + shard_master.init_tablet( + 'replica', + keyspace=keyspace, + shard='0', + tablet_index=0, + external_mysql=external_mysql, + extra_args=['-db-credentials-file', db_credentials_file]) + shard_replica.init_tablet( + 'replica', + keyspace=keyspace, + shard='0', + tablet_index=1, + external_mysql=external_mysql, + extra_args=['-db-credentials-file', db_credentials_file]) + shard_rdonly.init_tablet( + 'rdonly', + keyspace=keyspace, + shard='0', + tablet_index=2, + external_mysql=external_mysql, + extra_args=['-db-credentials-file', db_credentials_file]) + + for t in [shard_master, shard_replica, shard_rdonly]: + t.create_db('vt_' + keyspace) + + shard_master.start_vttablet(wait_for_state=None, + binlog_use_v3_resharding_mode=False, + supports_backups=False, + extra_args=['-db-credentials-file', db_credentials_file]) + shard_rdonly.start_vttablet(wait_for_state=None, + binlog_use_v3_resharding_mode=False, + supports_backups=False, + extra_args=['-db-credentials-file', db_credentials_file]) + + if not external_mysql: + for t in [shard_master, shard_rdonly]: + t.wait_for_vttablet_state('NOT_SERVING') + + # start replica + shard_replica.start_vttablet(wait_for_state=None, + binlog_use_v3_resharding_mode=False, + supports_backups=False, + extra_args=['-db-credentials-file', db_credentials_file]) + + if not external_mysql: + shard_replica.wait_for_vttablet_state('NOT_SERVING') + + # reparent to make the tablets work + utils.run_vtctl(['InitShardMaster', '-force', keyspace+'/0', + shard_master.tablet_alias], auto_log=True) + + utils.wait_for_tablet_type(shard_replica.tablet_alias, 'replica') + utils.wait_for_tablet_type(shard_rdonly.tablet_alias, 'rdonly') + else: + shard_replica.wait_for_vttablet_state('SERVING') + # default mode is VTCTL_AUTO which 
makes this command hang + _, stderr = utils.run_vtctl(['TabletExternallyReparented', shard_master.tablet_alias], mode=utils.VTCTL_VTCTL, auto_log=True) + + for t in [shard_master, shard_replica, shard_rdonly]: + t.wait_for_vttablet_state('SERVING') + + # create the tables and add startup values + self._create_schema(keyspace) + self._insert_startup_values(keyspace, shard_master) + + # reload schema on all tablets so we can query them + for t in [shard_master, shard_replica, shard_rdonly]: + utils.run_vtctl(['ReloadSchema', t.tablet_alias], auto_log=True) + + # We must start vtgate after tablets are up, or else wait until 1min refresh + # (that is the tablet_refresh_interval parameter for discovery gateway) + # we want cache_ttl at zero so we re-read the topology for every test query. + + utils.VtGate().start(cache_ttl='0', tablets=[ + shard_master, shard_replica, shard_rdonly]) + utils.vtgate.wait_for_endpoints(keyspace + '.0.master', 1) + utils.vtgate.wait_for_endpoints(keyspace + '.0.replica', 1) + utils.vtgate.wait_for_endpoints(keyspace + '.0.rdonly', 1) + + # check the Map Reduce API works correctly, should use ExecuteShards, + # as we're not sharded yet. + # we have 3 values in the database, asking for 4 splits will get us + # a single query. + sql = 'select id, msg from resharding1' + s = utils.vtgate.split_query(sql, keyspace, 4) + self.assertEqual(len(s), 1) + self.assertEqual(s[0]['shard_part']['shards'][0], '0') + + # change the schema, backfill keyspace_id, and change schema again + self._add_sharding_key_to_schema(keyspace) + self._backfill_keyspace_id(keyspace, shard_master) + self._mark_sharding_key_not_null(keyspace) + + # now we can be a sharded keyspace (and propagate to SrvKeyspace) + utils.run_vtctl(['SetKeyspaceShardingInfo', keyspace, + 'custom_ksid_col', base_sharding.keyspace_id_type]) + utils.run_vtctl(['RebuildKeyspaceGraph', keyspace], + auto_log=True) + + # run a health check on source replica so it responds to discovery + utils.run_vtctl(['RunHealthCheck', shard_replica.tablet_alias]) + + # create the split shards + shard_0_master.init_tablet( + 'replica', + keyspace=keyspace, + shard='-80', + tablet_index=0, + external_mysql=external_mysql, + extra_args=['-db-credentials-file', db_credentials_file]) + shard_0_replica.init_tablet( + 'replica', + keyspace=keyspace, + shard='-80', + tablet_index=1, + external_mysql=external_mysql, + extra_args=['-db-credentials-file', db_credentials_file]) + shard_0_rdonly.init_tablet( + 'rdonly', + keyspace=keyspace, + shard='-80', + tablet_index=2, + external_mysql=external_mysql, + extra_args=['-db-credentials-file', db_credentials_file]) + shard_1_master.init_tablet( + 'replica', + keyspace=keyspace, + shard='80-', + tablet_index=0, + external_mysql=external_mysql, + extra_args=['-db-credentials-file', db_credentials_file]) + shard_1_replica.init_tablet( + 'replica', + keyspace=keyspace, + shard='80-', + tablet_index=1, + external_mysql=external_mysql, + extra_args=['-db-credentials-file', db_credentials_file]) + shard_1_rdonly.init_tablet( + 'rdonly', + keyspace=keyspace, + shard='80-', + tablet_index=2, + external_mysql=external_mysql, + extra_args=['-db-credentials-file', db_credentials_file]) + + for t in [shard_0_master, shard_0_replica, shard_0_rdonly, + shard_1_master, shard_1_replica, shard_1_rdonly]: + t.create_db('vt_' + keyspace) + t.start_vttablet(wait_for_state=None, + binlog_use_v3_resharding_mode=False, + supports_backups=False, + extra_args=['-db-credentials-file', db_credentials_file]) + + sharded_tablets = 
[shard_0_master, shard_0_replica, shard_0_rdonly, + shard_1_master, shard_1_replica, shard_1_rdonly] + if not external_mysql: + for t in sharded_tablets: + t.wait_for_vttablet_state('NOT_SERVING') + + utils.run_vtctl(['InitShardMaster', '-force', keyspace + '/-80', + shard_0_master.tablet_alias], auto_log=True) + utils.run_vtctl(['InitShardMaster', '-force', keyspace + '/80-', + shard_1_master.tablet_alias], auto_log=True) + + for t in [shard_0_replica, shard_1_replica]: + utils.wait_for_tablet_type(t.tablet_alias, 'replica') + for t in [shard_0_rdonly, shard_1_rdonly]: + utils.wait_for_tablet_type(t.tablet_alias, 'rdonly') + + for t in sharded_tablets: + t.wait_for_vttablet_state('SERVING') + else: + # default mode is VTCTL_AUTO which makes this command hang + _, stderr = utils.run_vtctl(['TabletExternallyReparented', shard_0_master.tablet_alias], mode=utils.VTCTL_VTCTL, auto_log=True) + _, stderr = utils.run_vtctl(['TabletExternallyReparented', shard_1_master.tablet_alias], mode=utils.VTCTL_VTCTL, auto_log=True) + + # must restart vtgate after tablets are up, or else wait until 1min refresh + # we want cache_ttl at zero so we re-read the topology for every test query. + utils.vtgate.kill() + + utils.vtgate = None + utils.VtGate().start(cache_ttl='0', tablets=[ + shard_master, shard_replica, shard_rdonly, + shard_0_master, shard_0_replica, shard_0_rdonly, + shard_1_master, shard_1_replica, shard_1_rdonly]) + var = None + + # Wait for the endpoints, either local or remote. + utils.vtgate.wait_for_endpoints(keyspace + '.0.master', 1, var=var) + utils.vtgate.wait_for_endpoints(keyspace + '.0.replica', 1, var=var) + utils.vtgate.wait_for_endpoints(keyspace + '.0.rdonly', 1, var=var) + utils.vtgate.wait_for_endpoints(keyspace + '.-80.master', 1, var=var) + utils.vtgate.wait_for_endpoints(keyspace + '.-80.replica', 1, var=var) + utils.vtgate.wait_for_endpoints(keyspace + '.-80.rdonly', 1, var=var) + utils.vtgate.wait_for_endpoints(keyspace + '.80-.master', 1, var=var) + utils.vtgate.wait_for_endpoints(keyspace + '.80-.replica', 1, var=var) + utils.vtgate.wait_for_endpoints(keyspace + '.80-.rdonly', 1, var=var) + + # check the Map Reduce API works correctly, should use ExecuteKeyRanges now, + # as we are sharded (with just one shard). + # again, we have 3 values in the database, asking for 4 splits will get us + # a single query. + sql = 'select id, msg from resharding1' + s = utils.vtgate.split_query(sql, keyspace, 4) + self.assertEqual(len(s), 1) + self.assertEqual(s[0]['key_range_part']['keyspace'], keyspace) + # There must be one empty KeyRange which represents the full keyspace. + self.assertEqual(len(s[0]['key_range_part']['key_ranges']), 1) + self.assertEqual(s[0]['key_range_part']['key_ranges'][0], {}) + + utils.check_srv_keyspace('test_nj', keyspace, + 'Partitions(master): -\n' + 'Partitions(rdonly): -\n' + 'Partitions(replica): -\n', + keyspace_id_type=base_sharding.keyspace_id_type, + sharding_column_name='custom_ksid_col') + + # we need to create the schema, and the worker will do data copying + for keyspace_shard in (keyspace + '/-80', keyspace + '/80-'): + utils.run_vtctl(['CopySchemaShard', + '--exclude_tables', 'unrelated', + shard_rdonly.tablet_alias, + keyspace_shard], + auto_log=True) + utils.run_vtctl(['RunHealthCheck', shard_rdonly.tablet_alias]) + + # Run vtworker as daemon for the following SplitClone commands. 
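+    # The daemon is reused for several commands over its RPC port: first an
+    # online-only clone, then (after a Reset and some manual edits on the
+    # destination masters) a clone whose online and offline phases must
+    # reconcile those edits.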
+ worker_proc, worker_port, worker_rpc_port = utils.run_vtworker_bg( + ['--cell', 'test_nj', '--command_display_interval', '10ms', + '--use_v3_resharding_mode=false'], + auto_log=True) + + # Initial clone (online). + workerclient_proc = utils.run_vtworker_client_bg( + ['SplitClone', + '--offline=false', + '--exclude_tables', 'unrelated', + '--chunk_count', '10', + '--min_rows_per_chunk', '1', + '--min_healthy_rdonly_tablets', '1', + keyspace + '/0'], + worker_rpc_port) + utils.wait_procs([workerclient_proc]) + self.verify_reconciliation_counters(worker_port, 'Online', 'resharding1', + 3, 0, 0, 0) + + # Reset vtworker such that we can run the next command. + workerclient_proc = utils.run_vtworker_client_bg(['Reset'], worker_rpc_port) + utils.wait_procs([workerclient_proc]) + + # Modify the destination shard. SplitClone will revert the changes. + # Delete row 1 (provokes an insert). + shard_0_master.mquery('vt_' + keyspace, + 'delete from resharding1 where id=1', write=True) + # Delete row 2 (provokes an insert). + shard_1_master.mquery('vt_' + keyspace, + 'delete from resharding1 where id=2', write=True) + # Update row 3 (provokes an update). + shard_1_master.mquery('vt_' + keyspace, + "update resharding1 set msg='msg-not-3' where id=3", + write=True) + # Insert row 4 (provokes a delete). + self._insert_value(shard_1_master, 'resharding1', 4, 'msg4', + 0xD000000000000000) + + workerclient_proc = utils.run_vtworker_client_bg( + ['SplitClone', + '--exclude_tables', 'unrelated', + '--chunk_count', '10', + '--min_rows_per_chunk', '1', + '--min_healthy_rdonly_tablets', '1', + keyspace + '/0'], + worker_rpc_port) + utils.wait_procs([workerclient_proc]) + self.verify_reconciliation_counters(worker_port, 'Online', 'resharding1', + 2, 1, 1, 0) + self.verify_reconciliation_counters(worker_port, 'Offline', 'resharding1', + 0, 0, 0, 3) + # Terminate worker daemon because it is no longer needed. + utils.kill_sub_process(worker_proc, soft=True) + + # check the startup values are in the right place + self._check_startup_values(keyspace, tablet_map) + + # check the schema too + utils.run_vtctl(['ValidateSchemaKeyspace', keyspace], auto_log=True) + + # check the binlog players are running + logging.debug('Waiting for binlog players to start on new masters...') + self.check_destination_master(shard_0_master, [keyspace + '/0']) + self.check_destination_master(shard_1_master, [keyspace + '/0']) + + # check that binlog server exported the stats vars + self.check_binlog_server_vars(shard_replica, horizontal=True) + + # testing filtered replication: insert a bunch of data on shard 1, + # check we get most of it after a few seconds, wait for binlog server + # timeout, check we get all of it. 
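+    # The thresholds below are deliberately loose: at least 80% of the new
+    # rows should show up on the 80- replica within ~5 seconds and all of
+    # them within ~20 seconds, while none may land on the -80 shard.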
+ logging.debug('Inserting lots of data on source shard') + self._insert_lots(keyspace, shard_master, 1000) + logging.debug('Checking 80 percent of data is sent quickly') + v = self._check_lots_timeout(shard_1_replica, 1000, 80, 5) + if v != 100: + logging.debug('Checking all data goes through eventually') + self._check_lots_timeout(shard_1_replica, 1000, 100, 20) + logging.debug('Checking no data was sent the wrong way') + self._check_lots_not_present(shard_0_replica, 1000) + self.check_binlog_player_vars(shard_0_master, [keyspace + '/0'], + seconds_behind_master_max=30) + self.check_binlog_player_vars(shard_1_master, [keyspace + '/0'], + seconds_behind_master_max=30) + self.check_binlog_server_vars(shard_replica, horizontal=True, + min_statements=1000, min_transactions=1000) + + # use vtworker to compare the data + for t in [shard_0_rdonly, shard_1_rdonly]: + utils.run_vtctl(['RunHealthCheck', t.tablet_alias]) + + # get status for the destination master tablet, make sure we have it all + if not external_mysql: + self.check_running_binlog_player(shard_0_master, 2000, 2000) + self.check_running_binlog_player(shard_1_master, 6000, 2000) + else: + self.check_running_binlog_player(shard_0_master, 2002, 2002) + self.check_running_binlog_player(shard_1_master, 6002, 2002) + + + # check we can't migrate the master just yet + utils.run_vtctl(['MigrateServedTypes', keyspace + '/0', 'master'], + expect_fail=True) + + # now serve rdonly from the split shards + utils.run_vtctl(['MigrateServedTypes', keyspace + '/0', 'rdonly'], + auto_log=True) + utils.check_srv_keyspace('test_nj', keyspace, + 'Partitions(master): -\n' + 'Partitions(rdonly): -80 80-\n' + 'Partitions(replica): -\n', + keyspace_id_type=base_sharding.keyspace_id_type, + sharding_column_name='custom_ksid_col') + + # make sure rdonly tablets are back to serving before hitting vtgate. + for t in [shard_0_rdonly, shard_1_rdonly]: + t.wait_for_vttablet_state('SERVING') + + utils.vtgate.wait_for_endpoints(keyspace + '.-80.rdonly', 1) + utils.vtgate.wait_for_endpoints(keyspace + '.80-.rdonly', 1) + + # check the Map Reduce API works correctly, should use ExecuteKeyRanges + # on both destination shards now. 
+ # we ask for 2 splits to only have one per shard + sql = 'select id, msg from resharding1' + timeout = 10.0 + while True: + try: + s = utils.vtgate.split_query(sql, keyspace, 2) + break + except Exception: # pylint: disable=broad-except + timeout = utils.wait_step( + 'vtgate executes split_query properly', timeout) + self.assertEqual(len(s), 2) + self.assertEqual(s[0]['key_range_part']['keyspace'], keyspace) + self.assertEqual(s[1]['key_range_part']['keyspace'], keyspace) + self.assertEqual(len(s[0]['key_range_part']['key_ranges']), 1) + self.assertEqual(len(s[1]['key_range_part']['key_ranges']), 1) + + # then serve replica from the split shards + source_tablet = shard_replica + destination_tablets = [shard_0_replica, shard_1_replica] + + utils.run_vtctl( + ['MigrateServedTypes', keyspace + '/0', 'replica'], auto_log=True) + utils.check_srv_keyspace('test_nj', keyspace, + 'Partitions(master): -\n' + 'Partitions(rdonly): -80 80-\n' + 'Partitions(replica): -80 80-\n', + keyspace_id_type=base_sharding.keyspace_id_type, + sharding_column_name='custom_ksid_col') + + # move replica back and forth + utils.run_vtctl( + ['MigrateServedTypes', '-reverse', keyspace + '/0', 'replica'], + auto_log=True) + # After a backwards migration, queryservice should be enabled on + # source and disabled on destinations + utils.check_tablet_query_service(self, source_tablet, True, False) + utils.check_tablet_query_services(self, destination_tablets, False, True) + utils.check_srv_keyspace('test_nj', keyspace, + 'Partitions(master): -\n' + 'Partitions(rdonly): -80 80-\n' + 'Partitions(replica): -\n', + keyspace_id_type=base_sharding.keyspace_id_type, + sharding_column_name='custom_ksid_col') + + utils.run_vtctl(['MigrateServedTypes', keyspace + '/0', 'replica'], + auto_log=True) + # After a forwards migration, queryservice should be disabled on + # source and enabled on destinations + utils.check_tablet_query_service(self, source_tablet, False, True) + utils.check_tablet_query_services(self, destination_tablets, True, False) + utils.check_srv_keyspace('test_nj', keyspace, + 'Partitions(master): -\n' + 'Partitions(rdonly): -80 80-\n' + 'Partitions(replica): -80 80-\n', + keyspace_id_type=base_sharding.keyspace_id_type, + sharding_column_name='custom_ksid_col') + + # then serve master from the split shards + utils.run_vtctl(['MigrateServedTypes', keyspace + '/0', 'master'], + auto_log=True) + utils.check_srv_keyspace('test_nj', keyspace, + 'Partitions(master): -80 80-\n' + 'Partitions(rdonly): -80 80-\n' + 'Partitions(replica): -80 80-\n', + keyspace_id_type=base_sharding.keyspace_id_type, + sharding_column_name='custom_ksid_col') + + # check the binlog players are gone now + self.check_no_binlog_player(shard_0_master) + self.check_no_binlog_player(shard_1_master) + + def kill_all_tablets(self, keyspace, tablet_map): + shard_master = tablet_map['0']['master'] + shard_replica = tablet_map['0']['replica'] + shard_rdonly = tablet_map['0']['rdonly'] + shard_0_master = tablet_map['-80']['master'] + shard_0_replica = tablet_map['-80']['replica'] + shard_0_rdonly = tablet_map['-80']['rdonly'] + shard_1_master = tablet_map['80-']['master'] + shard_1_replica = tablet_map['80-']['replica'] + shard_1_rdonly = tablet_map['80-']['rdonly'] + + # remove the original tablets in the original shard + tablet.kill_tablets([shard_master, shard_replica, shard_rdonly]) + for t in [shard_replica, shard_rdonly]: + utils.run_vtctl(['DeleteTablet', t.tablet_alias], auto_log=True) + utils.run_vtctl(['DeleteTablet', '-allow_master', + 
shard_master.tablet_alias], auto_log=True) + + # rebuild the serving graph, all mentions of the old shards should be gone + utils.run_vtctl(['RebuildKeyspaceGraph', keyspace], auto_log=True) + + # delete the original shard + utils.run_vtctl(['DeleteShard', keyspace + '/0'], auto_log=True) + + # kill everything else + tablet.kill_tablets([shard_0_master, shard_0_replica, shard_0_rdonly, + shard_1_master, shard_1_replica, shard_1_rdonly]) + + def test_resharding(self): + self._test_resharding('test_keyspace1', ks1_tablets) + self._test_resharding('test_keyspace2', ks2_tablets, True) + self.kill_all_tablets('test_keyspace1', ks1_tablets) + self.kill_all_tablets('test_keyspace2', ks2_tablets) + +if __name__ == '__main__': + utils.main() diff --git a/test/resharding.py b/test/resharding.py index e0d68c5517c..4bc50021cff 100755 --- a/test/resharding.py +++ b/test/resharding.py @@ -1083,6 +1083,8 @@ def test_resharding(self): # update our test variables to point at the new master shard_2_master, shard_2_replica1 = shard_2_replica1, shard_2_master + utils.pause('check state of _vt.vreplication') + logging.debug('Inserting lots of data on source shard after reparenting') self._insert_lots(3000, base=2000) logging.debug('Checking 80 percent of data was sent fairly quickly') diff --git a/test/tablet.py b/test/tablet.py index 1d64bfbe752..06e80492469 100644 --- a/test/tablet.py +++ b/test/tablet.py @@ -98,7 +98,8 @@ def __init__(self, tablet_uid=None, port=None, mysql_port=None, cell=None, self.shard = None self.index = None self.tablet_index = None - + # default to false + self.external_mysql = False # utility variables self.tablet_alias = 'test_%s-%010d' % (self.cell, self.tablet_uid) self.zk_tablet_path = ( @@ -229,7 +230,6 @@ def remove_tree(self, ignore_options=False): def mysql_connection_parameters(self, dbname, user='vt_dba'): result = dict(user=user, - unix_socket=self.tablet_dir + '/mysql.sock', db=dbname) if user == 'vt_dba' and self.vt_dba_passwd: result['passwd'] = self.vt_dba_passwd @@ -237,6 +237,10 @@ def mysql_connection_parameters(self, dbname, user='vt_dba'): def connect(self, dbname='', user='vt_dba', **params): params.update(self.mysql_connection_parameters(dbname, user)) + if 'port' not in params.keys(): + params['unix_socket']=self.tablet_dir + '/mysql.sock' + else: + params['host']='127.0.0.1' conn = MySQLdb.Connect(**params) return conn, conn.cursor() @@ -257,6 +261,8 @@ def mquery(self, dbname, query, write=False, user='vt_dba', conn_params=None, """ if conn_params is None: conn_params = {} + if self.external_mysql: + conn_params['port']=self.mysql_port conn, cursor = self.connect(dbname, user=user, **conn_params) if write: conn.begin() @@ -381,13 +387,14 @@ def update_addrs(self): def init_tablet(self, tablet_type, keyspace, shard, tablet_index=None, start=False, dbname=None, parent=True, wait_for_start=True, - include_mysql_port=True, **kwargs): + include_mysql_port=True, external_mysql=False, **kwargs): """Initialize a tablet's record in topology.""" self.tablet_type = tablet_type self.keyspace = keyspace self.shard = shard self.tablet_index = tablet_index + self.external_mysql = external_mysql self.dbname = dbname or ('vt_' + self.keyspace) @@ -487,6 +494,10 @@ def start_vttablet( args.extend( ['-mysqlctl_socket', os.path.join(self.tablet_dir, 'mysqlctl.sock')]) + if self.external_mysql: + args.extend(['-db_host', '127.0.0.1']) + args.extend(['-db_port', str(self.mysql_port)]) + args.append('-disable_active_reparents') if full_mycnf_args: # this flag is used to specify 
all the mycnf_ flags, to make # sure that code works.