Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL/VReplication: AUTO_INCREMENT support and tests #8223

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func testSingle(t *testing.T, testName string) {
require.Equal(t, string(schema.OnlineDDLStatusComplete), migrationStatus)

if content, exists := readTestFile(t, testName, "expect_table_structure"); exists {
createStatement := getCreateTableStatement(t, tableName)
createStatement := getCreateTableStatement(t, afterTableName)
assert.Contains(t, createStatement, content, "expected SHOW CREATE TABLE to contain text in 'expect_table_structure' file")
}

Expand Down
28 changes: 28 additions & 0 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,27 @@ func (e *Executor) initVreplicationOriginalMigration(ctx context.Context, online
return v, nil
}

// postInitVreplicationOriginalMigration runs extra changes after a vreplication online DDL has been initialized.
// This function is called after both source and target tables have been analyzed, so there's more information
// about the two, and about the transition between the two.
func (e *Executor) postInitVreplicationOriginalMigration(ctx context.Context, v *VRepl, conn *dbconnpool.DBConnection) (err error) {
if v.sourceAutoIncrement > 0 && !v.parser.IsAutoIncrementDefined() {
// Apply ALTER TABLE AUTO_INCREMENT=?
parsed := sqlparser.BuildParsedQuery(sqlAlterTableAutoIncrement, v.targetTable, ":auto_increment")
bindVars := map[string]*querypb.BindVariable{
"auto_increment": sqltypes.Uint64BindVariable(v.sourceAutoIncrement),
}
bound, err := parsed.GenerateQuery(bindVars, nil)
if err != nil {
return err
}
if _, err := conn.ExecuteFetch(bound, 0, false); err != nil {
return err
}
}
return nil
}

func (e *Executor) initVreplicationRevertMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, revertMigration *schema.OnlineDDL) (v *VRepl, err error) {
// Getting here we've already validated that migration is revertible

Expand Down Expand Up @@ -739,6 +760,13 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem
if err := v.analyze(ctx, conn); err != nil {
return err
}

if revertMigration == nil {
// Original ALTER TABLE request for vreplication
if err := e.postInitVreplicationOriginalMigration(ctx, v, conn); err != nil {
return err
}
}
if err := e.updateArtifacts(ctx, onlineDDL.UUID, v.targetTable); err != nil {
return err
}
Expand Down
19 changes: 14 additions & 5 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,20 @@ const (
sqlDropTable = "DROP TABLE `%a`"
sqlAlterTableOptions = "ALTER TABLE `%a` %s"
sqlShowColumnsFrom = "SHOW COLUMNS FROM `%a`"
sqlStartVReplStream = "UPDATE _vt.vreplication set state='Running' where db_name=%a and workflow=%a"
sqlStopVReplStream = "UPDATE _vt.vreplication set state='Stopped' where db_name=%a and workflow=%a"
sqlDeleteVReplStream = "DELETE FROM _vt.vreplication where db_name=%a and workflow=%a"
sqlReadVReplStream = `SELECT
sqlGetAutoIncrement = `
SELECT
AUTO_INCREMENT
FROM INFORMATION_SCHEMA.TABLES
WHERE
TABLES.TABLE_SCHEMA=%a
AND TABLES.TABLE_NAME=%a
AND AUTO_INCREMENT IS NOT NULL
`
sqlAlterTableAutoIncrement = "ALTER TABLE `%s` AUTO_INCREMENT=%a"
sqlStartVReplStream = "UPDATE _vt.vreplication set state='Running' where db_name=%a and workflow=%a"
sqlStopVReplStream = "UPDATE _vt.vreplication set state='Stopped' where db_name=%a and workflow=%a"
sqlDeleteVReplStream = "DELETE FROM _vt.vreplication where db_name=%a and workflow=%a"
sqlReadVReplStream = `SELECT
id,
workflow,
source,
Expand All @@ -314,7 +324,6 @@ const (
FROM _vt.vreplication
WHERE
workflow=%a

`
sqlReadCountCopyState = `SELECT
count(*) as cnt
Expand Down
27 changes: 27 additions & 0 deletions go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type VRepl struct {
sourceSharedColumns *vrepl.ColumnList
targetSharedColumns *vrepl.ColumnList
sharedColumnsMap map[string]string
sourceAutoIncrement uint64

filterQuery string
bls *binlogdatapb.BinlogSource
Expand Down Expand Up @@ -121,6 +122,27 @@ func (v *VRepl) getCandidateUniqueKeys(ctx context.Context, conn *dbconnpool.DBC
return uniqueKeys, nil
}

// readAutoIncrement reads the AUTO_INCREMENT vlaue, if any, for a give ntable
func (v *VRepl) readAutoIncrement(ctx context.Context, conn *dbconnpool.DBConnection, tableName string) (autoIncrement uint64, err error) {
query, err := sqlparser.ParseAndBind(sqlGetAutoIncrement,
sqltypes.StringBindVariable(v.dbName),
sqltypes.StringBindVariable(tableName),
)
if err != nil {
return 0, err
}

rs, err := conn.ExecuteFetch(query, math.MaxInt64, true)
if err != nil {
return 0, err
}
for _, row := range rs.Named().Rows {
autoIncrement = row.AsUint64("AUTO_INCREMENT", 0)
}

return autoIncrement, nil
}

// readTableColumns reads column list from given table
func (v *VRepl) readTableColumns(ctx context.Context, conn *dbconnpool.DBConnection, tableName string) (columns *vrepl.ColumnList, virtualColumns *vrepl.ColumnList, pkColumns *vrepl.ColumnList, err error) {
parsed := sqlparser.BuildParsedQuery(sqlShowColumnsFrom, tableName)
Expand Down Expand Up @@ -271,6 +293,11 @@ func (v *VRepl) analyzeTables(ctx context.Context, conn *dbconnpool.DBConnection
return fmt.Errorf("Found no shared PRIMARY KEY columns between `%s` and `%s`", v.sourceTable, v.targetTable)
}

v.sourceAutoIncrement, err = v.readAutoIncrement(ctx, conn, v.sourceTable)
if err != nil {
return err
}

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/onlineddl/vrepl/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
renameColumnRegexp = regexp.MustCompile(`(?i)\bchange\s+(column\s+|)([\S]+)\s+([\S]+)\s+`)
dropColumnRegexp = regexp.MustCompile(`(?i)\bdrop\s+(column\s+|)([\S]+)$`)
renameTableRegexp = regexp.MustCompile(`(?i)\brename\s+(to|as)\s+`)
autoIncrementRegexp = regexp.MustCompile(`(?i)\bauto_increment[\s]*=[\s]*([0-9]+)`)
autoIncrementRegexp = regexp.MustCompile(`(?i)\bauto_increment[\s]*[=]?[\s]*([0-9]+)`)
)

// AlterTableParser is a parser tool for ALTER TABLE statements
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/onlineddl/vrepl/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,16 @@ func TestParseAlterStatementWithAutoIncrement(t *testing.T) {
"auto_increment=7",
"auto_increment = 7",
"AUTO_INCREMENT = 71",
"AUTO_INCREMENT 23",
"AUTO_INCREMENT 23",
"add column t int, change ts ts timestamp, auto_increment=7 engine=innodb",
"add column t int, change ts ts timestamp, auto_increment =7 engine=innodb",
"add column t int, change ts ts timestamp, AUTO_INCREMENT = 7 engine=innodb",
"add column t int, change ts ts timestamp, engine=innodb auto_increment=73425",
"add column t int, change ts ts timestamp, engine=innodb, auto_increment=73425",
"add column t int, change ts ts timestamp, engine=innodb, auto_increment 73425",
"add column t int, change ts ts timestamp, engine innodb, auto_increment 73425",
"add column t int, change ts ts timestamp, engine innodb auto_increment 73425",
}
for _, statement := range statements {
parser := NewAlterTableParser()
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
if _, err := dbClient.ExecuteFetch("set names binary", 10000); err != nil {
return err
}
// We must apply AUTO_INCREMENT values precisely as we got them. This include the 0 value, which is not recommended in AUTO_INCREMENT, and yet is valid.
if _, err := dbClient.ExecuteFetch("set @@session.sql_mode = CONCAT(@@session.sql_mode, ',NO_AUTO_VALUE_ON_ZERO')", 10000); err != nil {
return err
}

var vsClient VStreamerClient
var err error
Expand Down