diff --git a/.github/workflows/cluster_endtoend_onlineddl_revert.yml b/.github/workflows/cluster_endtoend_onlineddl_revert.yml new file mode 100644 index 00000000000..6b590b2ab13 --- /dev/null +++ b/.github/workflows/cluster_endtoend_onlineddl_revert.yml @@ -0,0 +1,40 @@ +# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows" + +name: Cluster (onlineddl_revert) +on: [push, pull_request] +jobs: + + build: + name: Run endtoend tests on Cluster (onlineddl_revert) + runs-on: ubuntu-latest + + steps: + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.15 + + - name: Check out code + uses: actions/checkout@v2 + + - name: Get dependencies + run: | + sudo apt-get update + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get install -y gnupg2 + sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb + sudo apt-get update + sudo apt-get install percona-xtrabackup-24 + + - name: Run cluster endtoend test + timeout-minutes: 30 + run: | + source build.env + eatmydata -- go run test.go -docker=false -print-log -follow -shard onlineddl_revert diff --git a/go/test/endtoend/cluster/vtctlclient_process.go b/go/test/endtoend/cluster/vtctlclient_process.go index 420656d4ff9..99b68b7cba4 100644 --- a/go/test/endtoend/cluster/vtctlclient_process.go +++ b/go/test/endtoend/cluster/vtctlclient_process.go @@ -122,6 +122,16 @@ func (vtctlclient *VtctlClientProcess) OnlineDDLRetryMigration(Keyspace, uuid st ) } +// OnlineDDLRevertMigration reverts a given migration uuid +func (vtctlclient *VtctlClientProcess) OnlineDDLRevertMigration(Keyspace, uuid string) (result string, err error) { + return 
vtctlclient.ExecuteCommandWithOutput( + "OnlineDDL", + Keyspace, + "revert", + uuid, + ) +} + // VExec runs a VExec query func (vtctlclient *VtctlClientProcess) VExec(Keyspace, workflow, query string) (result string, err error) { return vtctlclient.ExecuteCommandWithOutput( diff --git a/go/test/endtoend/onlineddl_revert/onlineddl_revert_test.go b/go/test/endtoend/onlineddl_revert/onlineddl_revert_test.go new file mode 100644 index 00000000000..b9542378e75 --- /dev/null +++ b/go/test/endtoend/onlineddl_revert/onlineddl_revert_test.go @@ -0,0 +1,776 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package onlineddl + +import ( + "context" + "flag" + "fmt" + "math/rand" + "os" + "path" + "regexp" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/schema" + + "vitess.io/vitess/go/test/endtoend/cluster" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type WriteMetrics struct { + mu sync.Mutex + insertsAttempts, insertsFailures, insertsNoops, inserts int64 + updatesAttempts, updatesFailures, updatesNoops, updates int64 + deletesAttempts, deletesFailures, deletesNoops, deletes int64 +} + +func (w *WriteMetrics) Clear() { + w.mu.Lock() + defer w.mu.Unlock() + + w.inserts = 0 + w.updates = 0 + w.deletes = 0 + + w.insertsAttempts = 0 + w.insertsFailures = 0 + w.insertsNoops = 0 + + w.updatesAttempts = 0 + w.updatesFailures = 0 + w.updatesNoops = 0 + + w.deletesAttempts = 0 + w.deletesFailures = 0 + w.deletesNoops = 0 +} + +func (w *WriteMetrics) String() string { + return fmt.Sprintf(`WriteMetrics: inserts-deletes=%d, updates-deletes=%d, +insertsAttempts=%d, insertsFailures=%d, insertsNoops=%d, inserts=%d, +updatesAttempts=%d, updatesFailures=%d, updatesNoops=%d, updates=%d, +deletesAttempts=%d, deletesFailures=%d, deletesNoops=%d, deletes=%d, +`, + w.inserts-w.deletes, w.updates-w.deletes, + w.insertsAttempts, w.insertsFailures, w.insertsNoops, w.inserts, + w.updatesAttempts, w.updatesFailures, w.updatesNoops, w.updates, + w.deletesAttempts, w.deletesFailures, w.deletesNoops, w.deletes, + ) +} + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + + hostname = "localhost" + keyspaceName = "ks" + cell = "zone1" + schemaChangeDirectory = "" + tableName = `stress_test` + createStatement = ` + CREATE TABLE stress_test ( + id bigint(20) not null, + rand_val varchar(32) null default '', + hint_col varchar(64) not null default 'just-created', + 
created_timestamp timestamp not null default current_timestamp, + updates int unsigned not null default 0, + PRIMARY KEY (id), + key created_idx(created_timestamp), + key updates_idx(updates) + ) ENGINE=InnoDB + ` + createIfNotExistsStatement = ` + CREATE TABLE IF NOT EXISTS stress_test ( + id bigint(20) not null, + PRIMARY KEY (id) + ) ENGINE=InnoDB + ` + dropStatement = ` + DROP TABLE stress_test + ` + dropIfExistsStatement = ` + DROP TABLE IF EXISTS stress_test + ` + alterHintStatement = ` + ALTER TABLE stress_test modify hint_col varchar(64) not null default '%s' + ` + insertRowStatement = ` + INSERT IGNORE INTO stress_test (id, rand_val) VALUES (%d, left(md5(rand()), 8)) + ` + updateRowStatement = ` + UPDATE stress_test SET updates=updates+1 WHERE id=%d + ` + deleteRowStatement = ` + DELETE FROM stress_test WHERE id=%d AND updates=1 + ` + // We use CAST(SUM(updates) AS SIGNED) because SUM() returns a DECIMAL datatype, and we want to read a SIGNED INTEGER type + selectCountRowsStatement = ` + SELECT COUNT(*) AS num_rows, CAST(SUM(updates) AS SIGNED) AS sum_updates FROM stress_test + ` + truncateStatement = ` + TRUNCATE TABLE stress_test + ` + writeMetrics WriteMetrics +) + +const ( + maxTableRows = 4096 + maxConcurrency = 5 +) + +func fullWordUUIDRegexp(uuid, searchWord string) *regexp.Regexp { + return regexp.MustCompile(uuid + `.*?\b` + searchWord + `\b`) +} + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitcode, err := func() (int, error) { + clusterInstance = cluster.NewCluster(cell, hostname) + schemaChangeDirectory = path.Join("/tmp", fmt.Sprintf("schema_change_dir_%d", clusterInstance.GetAndReserveTabletUID())) + defer os.RemoveAll(schemaChangeDirectory) + defer clusterInstance.Teardown() + + if _, err := os.Stat(schemaChangeDirectory); os.IsNotExist(err) { + _ = os.Mkdir(schemaChangeDirectory, 0700) + } + + clusterInstance.VtctldExtraArgs = []string{ + "-schema_change_dir", schemaChangeDirectory, + 
"-schema_change_controller", "local", + "-schema_change_check_interval", "1"} + + clusterInstance.VtTabletExtraArgs = []string{ + "-enable-lag-throttler", + "-throttle_threshold", "1s", + "-heartbeat_enable", + "-heartbeat_interval", "250ms", + "-migration_check_interval", "5s", + } + clusterInstance.VtGateExtraArgs = []string{ + "-ddl_strategy", "online", + } + + if err := clusterInstance.StartTopo(); err != nil { + return 1, err + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + } + + // No need for replicas in this stress test + if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false); err != nil { + return 1, err + } + if err := clusterInstance.StartKeyspace(*keyspace, []string{"1"}, 1, false); err != nil { + return 1, err + } + + vtgateInstance := clusterInstance.NewVtgateInstance() + // set the gateway we want to use + vtgateInstance.GatewayImplementation = "tabletgateway" + // Start vtgate + if err := vtgateInstance.Setup(); err != nil { + return 1, err + } + // ensure it is torn down during cluster TearDown + clusterInstance.VtgateProcess = *vtgateInstance + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + + return m.Run(), nil + }() + if err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) + } else { + os.Exit(exitcode) + } + +} + +func TestSchemaChange(t *testing.T) { + defer cluster.PanicHandler(t) + + var uuids []string + // CREATE + t.Run("CREATE TABLE IF NOT EXISTS where table does not exist", func(t *testing.T) { + // The table does not exist + uuid := testOnlineDDLStatement(t, createIfNotExistsStatement, "online", "vtgate", "") + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + }) + t.Run("revert CREATE TABLE IF NOT EXISTS where did not exist", func(t *testing.T) { + // The table existed, so it will now be dropped (renamed) + uuid := testRevertMigration(t, 
uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + t.Run("revert revert CREATE TABLE IF NOT EXISTS where did not exist", func(t *testing.T) { + // Table was dropped (renamed) so it will now be restored + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + }) + t.Run("revert revert revert CREATE TABLE IF NOT EXISTS where did not exist", func(t *testing.T) { + // Table was restored, so it will now be dropped (renamed) + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + t.Run("online CREATE TABLE", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, createStatement, "online", "vtgate", "just-created") + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + initTable(t) + testSelectTableMetrics(t) + }) + t.Run("revert CREATE TABLE", func(t *testing.T) { + // This will drop the table (well, actually, rename it away) + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + t.Run("revert revert CREATE TABLE", func(t *testing.T) { + // Restore the table. Data should still be in the table! + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("fail revert older change", func(t *testing.T) { + // We shouldn't be able to revert one-before-last succcessful migration. 
+ uuid := testRevertMigration(t, uuids[len(uuids)-2]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed) + }) + t.Run("CREATE TABLE IF NOT EXISTS where table exists", func(t *testing.T) { + // The table exists. A noop. + uuid := testOnlineDDLStatement(t, createIfNotExistsStatement, "online", "vtgate", "") + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + }) + t.Run("revert CREATE TABLE IF NOT EXISTS where table existed", func(t *testing.T) { + // Since the table already existed, thus not created by the reverts migration, + // we expect to _not_ drop it in this revert. A noop. + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + }) + t.Run("revert revert CREATE TABLE IF NOT EXISTS where table existed", func(t *testing.T) { + // Table was not dropped, thus isn't re-created, and it just still exists. A noop. + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + }) + t.Run("fail online CREATE TABLE", func(t *testing.T) { + // Table already exists + uuid := testOnlineDDLStatement(t, createStatement, "online", "vtgate", "just-created") + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed) + checkTable(t, tableName, true) + }) + + // ALTER + + // Run two ALTER TABLE statements. + // These tests are similar to `onlineddl_vrepl_stress` endtond tests. + // If they fail, it has nothing to do with revert. + // We run these tests because we expect their functionality to work in the next step. 
+ var alterHints []string + for i := 0; i < 2; i++ { + testName := fmt.Sprintf("online ALTER TABLE %d", i) + hint := fmt.Sprintf("hint-alter-%d", i) + alterHints = append(alterHints, hint) + t.Run(testName, func(t *testing.T) { + // One alter. We're not going to revert it. + // This specific test is similar to `onlineddl_vrepl_stress` endtond tests. + // If it fails, it has nothing to do with revert. + // We run this test because we expect its functionality to work in the next step. + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runMultipleConnections(ctx, t) + }() + uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), "online", "vtgate", hint) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + cancel() // will cause runMultipleConnections() to terminate + wg.Wait() + testSelectTableMetrics(t) + }) + } + t.Run("revert ALTER TABLE", func(t *testing.T) { + // This reverts the last ALTER TABLE. + // And we run traffic on the table during the revert + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runMultipleConnections(ctx, t) + }() + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + cancel() // will cause runMultipleConnections() to terminate + wg.Wait() + checkMigratedTable(t, tableName, alterHints[0]) + testSelectTableMetrics(t) + }) + t.Run("revert revert ALTER TABLE", func(t *testing.T) { + // This reverts the last revert (reapplying the last ALTER TABLE). 
+ // And we run traffic on the table during the revert + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runMultipleConnections(ctx, t) + }() + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + cancel() // will cause runMultipleConnections() to terminate + wg.Wait() + checkMigratedTable(t, tableName, alterHints[1]) + testSelectTableMetrics(t) + }) + t.Run("revert revert revert ALTER TABLE", func(t *testing.T) { + // For good measure, let's verify that revert-revert-revert works... + // So this again pulls us back to first ALTER + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + runMultipleConnections(ctx, t) + }() + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + cancel() // will cause runMultipleConnections() to terminate + wg.Wait() + checkMigratedTable(t, tableName, alterHints[0]) + testSelectTableMetrics(t) + }) + + // DROP + t.Run("online DROP TABLE", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, dropStatement, "online", "vtgate", "") + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + t.Run("revert DROP TABLE", func(t *testing.T) { + // This will recreate the table (well, actually, rename it back into place) + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("revert revert DROP TABLE", func(t *testing.T) { + // This will reapply DROP TABLE + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, 
uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + + // DROP IF EXISTS + t.Run("online DROP TABLE IF EXISTS", func(t *testing.T) { + // The table doesn't actually exist right now + uuid := testOnlineDDLStatement(t, dropIfExistsStatement, "online", "vtgate", "") + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + t.Run("revert DROP TABLE IF EXISTS", func(t *testing.T) { + // Table will not be recreated because it didn't exist during the DROP TABLE IF EXISTS + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + t.Run("revert revert DROP TABLE IF EXISTS", func(t *testing.T) { + // Table still does not exist + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + t.Run("revert revert revert DROP TABLE IF EXISTS", func(t *testing.T) { + // Table still does not exist + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + + // FAILURES + t.Run("fail online DROP TABLE", func(t *testing.T) { + // The table does not exist now + uuid := testOnlineDDLStatement(t, dropStatement, "online", "vtgate", "") + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed) + checkTable(t, tableName, false) + }) + t.Run("fail revert failed online DROP TABLE", func(t *testing.T) { + // Cannot revert a failed migration + uuid := testRevertMigration(t, uuids[len(uuids)-1]) + uuids = append(uuids, uuid) + checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed) + checkTable(t, tableName, false) + }) +} + +// testOnlineDDLStatement runs an online DDL, 
ALTER statement +func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectHint string) (uuid string) { + if executeStrategy == "vtgate" { + row := vtgateExec(t, ddlStrategy, alterStatement, "").Named().Row() + if row != nil { + uuid = row.AsString("uuid", "") + } + } else { + var err error + uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, alterStatement, ddlStrategy) + assert.NoError(t, err) + } + uuid = strings.TrimSpace(uuid) + fmt.Println("# Generated UUID (for debug purposes):") + fmt.Printf("<%s>\n", uuid) + + strategy, _, err := schema.ParseDDLStrategy(ddlStrategy) + assert.NoError(t, err) + + if !strategy.IsDirect() { + time.Sleep(time.Second * 20) + } + + if expectHint != "" { + checkMigratedTable(t, tableName, expectHint) + } + return uuid +} + +// testRevertMigration reverts a given migration +func testRevertMigration(t *testing.T, revertUUID string) (uuid string) { + uuid, err := clusterInstance.VtctlclientProcess.OnlineDDLRevertMigration(keyspaceName, revertUUID) + assert.NoError(t, err) + + uuid = strings.TrimSpace(uuid) + fmt.Println("# Generated UUID (for debug purposes):") + fmt.Printf("<%s>\n", uuid) + + time.Sleep(time.Second * 20) + return uuid +} + +// checkTable checks the number of tables in the first two shards. 
+func checkTable(t *testing.T, showTableName string, expectExists bool) bool { + expectCount := 0 + if expectExists { + expectCount = 1 + } + for i := range clusterInstance.Keyspaces[0].Shards { + if !checkTablesCount(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], showTableName, expectCount) { + return false + } + } + return true +} + +// checkTablesCount checks the number of tables in the given tablet +func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName string, expectCount int) bool { + query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName) + queryResult, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.Nil(t, err) + return assert.Equal(t, expectCount, len(queryResult.Rows)) +} + +// checkRecentMigrations checks 'OnlineDDL show recent' output. Example to such output: +// +------------------+-------+--------------+-------------+--------------------------------------+----------+---------------------+---------------------+------------------+ +// | Tablet | shard | mysql_schema | mysql_table | migration_uuid | strategy | started_timestamp | completed_timestamp | migration_status | +// +------------------+-------+--------------+-------------+--------------------------------------+----------+---------------------+---------------------+------------------+ +// | zone1-0000003880 | 0 | vt_ks | stress_test | a0638f6b_ec7b_11ea_9bf8_000d3a9b8a9a | online | 2020-09-01 17:50:40 | 2020-09-01 17:50:41 | complete | +// | zone1-0000003884 | 1 | vt_ks | stress_test | a0638f6b_ec7b_11ea_9bf8_000d3a9b8a9a | online | 2020-09-01 17:50:40 | 2020-09-01 17:50:41 | complete | +// +------------------+-------+--------------+-------------+--------------------------------------+----------+---------------------+---------------------+------------------+ + +func checkRecentMigrations(t *testing.T, uuid string, expectStatus schema.OnlineDDLStatus) { + result, err := 
clusterInstance.VtctlclientProcess.OnlineDDLShowRecent(keyspaceName) + assert.NoError(t, err) + fmt.Println("# 'vtctlclient OnlineDDL show recent' output (for debug purposes):") + fmt.Println(result) + assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), strings.Count(result, uuid)) + // We ensure "full word" regexp because some column names may conflict + expectStatusRegexp := fullWordUUIDRegexp(uuid, string(expectStatus)) + m := expectStatusRegexp.FindAllString(result, -1) + assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m)) + + result, err = clusterInstance.VtctlclientProcess.VExec(keyspaceName, uuid, `select migration_status, message from _vt.schema_migrations`) + assert.NoError(t, err) + fmt.Println("# 'vtctlclient VExec' output (for debug purposes):") + fmt.Println(result) +} + +// checkMigratedTables checks the CREATE STATEMENT of a table after migration +func checkMigratedTable(t *testing.T, tableName, expectHint string) { + for i := range clusterInstance.Keyspaces[0].Shards { + createStatement := getCreateTableStatement(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], tableName) + assert.Contains(t, createStatement, expectHint) + } +} + +// getCreateTableStatement returns the CREATE TABLE statement for a given table +func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName string) (statement string) { + queryResult, err := tablet.VttabletProcess.QueryTablet(fmt.Sprintf("show create table %s;", tableName), keyspaceName, true) + require.Nil(t, err) + + assert.Equal(t, len(queryResult.Rows), 1) + assert.Equal(t, len(queryResult.Rows[0]), 2) // table name, create statement + statement = queryResult.Rows[0][1].ToString() + return statement +} + +func generateInsert(t *testing.T, conn *mysql.Conn) error { + id := rand.Int31n(int32(maxTableRows)) + query := fmt.Sprintf(insertRowStatement, id) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + 
writeMetrics.insertsAttempts++ + if err != nil { + writeMetrics.insertsFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics.insertsNoops++ + return + } + writeMetrics.inserts++ + }() + return err +} + +func generateUpdate(t *testing.T, conn *mysql.Conn) error { + id := rand.Int31n(int32(maxTableRows)) + query := fmt.Sprintf(updateRowStatement, id) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + writeMetrics.updatesAttempts++ + if err != nil { + writeMetrics.updatesFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics.updatesNoops++ + return + } + writeMetrics.updates++ + }() + return err +} + +func generateDelete(t *testing.T, conn *mysql.Conn) error { + id := rand.Int31n(int32(maxTableRows)) + query := fmt.Sprintf(deleteRowStatement, id) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + writeMetrics.deletesAttempts++ + if err != nil { + writeMetrics.deletesFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics.deletesNoops++ + return + } + writeMetrics.deletes++ + }() + return err +} + +func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { + log.Infof("Running single connection") + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch("set autocommit=1", 1000, true) + require.Nil(t, err) + _, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true) + require.Nil(t, err) + + for { + if atomic.LoadInt64(done) == 1 { + log.Infof("Terminating single connection") + return + } + switch rand.Int31n(3) { + case 0: + err = generateInsert(t, conn) + case 1: + err = generateUpdate(t, conn) + case 2: + err = generateDelete(t, conn) + } + if err 
!= nil { + if strings.Contains(err.Error(), "disallowed due to rule: enforce blacklisted tables") { + err = nil + } + } + assert.Nil(t, err) + time.Sleep(10 * time.Millisecond) + } +} + +func runMultipleConnections(ctx context.Context, t *testing.T) { + log.Infof("Running multiple connections") + + require.True(t, checkTable(t, tableName, true)) + var done int64 + var wg sync.WaitGroup + for i := 0; i < maxConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + runSingleConnection(ctx, t, &done) + }() + } + <-ctx.Done() + atomic.StoreInt64(&done, 1) + log.Infof("Running multiple connections: done") + wg.Wait() + log.Infof("All connections cancelled") +} + +func initTable(t *testing.T) { + log.Infof("initTable begin") + defer log.Infof("initTable complete") + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + writeMetrics.Clear() + _, err = conn.ExecuteFetch(truncateStatement, 1000, true) + require.Nil(t, err) + + for i := 0; i < maxTableRows/2; i++ { + generateInsert(t, conn) + } + for i := 0; i < maxTableRows/4; i++ { + generateUpdate(t, conn) + } + for i := 0; i < maxTableRows/4; i++ { + generateDelete(t, conn) + } +} + +func testSelectTableMetrics(t *testing.T) { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + log.Infof("%s", writeMetrics.String()) + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + rs, err := conn.ExecuteFetch(selectCountRowsStatement, 1000, true) + require.Nil(t, err) + + row := rs.Named().Row() + require.NotNil(t, row) + log.Infof("testSelectTableMetrics, row: %v", row) + numRows := row.AsInt64("num_rows", 0) + sumUpdates := row.AsInt64("sum_updates", 0) + + assert.NotZero(t, numRows) + assert.NotZero(t, sumUpdates) + assert.NotZero(t, writeMetrics.inserts) + assert.NotZero(t, writeMetrics.deletes) + assert.NotZero(t, writeMetrics.updates) + assert.Equal(t, 
writeMetrics.inserts-writeMetrics.deletes, numRows) + assert.Equal(t, writeMetrics.updates-writeMetrics.deletes, sumUpdates) // because we DELETE WHERE updates=1 +} + +func vtgateExec(t *testing.T, ddlStrategy string, query string, expectError string) *sqltypes.Result { + t.Helper() + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + setSession := fmt.Sprintf("set @@ddl_strategy='%s'", ddlStrategy) + _, err = conn.ExecuteFetch(setSession, 1000, true) + assert.NoError(t, err) + + qr, err := conn.ExecuteFetch(query, 1000, true) + if expectError == "" { + require.NoError(t, err) + } else { + require.Error(t, err, "error should not be nil") + assert.Contains(t, err.Error(), expectError, "Unexpected error") + } + return qr +} diff --git a/go/vt/schema/online_ddl.go b/go/vt/schema/online_ddl.go index 4ee42565367..0cad7c3e3a3 100644 --- a/go/vt/schema/online_ddl.go +++ b/go/vt/schema/online_ddl.go @@ -34,10 +34,12 @@ var ( strategyParserRegexp = regexp.MustCompile(`^([\S]+)\s+(.*)$`) onlineDDLGeneratedTableNameRegexp = regexp.MustCompile(`^_[0-f]{8}_[0-f]{4}_[0-f]{4}_[0-f]{4}_[0-f]{12}_([0-9]{14})_(gho|ghc|del|new|vrepl)$`) ptOSCGeneratedTableNameRegexp = regexp.MustCompile(`^_.*_old$`) + revertStatementRegexp = regexp.MustCompile(`(?i)^revert\s+(.*)$`) ) const ( SchemaMigrationsTableName = "schema_migrations" + RevertActionStr = "revert" ) // MigrationBasePath is the root for all schema migration entries @@ -208,25 +210,41 @@ func (onlineDDL *OnlineDDL) ToJSON() ([]byte, error) { // GetAction extracts the DDL action type from the online DDL statement func (onlineDDL *OnlineDDL) GetAction() (action sqlparser.DDLAction, err error) { + if revertStatementRegexp.MatchString(onlineDDL.SQL) { + return sqlparser.RevertDDLAction, nil + } + _, action, err = ParseOnlineDDLStatement(onlineDDL.SQL) return action, err } // GetActionStr returns a string representation of the DDL action -func (onlineDDL *OnlineDDL) 
GetActionStr() (actionStr string, err error) { - action, err := onlineDDL.GetAction() +func (onlineDDL *OnlineDDL) GetActionStr() (action sqlparser.DDLAction, actionStr string, err error) { + action, err = onlineDDL.GetAction() if err != nil { - return actionStr, err + return action, actionStr, err } switch action { + case sqlparser.RevertDDLAction: + return action, RevertActionStr, nil case sqlparser.CreateDDLAction: - return sqlparser.CreateStr, nil + return action, sqlparser.CreateStr, nil case sqlparser.AlterDDLAction: - return sqlparser.AlterStr, nil + return action, sqlparser.AlterStr, nil case sqlparser.DropDDLAction: - return sqlparser.DropStr, nil + return action, sqlparser.DropStr, nil + } + return action, "", fmt.Errorf("Unsupported online DDL action. SQL=%s", onlineDDL.SQL) +} + +// GetRevertUUID works when this migration is a revert for another migration. It returns the UUID +// fo the reverted migration. +// The function returns error when this is not a revert migration. +func (onlineDDL *OnlineDDL) GetRevertUUID() (uuid string, err error) { + if submatch := revertStatementRegexp.FindStringSubmatch(onlineDDL.SQL); len(submatch) > 0 { + return submatch[1], nil } - return "", fmt.Errorf("Unsupported online DDL action. 
SQL=%s", onlineDDL.SQL) + return "", fmt.Errorf("Not a Revert DDL: '%s'", onlineDDL.SQL) } // ToString returns a simple string representation of this instance diff --git a/go/vt/schema/online_ddl_test.go b/go/vt/schema/online_ddl_test.go index 4ea6b6e79b4..642ebd7b926 100644 --- a/go/vt/schema/online_ddl_test.go +++ b/go/vt/schema/online_ddl_test.go @@ -114,7 +114,6 @@ func TestGetGCUUID(t *testing.T) { } assert.Equal(t, count, len(uuids)) } - func TestGetActionStr(t *testing.T) { tt := []struct { statement string @@ -139,14 +138,16 @@ func TestGetActionStr(t *testing.T) { }, } for _, ts := range tt { - onlineDDL := &OnlineDDL{SQL: ts.statement} - actionStr, err := onlineDDL.GetActionStr() - if ts.isError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, actionStr, ts.actionStr) - } + t.Run(ts.statement, func(t *testing.T) { + onlineDDL := &OnlineDDL{SQL: ts.statement} + _, actionStr, err := onlineDDL.GetActionStr() + if ts.isError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, actionStr, ts.actionStr) + } + }) } } @@ -177,3 +178,40 @@ func TestIsOnlineDDLTableName(t *testing.T) { assert.False(t, IsOnlineDDLTableName(tableName)) } } + +func TestGetRevertUUID(t *testing.T) { + tt := []struct { + statement string + uuid string + isError bool + }{ + { + statement: "revert 4e5dcf80_354b_11eb_82cd_f875a4d24e90_20201203114014", + uuid: "4e5dcf80_354b_11eb_82cd_f875a4d24e90_20201203114014", + }, + { + statement: "REVERT 4e5dcf80_354b_11eb_82cd_f875a4d24e90_20201203114014", + uuid: "4e5dcf80_354b_11eb_82cd_f875a4d24e90_20201203114014", + }, + { + statement: "REVERT", + isError: true, + }, + { + statement: "alter table t drop column c", + isError: true, + }, + } + for _, ts := range tt { + t.Run(ts.statement, func(t *testing.T) { + onlineDDL := &OnlineDDL{SQL: ts.statement} + uuid, err := onlineDDL.GetRevertUUID() + if ts.isError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, uuid, 
ts.uuid) + } + }) + } +} diff --git a/go/vt/schema/tablegc.go b/go/vt/schema/tablegc.go index 61b2e247447..80dbb6b773a 100644 --- a/go/vt/schema/tablegc.go +++ b/go/vt/schema/tablegc.go @@ -76,6 +76,11 @@ func generateGCTableName(state TableGCState, uuid string, t time.Time) (tableNam return fmt.Sprintf("_vt_%s_%s_%s", state, uuid, timestamp), nil } +// GenerateGCTableName creates a GC table name, based on desired state and time, and with random UUID +func GenerateGCTableName(state TableGCState, t time.Time) (tableName string, err error) { + return generateGCTableName(state, "", t) +} + // AnalyzeGCTableName analyzes a given table name to see if it's a GC table, and if so, parse out // its state, uuid, and timestamp func AnalyzeGCTableName(tableName string) (isGCTable bool, state TableGCState, uuid string, t time.Time, err error) { diff --git a/go/vt/sqlparser/ast.go b/go/vt/sqlparser/ast.go index a146d4b792d..43e239c97e7 100644 --- a/go/vt/sqlparser/ast.go +++ b/go/vt/sqlparser/ast.go @@ -63,6 +63,7 @@ type ( GetAction() DDLAction GetOptLike() *OptLike GetIfExists() bool + GetIfNotExists() bool GetTableSpec() *TableSpec GetFromTables() TableNames GetToTables() TableNames @@ -880,6 +881,46 @@ func (node *DropView) GetIfExists() bool { return node.IfExists } +// GetIfNotExists implements the DDLStatement interface +func (node *RenameTable) GetIfNotExists() bool { + return false +} + +// GetIfNotExists implements the DDLStatement interface +func (node *CreateTable) GetIfNotExists() bool { + return node.IfNotExists +} + +// GetIfNotExists implements the DDLStatement interface +func (node *TruncateTable) GetIfNotExists() bool { + return false +} + +// GetIfNotExists implements the DDLStatement interface +func (node *AlterTable) GetIfNotExists() bool { + return false +} + +// GetIfNotExists implements the DDLStatement interface +func (node *CreateView) GetIfNotExists() bool { + return false +} + +// GetIfNotExists implements the DDLStatement interface +func (node 
*AlterView) GetIfNotExists() bool { + return false +} + +// GetIfNotExists implements the DDLStatement interface +func (node *DropTable) GetIfNotExists() bool { + return false +} + +// GetIfNotExists implements the DDLStatement interface +func (node *DropView) GetIfNotExists() bool { + return false +} + // GetTableSpec implements the DDLStatement interface func (node *CreateTable) GetTableSpec() *TableSpec { return node.TableSpec diff --git a/go/vt/sqlparser/constants.go b/go/vt/sqlparser/constants.go index de5c899c8e0..60a237071a6 100644 --- a/go/vt/sqlparser/constants.go +++ b/go/vt/sqlparser/constants.go @@ -284,6 +284,7 @@ const ( DropColVindexDDLAction AddSequenceDDLAction AddAutoIncDDLAction + RevertDDLAction ) // Constants for Enum Type - Scope diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 2e3fcab3bad..5d73db924cf 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2956,6 +2956,34 @@ func commandOnlineDDL(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag } query = `update _vt.schema_migrations set migration_status='cancel-all'` } + case "revert": + { + if arg == "" { + return fmt.Errorf("UUID required") + } + uuid = arg + contextUUID, err := schema.CreateUUID() + if err != nil { + return err + } + requestContext := fmt.Sprintf("vtctl:%s", contextUUID) + + onlineDDL, err := schema.NewOnlineDDL(keyspace, "", fmt.Sprintf("revert %s", uuid), schema.DDLStrategyOnline, "", requestContext) + if err != nil { + return err + } + conn, err := wr.TopoServer().ConnForCell(ctx, topo.GlobalCell) + if err != nil { + return err + } + err = onlineDDL.WriteTopo(ctx, conn, schema.MigrationRequestsPath()) + if err != nil { + return err + } + wr.Logger().Infof("UUID=%+v", onlineDDL.UUID) + wr.Logger().Printf("%s\n", onlineDDL.UUID) + return nil + } default: return fmt.Errorf("Unknown OnlineDDL command: %s", command) } diff --git a/go/vt/vtctld/schema.go b/go/vt/vtctld/schema.go index 294a3255b2c..ad57158cc7f 100644 --- 
a/go/vt/vtctld/schema.go +++ b/go/vt/vtctld/schema.go @@ -78,7 +78,7 @@ func reviewMigrationRequest(ctx context.Context, ts *topo.Server, tmClient tmcli if err != nil { return err } - actionStr, err := onlineDDL.GetActionStr() + _, actionStr, err := onlineDDL.GetActionStr() if err != nil { return err } diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 1307660a7eb..67294c3e878 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -98,10 +98,12 @@ var vexecInsertTemplates = []string{ } var emptyResult = &sqltypes.Result{} +var acceptableDropTableIfExistsErrorCodes = []int{mysql.ERCantFindFile, mysql.ERNoSuchTable} var ghostOverridePath = flag.String("gh-ost-path", "", "override default gh-ost binary full path") var ptOSCOverridePath = flag.String("pt-osc-path", "", "override default pt-online-schema-change binary full path") var migrationCheckInterval = flag.Duration("migration_check_interval", 1*time.Minute, "Interval between migration checks") +var retainOnlineDDLTables = flag.Duration("retain_online_ddl_tables", 24*time.Hour, "How long should vttablet keep an old migrated table before purging it") var migrationNextCheckInterval = 5 * time.Second const ( @@ -109,7 +111,6 @@ const ( staleMigrationMinutes = 10 progressPctStarted float64 = 0 progressPctFull float64 = 100.0 - gcHoldHours = 72 databasePoolSize = 3 cutOverThreshold = 3 * time.Second ) @@ -398,13 +399,10 @@ func (e *Executor) parseAlterOptions(ctx context.Context, onlineDDL *schema.Onli } // executeDirectly runs a DDL query directly on the backend MySQL server -func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.OnlineDDL, acceptableMySQLErrorCodes ...int) error { - e.migrationMutex.Lock() - defer e.migrationMutex.Unlock() - +func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.OnlineDDL, acceptableMySQLErrorCodes ...int) (acceptableErrorCodeFound bool, err error) { conn, err 
:= dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB()) if err != nil { - return err + return false, err } defer conn.Close() @@ -417,6 +415,7 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online for _, acceptableCode := range acceptableMySQLErrorCodes { if merr.Num == acceptableCode { // we don't consider this to be an error. + acceptableErrorCodeFound = true err = nil break } @@ -424,11 +423,11 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online } } if err != nil { - return err + return false, err } _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull) - return nil + return acceptableErrorCodeFound, nil } // terminateVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration @@ -468,16 +467,10 @@ func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) err // cutOverVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) error { // sanity checks: - if s == nil { - return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "No vreplication stream migration %s", s.workflow) - } - if s.bls.Filter == nil { - return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "No binlog source filter for migration %s", s.workflow) - } - if len(s.bls.Filter.Rules) != 1 { - return vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Cannot detect filter rules for migration/vreplication %+v", s.workflow) + vreplTable, err := getVreplTable(ctx, s) + if err != nil { + return err } - vreplTable := s.bls.Filter.Rules[0].Match // get topology client & entities: tmClient := tmclient.NewTabletManagerClient() @@ -491,7 +484,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er } // information about source tablet - onlineDDL, err := e.readMigration(ctx, s.workflow) + onlineDDL, _, err := 
e.readMigration(ctx, s.workflow) if err != nil { return err } @@ -588,11 +581,54 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er // deferred function will unlock keyspace } -// ExecuteWithVReplication sets up the grounds for a vreplication schema migration -func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schema.OnlineDDL) error { - e.migrationMutex.Lock() - defer e.migrationMutex.Unlock() +func (e *Executor) initVreplicationOriginalMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, conn *dbconnpool.DBConnection) (v *VRepl, err error) { + vreplTableName := fmt.Sprintf("_%s_%s_vrepl", onlineDDL.UUID, ReadableTimestamp()) + { + // Apply CREATE TABLE for materialized table + parsed := sqlparser.BuildParsedQuery(sqlCreateTableLike, vreplTableName, onlineDDL.Table) + if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil { + return v, err + } + } + alterOptions := e.parseAlterOptions(ctx, onlineDDL) + { + // Apply ALTER TABLE to materialized table + parsed := sqlparser.BuildParsedQuery(sqlAlterTableOptions, vreplTableName, alterOptions) + if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil { + return v, err + } + } + v = NewVRepl(onlineDDL.UUID, e.keyspace, e.shard, e.dbName, onlineDDL.Table, vreplTableName, alterOptions) + return v, nil +} +func (e *Executor) initVreplicationRevertMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, revertMigration *schema.OnlineDDL) (v *VRepl, err error) { + // Getting here we've already validated that migration is revertible + + // Validation: vreplication still exists for reverted migration + revertStream, err := e.readVReplStream(ctx, revertMigration.UUID, false) + if err != nil { + // cannot read the vreplication stream which we want to revert + return nil, fmt.Errorf("can not revert vreplication migration %s because vreplication stream %s was not found", revertMigration.UUID, revertMigration.UUID) + } + + onlineDDL.Table = 
revertMigration.Table + if err := e.updateMySQLTable(ctx, onlineDDL.UUID, onlineDDL.Table); err != nil { + return nil, err + } + + vreplTableName, err := getVreplTable(ctx, revertStream) + if err != nil { + return nil, err + } + + v = NewVRepl(onlineDDL.UUID, e.keyspace, e.shard, e.dbName, onlineDDL.Table, vreplTableName, "") + v.pos = revertStream.pos + return v, nil +} + +// ExecuteWithVReplication sets up the grounds for a vreplication schema migration +func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schema.OnlineDDL, revertMigration *schema.OnlineDDL) error { // make sure there's no vreplication workflow running under same name _ = e.terminateVReplMigration(ctx, onlineDDL.UUID) @@ -615,30 +651,25 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem if err := e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted); err != nil { return err } - vreplTableName := fmt.Sprintf("_%s_%s_vrepl", onlineDDL.UUID, ReadableTimestamp()) - if err := e.updateArtifacts(ctx, onlineDDL.UUID, vreplTableName); err != nil { - return err + var v *VRepl + if revertMigration == nil { + // Original ALTER TABLE request for vreplication + v, err = e.initVreplicationOriginalMigration(ctx, onlineDDL, conn) + } else { + // this is a revert request + v, err = e.initVreplicationRevertMigration(ctx, onlineDDL, revertMigration) } - - { - parsed := sqlparser.BuildParsedQuery(sqlCreateTableLike, vreplTableName, onlineDDL.Table) - if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil { - return err - } + if err != nil { + return err } - alterOptions := e.parseAlterOptions(ctx, onlineDDL) - { - parsed := sqlparser.BuildParsedQuery(sqlAlterTableOptions, vreplTableName, alterOptions) - // Apply ALTER TABLE to materialized table - if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil { - return err - } + if err := v.analyze(ctx, conn); err != nil { + return err } - v := 
NewVRepl(onlineDDL.UUID, e.keyspace, e.shard, e.dbName, onlineDDL.Table, vreplTableName) - if err := v.analyze(ctx, conn, alterOptions); err != nil { + if err := e.updateArtifacts(ctx, onlineDDL.UUID, v.targetTable); err != nil { return err } + { // We need to talk to tabletmanager's VREngine. But we're on TabletServer. While we live in the same // process as VREngine, it is actually simpler to get hold of it via gRPC, just like wrangler does. @@ -676,9 +707,6 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem // Validation included testing the backend MySQL server and the gh-ost binary itself // Execution runs first a dry run, then an actual migration func (e *Executor) ExecuteWithGhost(ctx context.Context, onlineDDL *schema.OnlineDDL) error { - e.migrationMutex.Lock() - defer e.migrationMutex.Unlock() - if e.isAnyMigrationRunning() { return ErrExecutorMigrationAlreadyRunning } @@ -867,9 +895,6 @@ curl -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&dr // Validation included testing the backend MySQL server and the pt-online-schema-change binary itself // Execution runs first a dry run, then an actual migration func (e *Executor) ExecuteWithPTOSC(ctx context.Context, onlineDDL *schema.OnlineDDL) error { - e.migrationMutex.Lock() - defer e.migrationMutex.Unlock() - if e.isAnyMigrationRunning() { return ErrExecutorMigrationAlreadyRunning } @@ -1091,7 +1116,7 @@ export MYSQL_PWD return nil } -func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *schema.OnlineDDL, err error) { +func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *schema.OnlineDDL, row sqltypes.RowNamedValues, err error) { parsed := sqlparser.BuildParsedQuery(sqlSelectMigration, ":migration_uuid") bindVars := map[string]*querypb.BindVariable{ @@ -1099,16 +1124,16 @@ func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *s } bound, err := parsed.GenerateQuery(bindVars, 
nil) if err != nil { - return onlineDDL, err + return onlineDDL, nil, err } r, err := e.execQuery(ctx, bound) if err != nil { - return onlineDDL, err + return onlineDDL, nil, err } - row := r.Named().Row() + row = r.Named().Row() if row == nil { // No results - return nil, ErrMigrationNotFound + return nil, nil, ErrMigrationNotFound } onlineDDL = &schema.OnlineDDL{ Keyspace: row["keyspace"].ToString(), @@ -1122,7 +1147,7 @@ func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *s Retries: row.AsInt64("retries", 0), TabletAlias: row["tablet"].ToString(), } - return onlineDDL, nil + return onlineDDL, row, nil } // terminateMigration attempts to interrupt and hard-stop a running migration @@ -1178,7 +1203,7 @@ func (e *Executor) cancelMigration(ctx context.Context, uuid string, terminateRu var rowsAffected uint64 - onlineDDL, err := e.readMigration(ctx, uuid) + onlineDDL, _, err := e.readMigration(ctx, uuid) if err != nil { return nil, err } @@ -1282,6 +1307,170 @@ func (e *Executor) scheduleNextMigration(ctx context.Context) error { return err } +func (e *Executor) validateMigrationRevertible(ctx context.Context, revertMigration *schema.OnlineDDL) (err error) { + // Validation: migration to revert exists and is in complete state + action, actionStr, err := revertMigration.GetActionStr() + if err != nil { + return err + } + switch action { + case sqlparser.AlterDDLAction: + if revertMigration.Strategy != schema.DDLStrategyOnline { + return fmt.Errorf("can only revert a %s strategy migration. Migration %s has %s strategy", schema.DDLStrategyOnline, revertMigration.UUID, revertMigration.Strategy) + } + case sqlparser.RevertDDLAction: + case sqlparser.CreateDDLAction: + case sqlparser.DropDDLAction: + default: + return fmt.Errorf("cannot revert migration %s: unexpected action %s", revertMigration.UUID, actionStr) + } + if revertMigration.Status != schema.OnlineDDLStatusComplete { + return fmt.Errorf("can only revert a migration in a '%s' state. 
Migration %s is in '%s' state", schema.OnlineDDLStatusComplete, revertMigration.UUID, revertMigration.Status) + } + { + // Validation: see if there's a pending migration on this table: + r, err := e.execQuery(ctx, sqlSelectPendingMigrations) + if err != nil { + return err + } + // we identify running migrations on requested table + for _, row := range r.Named().Rows { + pendingUUID := row["migration_uuid"].ToString() + keyspace := row["keyspace"].ToString() + table := row["mysql_table"].ToString() + status := schema.OnlineDDLStatus(row["migration_status"].ToString()) + + if keyspace == e.keyspace && table == revertMigration.Table { + return fmt.Errorf("can not revert migration %s on table %s because migration %s is in %s status. May only revert if all migrations on this table are completed or failed", revertMigration.UUID, revertMigration.Table, pendingUUID, status) + } + } + { + // Validation: see that we're reverting the last successful migration on this table: + query, err := sqlparser.ParseAndBind(sqlSelectCompleteMigrationsOnTable, + sqltypes.StringBindVariable(e.keyspace), + sqltypes.StringBindVariable(revertMigration.Table), + ) + if err != nil { + return err + } + r, err := e.execQuery(ctx, query) + if err != nil { + return err + } + for _, row := range r.Named().Rows { + completeUUID := row["migration_uuid"].ToString() + if completeUUID != revertMigration.UUID { + return fmt.Errorf("can not revert migration %s on table %s because it is not the last migration to complete on that table. The last migration to complete was %s", revertMigration.UUID, revertMigration.Table, completeUUID) + } + } + } + } + return nil +} + +// executeRevert is called for 'revert' migrations (SQL is of the form "revert 99caeca2_74e2_11eb_a693_f875a4d24e90", not a real SQL of course). +// In this function we: +// - figure out whether the revert is valid: can we really revert requested migration? +// - what type of migration we're reverting? 
// This indicates no table was actually created. This must have been a CREATE TABLE IF NOT EXISTS where the table already existed.
+ _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull) + } + + for _, artifactTable := range artifactTables { + if err := e.updateArtifacts(ctx, onlineDDL.UUID, artifactTable); err != nil { + return err + } + onlineDDL.SQL = sqlparser.BuildParsedQuery(sqlRenameTable, revertMigration.Table, artifactTable).Query + if _, err := e.executeDirectly(ctx, onlineDDL); err != nil { + return err + } + } + } + case sqlparser.DropStr: + { + // We are reverting a DROP migration. But the table wasn't really dropped, because that's not how + // we run DROP migrations. It was renamed. So we need to rename it back. + // But we impose as if we are now CREATE-ing the table. + if err := e.updateDDLAction(ctx, onlineDDL.UUID, sqlparser.CreateStr); err != nil { + return err + } + if err := e.updateMySQLTable(ctx, onlineDDL.UUID, revertMigration.Table); err != nil { + return err + } + artifacts := row["artifacts"].ToString() + artifactTables := textutil.SplitDelimitedList(artifacts) + if len(artifactTables) > 1 { + return fmt.Errorf("cannot run migration %s reverting %s: found %d artifact tables, expected maximum 1", onlineDDL.UUID, revertMigration.UUID, len(artifactTables)) + } + if len(artifactTables) == 0 { + // Could happen on `DROP TABLE IF EXISTS` where the table did not exist... 
+ _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull) + } + for _, artifactTable := range artifactTables { + if err := e.updateArtifacts(ctx, onlineDDL.UUID, artifactTable); err != nil { + return err + } + onlineDDL.SQL = sqlparser.BuildParsedQuery(sqlRenameTable, artifactTable, revertMigration.Table).Query + if _, err := e.executeDirectly(ctx, onlineDDL); err != nil { + return err + } + } + } + case sqlparser.AlterStr: + { + if err := e.updateDDLAction(ctx, onlineDDL.UUID, sqlparser.AlterStr); err != nil { + return err + } + if err := e.ExecuteWithVReplication(ctx, onlineDDL, revertMigration); err != nil { + return err + } + } + default: + return fmt.Errorf("cannot run migration %s reverting %s: unexpected action %s", onlineDDL.UUID, revertMigration.UUID, revertActionStr) + } + + return nil +} + func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error { failMigration := func(err error) error { _ = e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed) @@ -1299,12 +1488,15 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin switch ddlAction { case sqlparser.DropDDLAction: go func() error { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + // Drop statement. // Normally, we're going to modify DROP to RENAME (see later on). But if table name is // already a GC-lifecycle table, then we don't put it through yet another GC lifecycle, // we just drop it. 
if schema.IsGCTableName(onlineDDL.Table) { - if err := e.executeDirectly(ctx, onlineDDL); err != nil { + if _, err := e.executeDirectly(ctx, onlineDDL); err != nil { return failMigration(err) } return nil @@ -1317,44 +1509,96 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin return failMigration(err) } - onlineDDL.SQL, _, err = schema.GenerateRenameStatementWithUUID(onlineDDL.Table, schema.HoldTableGCState, onlineDDL.GetGCUUID(), time.Now().UTC().Add(gcHoldHours*time.Hour)) + var toTableName string + onlineDDL.SQL, toTableName, err = schema.GenerateRenameStatementWithUUID(onlineDDL.Table, schema.HoldTableGCState, onlineDDL.GetGCUUID(), time.Now().UTC().Add(*retainOnlineDDLTables)) if err != nil { return failMigration(err) } + if err := e.updateArtifacts(ctx, onlineDDL.UUID, toTableName); err != nil { + return err + } + acceptableErrorCodes := []int{} if ddlStmt.GetIfExists() { - err = e.executeDirectly(ctx, onlineDDL, mysql.ERCantFindFile, mysql.ERNoSuchTable) - } else { - err = e.executeDirectly(ctx, onlineDDL) + acceptableErrorCodes = acceptableDropTableIfExistsErrorCodes } - + acceptableErrCodeFound, err := e.executeDirectly(ctx, onlineDDL, acceptableErrorCodes...) if err != nil { return failMigration(err) } + if acceptableErrCodeFound { + // Table did not exist after all. There is no artifact + if err := e.clearArtifacts(ctx, onlineDDL.UUID); err != nil { + return err + } + } + return nil }() case sqlparser.CreateDDLAction: - go func() { - if err := e.executeDirectly(ctx, onlineDDL); err != nil { + go func() error { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + + sentryArtifactTableName, err := schema.GenerateGCTableName(schema.HoldTableGCState, time.Now().UTC().Add(*retainOnlineDDLTables)) + if err != nil { + return failMigration(err) + } + // we create a dummy artifact. Its existence means the table was created by this migration. + // It will be read by the revert operation. 
+ if err := e.updateArtifacts(ctx, onlineDDL.UUID, sentryArtifactTableName); err != nil { + return err + } + ddlStmt, _, err := schema.ParseOnlineDDLStatement(onlineDDL.SQL) + if err != nil { + return failMigration(err) + } + if ddlStmt.GetIfNotExists() { + // This is a CREATE TABLE IF NOT EXISTS + // We want to know if the table actually exists before running this migration. + // If so, then the operation is noop, and when we revert the migration, we also do a noop. + exists, err := e.tableExists(ctx, onlineDDL.Table) + if err != nil { + return failMigration(err) + } + if exists { + // the table already exists. This CREATE TABLE IF NOT EXISTS statement is a noop. + // We therefore clear the artifact field. A revert operation will use this as a hint. + if err := e.clearArtifacts(ctx, onlineDDL.UUID); err != nil { + return failMigration(err) + } + } + } + if _, err := e.executeDirectly(ctx, onlineDDL); err != nil { failMigration(err) } + return nil }() case sqlparser.AlterDDLAction: switch onlineDDL.Strategy { case schema.DDLStrategyOnline: go func() { - if err := e.ExecuteWithVReplication(ctx, onlineDDL); err != nil { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + + if err := e.ExecuteWithVReplication(ctx, onlineDDL, nil); err != nil { failMigration(err) } }() case schema.DDLStrategyGhost: go func() { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + if err := e.ExecuteWithGhost(ctx, onlineDDL); err != nil { failMigration(err) } }() case schema.DDLStrategyPTOSC: go func() { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + if err := e.ExecuteWithPTOSC(ctx, onlineDDL); err != nil { failMigration(err) } @@ -1364,6 +1608,15 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin return failMigration(fmt.Errorf("Unsupported strategy: %+v", onlineDDL.Strategy)) } } + case sqlparser.RevertDDLAction: + go func() { + e.migrationMutex.Lock() + defer e.migrationMutex.Unlock() + + if err := 
e.executeRevert(ctx, onlineDDL); err != nil { + failMigration(err) + } + }() } return nil } @@ -1676,7 +1929,7 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error { for _, row := range r.Named().Rows { uuid := row["migration_uuid"].ToString() - onlineDDL, err := e.readMigration(ctx, uuid) + onlineDDL, _, err := e.readMigration(ctx, uuid) if err != nil { return err } @@ -1709,7 +1962,7 @@ func (e *Executor) retryTabletFailureMigrations(ctx context.Context) error { return err } -// gcArtifacts garbage-collects migration artifacts from completed/failed migrations +// gcArtifactTable garbage-collects a single table func (e *Executor) gcArtifactTable(ctx context.Context, artifactTable, uuid string) error { tableExists, err := e.tableExists(ctx, artifactTable) if err != nil { @@ -1718,6 +1971,8 @@ func (e *Executor) gcArtifactTable(ctx context.Context, artifactTable, uuid stri if !tableExists { return nil } + // We've already concluded in gcArtifacts() that this table was held for long enough. + // We therefore move it into PURGE state. 
renameStatement, _, err := schema.GenerateRenameStatementWithUUID(artifactTable, schema.PurgeTableGCState, schema.OnlineDDLToGCUUID(uuid), time.Now().UTC()) if err != nil { return err @@ -1737,7 +1992,13 @@ func (e *Executor) gcArtifacts(ctx context.Context) error { e.migrationMutex.Lock() defer e.migrationMutex.Unlock() - r, err := e.execQuery(ctx, sqlSelectUncollectedArtifacts) + query, err := sqlparser.ParseAndBind(sqlSelectUncollectedArtifacts, + sqltypes.Int64BindVariable(int64((*retainOnlineDDLTables).Seconds())), + ) + if err != nil { + return err + } + r, err := e.execQuery(ctx, query) if err != nil { return err } @@ -1844,37 +2105,38 @@ func (e *Executor) updateMigrationTimestamp(ctx context.Context, timestampColumn func (e *Executor) updateMigrationLogPath(ctx context.Context, uuid string, hostname, path string) error { logPath := fmt.Sprintf("%s:%s", hostname, path) - parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationLogPath, - ":log_path", - ":migration_uuid", + query, err := sqlparser.ParseAndBind(sqlUpdateMigrationLogPath, + sqltypes.StringBindVariable(logPath), + sqltypes.StringBindVariable(uuid), ) - bindVars := map[string]*querypb.BindVariable{ - "log_path": sqltypes.StringBindVariable(logPath), - "migration_uuid": sqltypes.StringBindVariable(uuid), - } - bound, err := parsed.GenerateQuery(bindVars, nil) if err != nil { return err } - _, err = e.execQuery(ctx, bound) + _, err = e.execQuery(ctx, query) return err } func (e *Executor) updateArtifacts(ctx context.Context, uuid string, artifacts ...string) error { bindArtifacts := strings.Join(artifacts, ",") - parsed := sqlparser.BuildParsedQuery(sqlUpdateArtifacts, - ":artifacts", - ":migration_uuid", + query, err := sqlparser.ParseAndBind(sqlUpdateArtifacts, + sqltypes.StringBindVariable(bindArtifacts), + sqltypes.StringBindVariable(uuid), ) - bindVars := map[string]*querypb.BindVariable{ - "artifacts": sqltypes.StringBindVariable(bindArtifacts), - "migration_uuid": 
sqltypes.StringBindVariable(uuid), + if err != nil { + return err } - bound, err := parsed.GenerateQuery(bindVars, nil) + _, err = e.execQuery(ctx, query) + return err +} + +func (e *Executor) clearArtifacts(ctx context.Context, uuid string) error { + query, err := sqlparser.ParseAndBind(sqlClearArtifacts, + sqltypes.StringBindVariable(uuid), + ) if err != nil { return err } - _, err = e.execQuery(ctx, bound) + _, err = e.execQuery(ctx, query) return err } @@ -1895,19 +2157,26 @@ func (e *Executor) updateTabletFailure(ctx context.Context, uuid string) error { } func (e *Executor) updateMigrationStatus(ctx context.Context, uuid string, status schema.OnlineDDLStatus) error { - parsed := sqlparser.BuildParsedQuery(sqlUpdateMigrationStatus, - ":migration_status", - ":migration_uuid", + query, err := sqlparser.ParseAndBind(sqlUpdateMigrationStatus, + sqltypes.StringBindVariable(string(status)), + sqltypes.StringBindVariable(uuid), ) - bindVars := map[string]*querypb.BindVariable{ - "migration_status": sqltypes.StringBindVariable(string(status)), - "migration_uuid": sqltypes.StringBindVariable(uuid), + if err != nil { + return err } - bound, err := parsed.GenerateQuery(bindVars, nil) + _, err = e.execQuery(ctx, query) + return err +} + +func (e *Executor) updateDDLAction(ctx context.Context, uuid string, actionStr string) error { + query, err := sqlparser.ParseAndBind(sqlUpdateDDLAction, + sqltypes.StringBindVariable(actionStr), + sqltypes.StringBindVariable(uuid), + ) if err != nil { return err } - _, err = e.execQuery(ctx, bound) + _, err = e.execQuery(ctx, query) return err } @@ -1923,6 +2192,18 @@ func (e *Executor) updateMigrationMessage(ctx context.Context, uuid string, mess return err } +func (e *Executor) updateMySQLTable(ctx context.Context, uuid string, tableName string) error { + query, err := sqlparser.ParseAndBind(sqlUpdateMySQLTable, + sqltypes.StringBindVariable(tableName), + sqltypes.StringBindVariable(uuid), + ) + if err != nil { + return err + } + _, 
err = e.execQuery(ctx, query) + return err +} + func (e *Executor) updateMigrationProgress(ctx context.Context, uuid string, progress float64) error { if progress <= 0 { // progress starts at 0, and can only increase. diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 3d7959fc19f..9f80f678f0e 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -54,6 +54,7 @@ const ( alterSchemaMigrationsTableContext = "ALTER TABLE _vt.schema_migrations add column migration_context varchar(1024) NOT NULL DEFAULT ''" alterSchemaMigrationsTableDDLAction = "ALTER TABLE _vt.schema_migrations add column ddl_action varchar(16) NOT NULL DEFAULT ''" alterSchemaMigrationsTableMessage = "ALTER TABLE _vt.schema_migrations add column message TEXT NOT NULL" + alterSchemaMigrationsTableTableCompleteIndex = "ALTER TABLE _vt.schema_migrations add KEY table_complete_idx (migration_status, keyspace(64), mysql_table(64), completed_timestamp)" sqlScheduleSingleMigration = `UPDATE _vt.schema_migrations SET @@ -65,6 +66,11 @@ const ( requested_timestamp ASC LIMIT 1 ` + sqlUpdateMySQLTable = `UPDATE _vt.schema_migrations + SET mysql_table=%a + WHERE + migration_uuid=%a + ` sqlUpdateMigrationStatus = `UPDATE _vt.schema_migrations SET migration_status=%a WHERE @@ -95,11 +101,21 @@ const ( WHERE migration_uuid=%a ` + sqlClearArtifacts = `UPDATE _vt.schema_migrations + SET artifacts='' + WHERE + migration_uuid=%a + ` sqlUpdateTabletFailure = `UPDATE _vt.schema_migrations SET tablet_failure=1 WHERE migration_uuid=%a ` + sqlUpdateDDLAction = `UPDATE _vt.schema_migrations + SET ddl_action=%a + WHERE + migration_uuid=%a + ` sqlUpdateMessage = `UPDATE _vt.schema_migrations SET message=%a WHERE @@ -133,6 +149,18 @@ const ( WHERE migration_status='running' ` + sqlSelectCompleteMigrationsOnTable = `SELECT + migration_uuid, + strategy + FROM _vt.schema_migrations + WHERE + migration_status='complete' + AND keyspace=%a + AND mysql_table=%a 
+ ORDER BY + completed_timestamp DESC + LIMIT 1 + ` sqlSelectCountReadyMigrations = `SELECT count(*) as count_ready FROM _vt.schema_migrations @@ -159,6 +187,7 @@ const ( WHERE migration_status IN ('complete', 'failed') AND cleanup_timestamp IS NULL + AND completed_timestamp <= NOW() - INTERVAL %a SECOND ` sqlSelectMigration = `SELECT id, @@ -178,6 +207,8 @@ const ( migration_status, log_path, retries, + ddl_action, + artifacts, tablet FROM _vt.schema_migrations WHERE @@ -201,6 +232,8 @@ const ( migration_status, log_path, retries, + ddl_action, + artifacts, tablet FROM _vt.schema_migrations WHERE @@ -245,7 +278,8 @@ const ( _vt.copy_state WHERE vrepl_id=%a ` - sqlSwapTables = "RENAME TABLE `%a` TO `%a`, `%a` TO `%a`, `%a` TO `%a`" + sqlSwapTables = "RENAME TABLE `%a` TO `%a`, `%a` TO `%a`, `%a` TO `%a`" + sqlRenameTable = "RENAME TABLE `%a` TO `%a`" ) const ( @@ -278,4 +312,5 @@ var applyDDL = []string{ alterSchemaMigrationsTableContext, alterSchemaMigrationsTableDDLAction, alterSchemaMigrationsTableMessage, + alterSchemaMigrationsTableTableCompleteIndex, } diff --git a/go/vt/vttablet/onlineddl/vrepl.go b/go/vt/vttablet/onlineddl/vrepl.go index 6a3d4035ce8..4b300e61236 100644 --- a/go/vt/vttablet/onlineddl/vrepl.go +++ b/go/vt/vttablet/onlineddl/vrepl.go @@ -34,7 +34,9 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/dbconnpool" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/onlineddl/vrepl" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" ) @@ -54,12 +56,14 @@ type VReplStream struct { // VRepl is an online DDL helper for VReplication based migrations (ddl_strategy="online") type VRepl struct { - workflow string - keyspace string - shard string - dbName string - sourceTable string - targetTable string + workflow string + keyspace string + shard string + dbName 
string +	sourceTable  string +	targetTable  string +	pos          string +	alterOptions string  	sharedPKColumns *vrepl.ColumnList  @@ -74,15 +78,16 @@ type VRepl struct { }  // NewVRepl creates a VReplication handler for Online DDL -func NewVRepl(workflow, keyspace, shard, dbName, sourceTable, targetTable string) *VRepl { +func NewVRepl(workflow, keyspace, shard, dbName, sourceTable, targetTable, alterOptions string) *VRepl { 	return &VRepl{ -		workflow:    workflow, -		keyspace:    keyspace, -		shard:       shard, -		dbName:      dbName, -		sourceTable: sourceTable, -		targetTable: targetTable, -		parser:      vrepl.NewAlterTableParser(), +		workflow:     workflow, +		keyspace:     keyspace, +		shard:        shard, +		dbName:       dbName, +		sourceTable:  sourceTable, +		targetTable:  targetTable, +		alterOptions: alterOptions, +		parser:       vrepl.NewAlterTableParser(), 	} }  @@ -237,12 +242,12 @@ func (v *VRepl) getSharedUniqueKeys(sourceUniqueKeys, targetUniqueKeys [](*vrepl 	return uniqueKeys, nil }  -func (v *VRepl) analyzeAlter(ctx context.Context, alterOptions string) error { -	if err := v.parser.ParseAlterStatement(alterOptions); err != nil { +func (v *VRepl) analyzeAlter(ctx context.Context) error { +	if err := v.parser.ParseAlterStatement(v.alterOptions); err != nil { 		return err 	} 	if v.parser.IsRenameTable() { -		return fmt.Errorf("Renaming the table is not aupported in ALTER TABLE: %s", alterOptions) +		return fmt.Errorf("Renaming the table is not supported in ALTER TABLE: %s", v.alterOptions) 	} 	return nil }  @@ -309,8 +314,8 @@ func (v *VRepl) analyzeBinlogSource(ctx context.Context) { 	v.bls = bls }  -func (v *VRepl) analyze(ctx context.Context, conn *dbconnpool.DBConnection, alterOptions string) error { -	if err := v.analyzeAlter(ctx, alterOptions); err != nil { +func (v *VRepl) analyze(ctx context.Context, conn *dbconnpool.DBConnection) error { +	if err := v.analyzeAlter(ctx); err != nil { 		return err 	} 	if err := v.analyzeTables(ctx, conn); err != nil { @@ -326,7 +331,7 @@ func (v *VRepl) analyze(ctx context.Context, conn 
*dbconnpool.DBConnection, alte // generateInsertStatement generates the INSERT INTO _vt.replication stataement that creates the vreplication workflow func (v *VRepl) generateInsertStatement(ctx context.Context) (string, error) { 	ig := vreplication.NewInsertGenerator(binlogplayer.BlpStopped, v.dbName) 	ig.AddRow(v.workflow, v.bls, v.pos, "", "MASTER")  	return ig.String(), nil } @@ -339,6 +344,21 @@ func (v *VRepl) generateStartStatement(ctx context.Context) (string, error) { 	) }  +func getVreplTable(ctx context.Context, s *VReplStream) (string, error) { +	// sanity checks: +	if s == nil { +		return "", vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "No vreplication stream migration") +	} +	if s.bls.Filter == nil { +		return "", vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "No binlog source filter for migration %s", s.workflow) +	} +	if len(s.bls.Filter.Rules) != 1 { +		return "", vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Cannot detect filter rules for migration/vreplication %+v", s.workflow) +	} +	vreplTable := s.bls.Filter.Rules[0].Match +	return vreplTable, nil +} + // escapeName will escape a db/table/column/... name by wrapping with backticks. // It is not fool proof. I'm just trying to do the right thing here, not solving // SQL injection issues, which should be irrelevant for this tool. 
diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index 6e0cde4f4e9..1407d0a0808 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -32,7 +32,7 @@ const ( unitTestDatabases = "percona56, mysql57, mysql80, mariadb101, mariadb102, mariadb103" clusterTestTemplate = "templates/cluster_endtoend_test.tpl" - clusterList = "11,12,13,14,15,16,17,18,19,20,21,22,23,24,26,27,vreplication_basic,vreplication_multicell,vreplication_cellalias,vreplication_v2,onlineddl_ghost,onlineddl_vrepl,onlineddl_vrepl_stress,vreplication_migrate" + clusterList = "11,12,13,14,15,16,17,18,19,20,21,22,23,24,26,27,vreplication_basic,vreplication_multicell,vreplication_cellalias,vreplication_v2,onlineddl_ghost,onlineddl_vrepl,onlineddl_vrepl_stress,vreplication_migrate,onlineddl_revert" // TODO: currently some percona tools including xtrabackup are installed on all clusters, we can possibly optimize // this by only installing them in the required clusters clustersRequiringXtraBackup = clusterList diff --git a/test/config.json b/test/config.json index 1b4f608b882..31f20149763 100644 --- a/test/config.json +++ b/test/config.json @@ -1,17 +1,17 @@ { "Tests": { - "java": { - "File": "", - "Args": [], - "Command": [ - "make", - "java_test" - ], - "Manual": false, - "Shard": "10", - "RetryMax": 0, - "Tags": [] - }, + "java": { + "File": "", + "Args": [], + "Command": [ + "make", + "java_test" + ], + "Manual": false, + "Shard": "10", + "RetryMax": 0, + "Tags": [] + }, "client_test": { "File": "", "Args": [], @@ -290,7 +290,16 @@ "Command": [], "Manual": false, "Shard": "onlineddl_vrepl_stress", - "RetryMax": 1, + "RetryMax": 0, + "Tags": [] + }, + "onlineddl_revert": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/onlineddl_revert"], + "Command": [], + "Manual": false, + "Shard": "onlineddl_revert", + "RetryMax": 0, "Tags": [] }, "pitr": {