diff --git a/.github/workflows/cluster_endtoend_onlineddl_scheduler.yml b/.github/workflows/cluster_endtoend_onlineddl_scheduler.yml
new file mode 100644
index 00000000000..a7c17b80b4b
--- /dev/null
+++ b/.github/workflows/cluster_endtoend_onlineddl_scheduler.yml
@@ -0,0 +1,88 @@
+# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"
+
+name: Cluster (onlineddl_scheduler)
+on: [push, pull_request]
+concurrency:
+  group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (onlineddl_scheduler)')
+  cancel-in-progress: true
+
+env:
+  LAUNCHABLE_ORGANIZATION: "vitess"
+  LAUNCHABLE_WORKSPACE: "vitess-app"
+  GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}"
+
+jobs:
+  build:
+    name: Run endtoend tests on Cluster (onlineddl_scheduler)
+    runs-on: ubuntu-18.04
+
+    steps:
+    - name: Set up Go
+      uses: actions/setup-go@v2
+      with:
+        go-version: 1.17
+
+    - name: Set up python
+      uses: actions/setup-python@v2
+
+    - name: Tune the OS
+      run: |
+        echo '1024 65535' | sudo tee -a /proc/sys/net/ipv4/ip_local_port_range
+
+    # TEMPORARY WHILE GITHUB FIXES THIS https://github.com/actions/virtual-environments/issues/3185
+    - name: Add the current IP address, long hostname and short hostname record to /etc/hosts file
+      run: |
+        echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
+    # DON'T FORGET TO REMOVE CODE ABOVE WHEN ISSUE IS ADDRESSED!
+
+    - name: Check out code
+      uses: actions/checkout@v2
+
+    - name: Get dependencies
+      run: |
+        sudo apt-get update
+        sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
+        sudo service mysql stop
+        sudo service etcd stop
+        sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
+        sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
+        go mod download
+
+        # install JUnit report formatter
+        go get -u github.com/vitessio/go-junit-report@HEAD
+
+        wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
+        sudo apt-get install -y gnupg2
+        sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
+        sudo apt-get update
+        sudo apt-get install percona-xtrabackup-24
+
+    - name: Setup launchable dependencies
+      run: |
+        # Get Launchable CLI installed. If you can, make it a part of the builder image to speed things up
+        pip3 install --user launchable~=1.0 > /dev/null
+
+        # verify that launchable setup is all correct.
+        launchable verify || true
+
+        # Tell Launchable about the build you are producing and testing
+        launchable record build --name "$GITHUB_RUN_ID" --source .
+
+    - name: Run cluster endtoend test
+      timeout-minutes: 30
+      run: |
+        source build.env
+
+        set -x
+
+        # run the tests however you normally do, then produce a JUnit XML file
+        eatmydata -- go run test.go -docker=false -follow -shard onlineddl_scheduler | tee -a output.txt | go-junit-report -set-exit-code > report.xml
+
+    - name: Print test output and Record test result in launchable
+      run: |
+        # send recorded tests to launchable
+        launchable record tests --build "$GITHUB_RUN_ID" go-test . || true
+
+        # print test output
+        cat output.txt
+      if: always()
diff --git a/doc/releasenotes/13_0_0_summary.md b/doc/releasenotes/13_0_0_summary.md
index e1d9c8afa77..a35868d3dbc 100644
--- a/doc/releasenotes/13_0_0_summary.md
+++ b/doc/releasenotes/13_0_0_summary.md
@@ -29,7 +29,6 @@ A new query is supported:
 ```sql
 alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' cleanup
 ```
-
 This query tells Vitess that a migration's artifacts are good to be cleaned up asap. This allows Vitess to free disk resources sooner. As a reminder, once a migration's artifacts are cleaned up, the migration is no longer revertible.
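The release-notes entry above introduces the `cleanup` command. As a quick illustration (not part of the PR), here is a minimal Go sketch of issuing that statement through vtgate's MySQL protocol. The driver import, connection address, and keyspace name are placeholder assumptions; the UUID is the one from the release-notes example.

```go
// Minimal sketch: issuing the new `cleanup` command against vtgate's MySQL port.
// Assumptions: vtgate listens on 127.0.0.1:15306 and the keyspace is "ks";
// the UUID stands in for a real completed migration's UUID.
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // any MySQL-protocol driver works
)

func main() {
	db, err := sql.Open("mysql", "user@tcp(127.0.0.1:15306)/ks")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Tell Vitess the migration's artifacts may be garbage-collected now.
	// After cleanup, the migration is no longer revertible.
	if _, err := db.Exec("alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' cleanup"); err != nil {
		log.Fatal(err)
	}
}
```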
diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
new file mode 100644
index 00000000000..8c5a9b2799f
--- /dev/null
+++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
@@ -0,0 +1,610 @@
+/*
+Copyright 2021 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheduler
+
+import (
+	"flag"
+	"fmt"
+	"os"
+	"path"
+	"strings"
+	"testing"
+	"time"
+
+	"vitess.io/vitess/go/mysql"
+	"vitess.io/vitess/go/vt/schema"
+	"vitess.io/vitess/go/vt/sqlparser"
+
+	"vitess.io/vitess/go/test/endtoend/cluster"
+	"vitess.io/vitess/go/test/endtoend/onlineddl"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+var (
+	clusterInstance *cluster.LocalProcessCluster
+	shards          []cluster.Shard
+	vtParams        mysql.ConnParams
+
+	hostname              = "localhost"
+	keyspaceName          = "ks"
+	cell                  = "zone1"
+	schemaChangeDirectory = ""
+	ddlStrategy           = "online"
+	t1Name                = "t1_test"
+	t2Name                = "t2_test"
+	createT1Statement     = `
+		CREATE TABLE t1_test (
+			id bigint(20) not null,
+			hint_col varchar(64) not null default 'just-created',
+			PRIMARY KEY (id)
+		) ENGINE=InnoDB
+	`
+	createT2Statement = `
+		CREATE TABLE t2_test (
+			id bigint(20) not null,
+			hint_col varchar(64) not null default 'just-created',
+			PRIMARY KEY (id)
+		) ENGINE=InnoDB
+	`
+	trivialAlterT1Statement = `
+		ALTER TABLE t1_test ENGINE=InnoDB;
+	`
+	trivialAlterT2Statement = `
+		ALTER TABLE t2_test ENGINE=InnoDB;
+	`
+	dropT1Statement = `
+		DROP TABLE IF EXISTS t1_test
+	`
+	dropT3Statement = `
+		DROP TABLE IF EXISTS t3_test
+	`
+	dropT4Statement = `
+		DROP TABLE IF EXISTS t4_test
+	`
+)
+
+func TestMain(m *testing.M) {
+	defer cluster.PanicHandler(nil)
+	flag.Parse()
+
+	exitcode, err := func() (int, error) {
+		clusterInstance = cluster.NewCluster(cell, hostname)
+		schemaChangeDirectory = path.Join("/tmp", fmt.Sprintf("schema_change_dir_%d", clusterInstance.GetAndReserveTabletUID()))
+		defer os.RemoveAll(schemaChangeDirectory)
+		defer clusterInstance.Teardown()
+
+		if _, err := os.Stat(schemaChangeDirectory); os.IsNotExist(err) {
+			_ = os.Mkdir(schemaChangeDirectory, 0700)
+		}
+
+		clusterInstance.VtctldExtraArgs = []string{
+			"-schema_change_dir", schemaChangeDirectory,
+			"-schema_change_controller", "local",
+			"-schema_change_check_interval", "1"}
+
+		clusterInstance.VtTabletExtraArgs = []string{
+			"-enable-lag-throttler",
+			"-throttle_threshold", "1s",
+			"-heartbeat_enable",
+			"-heartbeat_interval", "250ms",
+		}
+		clusterInstance.VtGateExtraArgs = []string{}
+
+		if err := clusterInstance.StartTopo(); err != nil {
+			return 1, err
+		}
+
+		// Start keyspace
+		keyspace := &cluster.Keyspace{
+			Name: keyspaceName,
+		}
+
+		// No need for replicas in this test
+		if err := clusterInstance.StartKeyspace(*keyspace, []string{"1"}, 0, false); err != nil {
+			return 1, err
+		}
+
+		vtgateInstance := clusterInstance.NewVtgateInstance()
+		// set the gateway we want to use
+		vtgateInstance.GatewayImplementation = "tabletgateway"
+		// Start vtgate
+		if err := vtgateInstance.Setup(); err != nil {
+			return 1, err
+		}
+		// ensure it is torn down during cluster TearDown
+		clusterInstance.VtgateProcess = *vtgateInstance
+		vtParams = mysql.ConnParams{
+			Host: clusterInstance.Hostname,
+			Port: clusterInstance.VtgateMySQLPort,
+		}
+
+		return m.Run(), nil
+	}()
+	if err != nil {
+		fmt.Printf("%v\n", err)
+		os.Exit(1)
+	} else {
+		os.Exit(exitcode)
+	}
+
+}
+
+func TestSchemaChange(t *testing.T) {
+	defer cluster.PanicHandler(t)
+	shards = clusterInstance.Keyspaces[0].Shards
+	require.Equal(t, 1, len(shards))
+
+	var t1uuid string
+	var t2uuid string
+
+	testCompareTableTimes := func(t *testing.T, t1uuid, t2uuid string) {
+		t.Run("Compare t1, t2 times", func(t *testing.T) {
+			var t1endTime, t2startTime string
+			{
+				rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
+				require.NotNil(t, rs)
+				for _, row := range rs.Named().Rows {
+					t1endTime = row.AsString("completed_timestamp", "")
+					assert.NotEmpty(t, t1endTime)
+				}
+			}
+			{
+				rs := onlineddl.ReadMigrations(t, &vtParams, t2uuid)
+				require.NotNil(t, rs)
+				for _, row := range rs.Named().Rows {
+					t2startTime = row.AsString("started_timestamp", "")
+					assert.NotEmpty(t, t2startTime)
+				}
+			}
+			assert.GreaterOrEqual(t, t2startTime, t1endTime)
+		})
+	}
+
+	// CREATE
+	t.Run("CREATE TABLEs t1, t2", func(t *testing.T) {
+		{ // The table does not exist
+			t1uuid = testOnlineDDLStatement(t, createT1Statement, ddlStrategy, "vtgate", "just-created", "", false)
+			onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
+			checkTable(t, t1Name, true)
+		}
+		{
+			// The table does not exist
+			t2uuid = testOnlineDDLStatement(t, createT2Statement, ddlStrategy, "vtgate", "just-created", "", false)
+			onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusComplete)
+			checkTable(t, t2Name, true)
+		}
+		testCompareTableTimes(t, t1uuid, t2uuid)
+	})
+	t.Run("ALTER both tables non-concurrent", func(t *testing.T) {
+		t1uuid = testOnlineDDLStatement(t, trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", true) // skip wait
+		t2uuid = testOnlineDDLStatement(t, trivialAlterT2Statement, ddlStrategy, "vtgate", "", "", true) // skip wait
+
+		t.Run("wait for t1 complete", func(t *testing.T) {
+			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
+		})
+		t.Run("wait for t2 complete", func(t *testing.T) {
+			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
+		})
+		t.Run("check both complete", func(t *testing.T) {
+			onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
+			onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusComplete)
+		})
+		testCompareTableTimes(t, t1uuid, t2uuid)
+	})
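The subtests above lean heavily on onlineddl.WaitForMigrationStatus to poll until a migration reaches one of a set of awaited statuses. The real helper lives in go/test/endtoend/onlineddl; the sketch below is an illustrative, self-contained version of that polling pattern, in which fetchStatus is a hypothetical stand-in for reading the migration's current status from the cluster.

```go
// Illustrative, self-contained version of the polling pattern behind
// onlineddl.WaitForMigrationStatus. fetchStatus is a hypothetical stand-in
// for querying the migration's status; this is NOT the actual Vitess helper.
package main

import (
	"fmt"
	"time"
)

func waitForMigrationStatus(fetchStatus func() string, timeout time.Duration, expectStatuses ...string) string {
	expect := map[string]bool{}
	for _, status := range expectStatuses {
		expect[status] = true
	}
	deadline := time.Now().Add(timeout)
	lastKnown := ""
	for time.Now().Before(deadline) {
		lastKnown = fetchStatus()
		if expect[lastKnown] {
			return lastKnown // reached one of the awaited statuses
		}
		time.Sleep(time.Second) // poll interval; the real helper's interval may differ
	}
	return lastKnown // timed out; the caller asserts on the returned status
}

func main() {
	i := 0
	statuses := []string{"queued", "ready", "running", "complete"}
	fetchStatus := func() string { s := statuses[i%len(statuses)]; i++; return s }
	fmt.Println(waitForMigrationStatus(fetchStatus, 10*time.Second, "complete", "failed"))
}
```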
+	t.Run("ALTER both tables non-concurrent, postponed", func(t *testing.T) {
+		t1uuid = testOnlineDDLStatement(t, trivialAlterT1Statement, ddlStrategy+" -postpone-completion", "vtgate", "", "", true) // skip wait
+		t2uuid = testOnlineDDLStatement(t, trivialAlterT2Statement, ddlStrategy+" -postpone-completion", "vtgate", "", "", true) // skip wait
+
+		t.Run("test allow-concurrent", func(t *testing.T) {
+			rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
+			require.NotNil(t, rs)
+			for _, row := range rs.Named().Rows {
+				allowConcurrent := row.AsInt64("allow_concurrent", 0)
+				assert.Equal(t, int64(0), allowConcurrent)
+			}
+		})
+		t.Run("expect t1 running, t2 queued", func(t *testing.T) {
+			onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusRunning)
+			// now that t1 is running, let's unblock t2. We expect it to remain queued.
+			onlineddl.CheckCompleteMigration(t, &vtParams, shards, t2uuid, true)
+			time.Sleep(5 * time.Second)
+			// t1 should be still running!
+			onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
+			// non-concurrent -- should be queued!
+			onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
+		})
+		t.Run("complete t1", func(t *testing.T) {
+			// Issue a complete and wait for successful completion
+			onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
+			// This part may take a while, because we depend on vreplication polling
+			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
+			onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
+		})
+		t.Run("expect t2 to complete", func(t *testing.T) {
+			// This part may take a while, because we depend on vreplication polling
+			status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
+			fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
+			onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusComplete)
+		})
+		testCompareTableTimes(t, t1uuid, t2uuid)
+	})
+	t.Run("ALTER both tables, not eligible for concurrent execution", func(t *testing.T) {
+		// An ALTER TABLE is not allowed to run concurrently. We add -allow-concurrent but it will be ignored
+		t1uuid = testOnlineDDLStatement(t, trivialAlterT1Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true) // skip wait
+		t2uuid = testOnlineDDLStatement(t, trivialAlterT2Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true) // skip wait
+
+		t.Run("test allow-concurrent", func(t *testing.T) {
+			rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
+			require.NotNil(t, rs)
+			for _, row := range rs.Named().Rows {
+				allowConcurrent := row.AsInt64("allow_concurrent", 0)
+				assert.Equal(t, int64(0), allowConcurrent)
+			}
+		})
+		t.Run("expect t1 running, t2 queued", func(t *testing.T) {
+			onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusRunning)
+			// now that t1 is running, let's unblock t2.
We expect it to remain queued. + onlineddl.CheckCompleteMigration(t, &vtParams, shards, t2uuid, true) + time.Sleep(5 * time.Second) + // t1 should be still running! + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) + // non-concurrent -- should be queued! + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + }) + t.Run("complete t1", func(t *testing.T) { + // Issue a complete and wait for successful completion + onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true) + // This part may take a while, because we depend on vreplication polling + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("expect t2 to complete", func(t *testing.T) { + // This part may take a while, because we depend on vreplication polling + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusComplete) + }) + testCompareTableTimes(t, t1uuid, t2uuid) + }) + t.Run("REVERT both tables concurrent, postponed", func(t *testing.T) { + t1uuid = testRevertMigration(t, t1uuid, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", true) + t2uuid = testRevertMigration(t, t2uuid, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", true) + + t.Run("test allow-concurrent", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + allowConcurrent := row.AsInt64("allow_concurrent", 0) + assert.Equal(t, int64(1), allowConcurrent) + } + }) + t.Run("expect both migrations to run", func(t *testing.T) { + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusRunning) + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, 20*time.Second, schema.OnlineDDLStatusRunning) + }) + t.Run("complete t2", func(t *testing.T) { + // now that both are running, let's unblock t2. We expect it to complete. + // Issue a complete and wait for successful completion + onlineddl.CheckCompleteMigration(t, &vtParams, shards, t2uuid, true) + // This part may take a while, because we depend on vreplication polling + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("complete t1", func(t *testing.T) { + // t1 should be still running! 
+ onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) + // Issue a complete and wait for successful completion + onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true) + // This part may take a while, because we depend on vreplication polling + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + }) + t.Run("concurrent REVERT vs two non-concurrent DROPs", func(t *testing.T) { + t1uuid = testRevertMigration(t, t1uuid, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", true) + drop3uuid := testOnlineDDLStatement(t, dropT3Statement, ddlStrategy, "vtgate", "", "", true) // skip wait + + t.Run("test allow-concurrent", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + allowConcurrent := row.AsInt64("allow_concurrent", 0) + assert.Equal(t, int64(1), allowConcurrent) + } + }) + t.Run("test allow-concurrent for drop3", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, drop3uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + allowConcurrent := row.AsInt64("allow_concurrent", 0) + assert.Equal(t, int64(0), allowConcurrent) + } + }) + t.Run("expect t1 migration to run", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusRunning) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) + }) + drop1uuid := testOnlineDDLStatement(t, dropT1Statement, ddlStrategy, "vtgate", "", "", true) // skip wait + t.Run("drop3 complete", func(t *testing.T) { + // drop3 migration should not block. It can run concurrently to t1, and does not conflict + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, drop3uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop3uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("drop1 blocked", func(t *testing.T) { + // drop1 migration should block. It can run concurrently to t1, but conflicts on table name + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + // let's cancel it + onlineddl.CheckCancelMigration(t, &vtParams, shards, drop1uuid, true) + time.Sleep(2 * time.Second) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusFailed) + }) + t.Run("complete t1", func(t *testing.T) { + // t1 should be still running! 
+ onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) + // Issue a complete and wait for successful completion + onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true) + // This part may take a while, because we depend on vreplication polling + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + }) + t.Run("non-concurrent REVERT vs three concurrent drops", func(t *testing.T) { + t1uuid = testRevertMigration(t, t1uuid, ddlStrategy+" -postpone-completion", "vtgate", "", true) + drop3uuid := testOnlineDDLStatement(t, dropT3Statement, ddlStrategy+" -allow-concurrent", "vtgate", "", "", true) // skip wait + drop4uuid := testOnlineDDLStatement(t, dropT4Statement, ddlStrategy+" -allow-concurrent -postpone-completion", "vtgate", "", "", true) // skip wait + + t.Run("test allow-concurrent for drop3", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, drop3uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + allowConcurrent := row.AsInt64("allow_concurrent", 0) + assert.Equal(t, int64(1), allowConcurrent) + } + }) + t.Run("expect t1 migration to run", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusRunning) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) + }) + drop1uuid := testOnlineDDLStatement(t, dropT1Statement, ddlStrategy+" -allow-concurrent", "vtgate", "", "", true) // skip wait + t.Run("test allow-concurrent for drop1", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, drop1uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + allowConcurrent := row.AsInt64("allow_concurrent", 0) + assert.Equal(t, int64(1), allowConcurrent) + } + }) + t.Run("t3drop complete", func(t *testing.T) { + // drop3 migration should not block. It can run concurrently to t1, and does not conflict + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, drop3uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop3uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("t1drop blocked", func(t *testing.T) { + // drop1 migration should block. It can run concurrently to t1, but conflicts on table name + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + }) + t.Run("t4 postponed", func(t *testing.T) { + // drop4 migration should postpone. + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop4uuid, schema.OnlineDDLStatusQueued) + // Issue a complete and wait for successful completion. 
drop4 is non-conflicting and should be able to proceed + onlineddl.CheckCompleteMigration(t, &vtParams, shards, drop4uuid, true) + // This part may take a while, because we depend on vreplication polling + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, drop4uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop4uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("complete t1", func(t *testing.T) { + // t1 should be still running! + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) + // Issue a complete and wait for successful completion + onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true) + // This part may take a while, because we depend on vreplication polling + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("t1drop unblocked", func(t *testing.T) { + // t1drop should now be unblocked! + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, drop1uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusComplete) + checkTable(t, t1Name, false) + }) + t.Run("revert t1 drop", func(t *testing.T) { + revertDrop3uuid := testRevertMigration(t, drop1uuid, ddlStrategy+" -allow-concurrent", "vtgate", "", true) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, revertDrop3uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, revertDrop3uuid, schema.OnlineDDLStatusComplete) + checkTable(t, t1Name, true) + }) + }) + t.Run("conflicting migration does not block other queued migrations", func(t *testing.T) { + t1uuid = testOnlineDDLStatement(t, trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", false) // skip wait + t.Run("trivial t1 migration", func(t *testing.T) { + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + checkTable(t, t1Name, true) + }) + + t1uuid = testRevertMigration(t, t1uuid, ddlStrategy+" -postpone-completion", "vtgate", "", true) + t.Run("expect t1 revert migration to run", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusRunning) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) + }) + drop1uuid := testOnlineDDLStatement(t, dropT1Statement, ddlStrategy+" -allow-concurrent", "vtgate", "", "", true) // skip wait + t.Run("t1drop blocked", func(t *testing.T) { + time.Sleep(5 * time.Second) + // drop1 migration should block. It can run concurrently to t1, but conflicts on table name + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusReady) + }) + t.Run("t3drop complete", func(t *testing.T) { + // drop3 migration should not block. 
It can run concurrently to t1, and does not conflict + // even though t1drop is blocked! This is the heart of this test + drop3uuid := testOnlineDDLStatement(t, dropT3Statement, ddlStrategy+" -allow-concurrent", "vtgate", "", "", false) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop3uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("cancel drop1", func(t *testing.T) { + // drop1 migration should block. It can run concurrently to t1, but conflicts on table name + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusReady) + // let's cancel it + onlineddl.CheckCancelMigration(t, &vtParams, shards, drop1uuid, true) + time.Sleep(2 * time.Second) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusFailed) + }) + t.Run("complete t1", func(t *testing.T) { + // t1 should be still running! + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) + // Issue a complete and wait for successful completion + onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true) + // This part may take a while, because we depend on vreplication polling + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + }) +} + +// testOnlineDDLStatement runs an online DDL, ALTER statement +func testOnlineDDLStatement(t *testing.T, ddlStatement string, ddlStrategy string, executeStrategy string, expectHint string, expectError string, skipWait bool) (uuid string) { + strategySetting, err := schema.ParseDDLStrategy(ddlStrategy) + require.NoError(t, err) + + stmt, err := sqlparser.Parse(ddlStatement) + require.NoError(t, err) + ddlStmt, ok := stmt.(sqlparser.DDLStatement) + require.True(t, ok) + tableName := ddlStmt.GetTable().Name.String() + + if executeStrategy == "vtgate" { + result := onlineddl.VtgateExecDDL(t, &vtParams, ddlStrategy, ddlStatement, expectError) + if result != nil { + row := result.Named().Row() + if row != nil { + uuid = row.AsString("uuid", "") + } + } + } else { + output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, ddlStatement, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, SkipPreflight: true}) + if expectError == "" { + assert.NoError(t, err) + uuid = output + } else { + assert.Error(t, err) + assert.Contains(t, output, expectError) + } + } + uuid = strings.TrimSpace(uuid) + fmt.Println("# Generated UUID (for debug purposes):") + fmt.Printf("<%s>\n", uuid) + + if !strategySetting.Strategy.IsDirect() && !skipWait { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + } + + if expectError == "" && expectHint != "" { + checkMigratedTable(t, tableName, expectHint) + } + return uuid +} + +// testRevertMigration reverts a given migration +func testRevertMigration(t *testing.T, revertUUID string, ddlStrategy, executeStrategy string, expectError string, skipWait bool) (uuid string) { + revertQuery := fmt.Sprintf("revert vitess_migration '%s'", revertUUID) + if executeStrategy == "vtgate" { + result := onlineddl.VtgateExecDDL(t, &vtParams, ddlStrategy, revertQuery, expectError) + if result != nil { + row := 
result.Named().Row() + if row != nil { + uuid = row.AsString("uuid", "") + } + } + } else { + output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, revertQuery, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, SkipPreflight: true}) + if expectError == "" { + assert.NoError(t, err) + uuid = output + } else { + assert.Error(t, err) + assert.Contains(t, output, expectError) + } + } + + if expectError == "" { + uuid = strings.TrimSpace(uuid) + fmt.Println("# Generated UUID (for debug purposes):") + fmt.Printf("<%s>\n", uuid) + } + if !skipWait { + time.Sleep(time.Second * 20) + } + return uuid +} + +// checkTable checks the number of tables in the first two shards. +func checkTable(t *testing.T, showTableName string, expectExists bool) bool { + expectCount := 0 + if expectExists { + expectCount = 1 + } + for i := range clusterInstance.Keyspaces[0].Shards { + if !checkTablesCount(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], showTableName, expectCount) { + return false + } + } + return true +} + +// checkTablesCount checks the number of tables in the given tablet +func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName string, expectCount int) bool { + query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName) + queryResult, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.Nil(t, err) + return assert.Equal(t, expectCount, len(queryResult.Rows)) +} + +// checkMigratedTables checks the CREATE STATEMENT of a table after migration +func checkMigratedTable(t *testing.T, tableName, expectHint string) { + for i := range clusterInstance.Keyspaces[0].Shards { + createStatement := getCreateTableStatement(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], tableName) + assert.Contains(t, createStatement, expectHint) + } +} + +// getCreateTableStatement returns the CREATE TABLE statement for a given table +func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName string) (statement string) { + queryResult, err := tablet.VttabletProcess.QueryTablet(fmt.Sprintf("show create table %s;", tableName), keyspaceName, true) + require.Nil(t, err) + + assert.Equal(t, len(queryResult.Rows), 1) + assert.Equal(t, len(queryResult.Rows[0]), 2) // table name, create statement + statement = queryResult.Rows[0][1].ToString() + return statement +} diff --git a/go/test/endtoend/onlineddl/singleton/onlineddl_singleton_test.go b/go/test/endtoend/onlineddl/singleton/onlineddl_singleton_test.go index 84a0ed91f38..ccf53ee5c54 100644 --- a/go/test/endtoend/onlineddl/singleton/onlineddl_singleton_test.go +++ b/go/test/endtoend/onlineddl/singleton/onlineddl_singleton_test.go @@ -37,6 +37,7 @@ import ( var ( clusterInstance *cluster.LocalProcessCluster + shards []cluster.Shard vtParams mysql.ConnParams hostname = "localhost" @@ -145,7 +146,7 @@ func TestMain(m *testing.M) { func TestSchemaChange(t *testing.T) { defer cluster.PanicHandler(t) - shards := clusterInstance.Keyspaces[0].Shards + shards = clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) var uuids []string @@ -231,7 +232,7 @@ func TestSchemaChange(t *testing.T) { throttledUUIDs = strings.Split(uuidList, "\n") assert.Equal(t, 3, len(throttledUUIDs)) for _, uuid := range throttledUUIDs { - onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning, schema.OnlineDDLStatusQueued) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady, 
schema.OnlineDDLStatusRunning) } }) t.Run("failed migrations, singleton-context", func(t *testing.T) { @@ -239,7 +240,7 @@ func TestSchemaChange(t *testing.T) { }) t.Run("terminate throttled migrations", func(t *testing.T) { for _, uuid := range throttledUUIDs { - onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning, schema.OnlineDDLStatusQueued) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady, schema.OnlineDDLStatusRunning) onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true) } time.Sleep(2 * time.Second) @@ -311,8 +312,9 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str fmt.Println("# Generated UUID (for debug purposes):") fmt.Printf("<%s>\n", uuid) - if !strategySetting.Strategy.IsDirect() && !skipWait { - time.Sleep(time.Second * 20) + if !strategySetting.Strategy.IsDirect() && !skipWait && uuid != "" { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) } if expectError == "" && expectHint != "" { diff --git a/go/vt/schema/ddl_strategy.go b/go/vt/schema/ddl_strategy.go index d6fea8a6c55..8ae4ecc2423 100644 --- a/go/vt/schema/ddl_strategy.go +++ b/go/vt/schema/ddl_strategy.go @@ -34,6 +34,7 @@ const ( singletonContextFlag = "singleton-context" allowZeroInDateFlag = "allow-zero-in-date" postponeCompletionFlag = "postpone-completion" + allowConcurrentFlag = "allow-concurrent" vreplicationTestSuite = "vreplication-test-suite" ) @@ -142,6 +143,11 @@ func (setting *DDLStrategySetting) IsPostponeCompletion() bool { return setting.hasFlag(postponeCompletionFlag) } +// IsAllowConcurrent checks if strategy options include -allow-concurrent +func (setting *DDLStrategySetting) IsAllowConcurrent() bool { + return setting.hasFlag(allowConcurrentFlag) +} + // IsVreplicationTestSuite checks if strategy options include -vreplicatoin-test-suite func (setting *DDLStrategySetting) IsVreplicationTestSuite() bool { return setting.hasFlag(vreplicationTestSuite) @@ -159,6 +165,7 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string { case isFlag(opt, singletonContextFlag): case isFlag(opt, allowZeroInDateFlag): case isFlag(opt, postponeCompletionFlag): + case isFlag(opt, allowConcurrentFlag): case isFlag(opt, vreplicationTestSuite): default: validOpts = append(validOpts, opt) diff --git a/go/vt/schema/ddl_strategy_test.go b/go/vt/schema/ddl_strategy_test.go index c719f3d6334..cd19aea8793 100644 --- a/go/vt/schema/ddl_strategy_test.go +++ b/go/vt/schema/ddl_strategy_test.go @@ -42,6 +42,7 @@ func TestParseDDLStrategy(t *testing.T) { isDeclarative bool isSingleton bool isPostponeCompletion bool + isAllowConcurrent bool runtimeOptions string err error }{ @@ -99,6 +100,13 @@ func TestParseDDLStrategy(t *testing.T) { runtimeOptions: "", isPostponeCompletion: true, }, + { + strategyVariable: "online -allow-concurrent", + strategy: DDLStrategyOnline, + options: "-allow-concurrent", + runtimeOptions: "", + isAllowConcurrent: true, + }, } for _, ts := range tt { setting, err := ParseDDLStrategy(ts.strategyVariable) @@ -108,6 +116,7 @@ func TestParseDDLStrategy(t *testing.T) { assert.Equal(t, ts.isDeclarative, setting.IsDeclarative()) assert.Equal(t, ts.isSingleton, setting.IsSingleton()) assert.Equal(t, ts.isPostponeCompletion, setting.IsPostponeCompletion()) + assert.Equal(t, 
ts.isAllowConcurrent, setting.IsAllowConcurrent())
 
 			runtimeOptions := strings.Join(setting.RuntimeOptions(), " ")
 			assert.Equal(t, ts.runtimeOptions, runtimeOptions)
diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go
index 05aac3a40dc..157987832d6 100644
--- a/go/vt/vttablet/onlineddl/executor.go
+++ b/go/vt/vttablet/onlineddl/executor.go
@@ -158,8 +158,9 @@ type Executor struct {
 	//   - be adopted by this executor (possible for vreplication migrations), or
 	//   - be terminated (example: pt-osc migration gone rogue, process still running even as the migration failed)
 	// The Executor auto-reviews the map and cleans up migrations thought to be running which are not running.
-	ownedRunningMigrations sync.Map
-	tickReentranceFlag     int64
+	ownedRunningMigrations        sync.Map
+	tickReentranceFlag            int64
+	reviewedRunningMigrationsFlag bool
 
 	ticks  *timer.Timer
 	isOpen bool
@@ -272,6 +273,7 @@ func (e *Executor) Open() error {
 	if e.isOpen || !e.env.Config().EnableOnlineDDL {
 		return nil
 	}
+	e.reviewedRunningMigrationsFlag = false // will be set as "true" by reviewRunningMigrations()
 	e.pool.Open(e.env.Config().DB.AppWithDB(), e.env.Config().DB.DbaWithDB(), e.env.Config().DB.AppDebugWithDB())
 	e.ticks.Start(e.onMigrationCheckTick)
 	e.triggerNextCheckInterval()
@@ -306,15 +308,85 @@ func (e *Executor) triggerNextCheckInterval() {
 	}
 }
 
-// isAnyMigrationRunning sees if there's any migration running right now
-func (e *Executor) isAnyMigrationRunning() bool {
-	migrationFound := false
+// allowConcurrentMigration checks if the given migration is allowed to run concurrently.
+// First, the migration itself must declare --allow-concurrent. Beyond that, there are also
+// restrictions on exactly which migrations are allowed such concurrency.
+func (e *Executor) allowConcurrentMigration(onlineDDL *schema.OnlineDDL) bool {
+	if !onlineDDL.StrategySetting().IsAllowConcurrent() {
+		return false
+	}
+
+	action, err := onlineDDL.GetAction()
+	if err != nil {
+		return false
+	}
+	switch action {
+	case sqlparser.CreateDDLAction, sqlparser.DropDDLAction:
+		// CREATE TABLE, DROP TABLE are allowed to run concurrently.
+		return true
+	case sqlparser.RevertDDLAction:
+		// REVERT is allowed to run concurrently.
+		// Reminder that REVERT is supported for CREATE, DROP and for 'online' ALTER, but never for
+		// 'gh-ost' or 'pt-osc' ALTERs
+		return true
+	}
+	return false
+}
+
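allowConcurrentMigration above is the strategy-side gate: the migration must explicitly request -allow-concurrent, and even then only CREATE, DROP, and REVERT qualify. A quick sketch of how a caller might observe this flag through the public DDLStrategySetting API added in this PR; the strategy string below is an arbitrary example, not taken from any particular test.

```go
// Quick sketch: observing the new -allow-concurrent flag through the
// DDLStrategySetting API this PR extends. The strategy string is an
// arbitrary example.
package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/schema"
)

func main() {
	setting, err := schema.ParseDDLStrategy("online -allow-concurrent -postpone-completion")
	if err != nil {
		panic(err)
	}
	// Both flags are scheduler hints; neither is passed through as a
	// runtime option to gh-ost/pt-osc.
	fmt.Println(setting.IsAllowConcurrent())    // true
	fmt.Println(setting.IsPostponeCompletion()) // true
	fmt.Println(setting.RuntimeOptions())       // []
}
```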
+// isAnyNonConcurrentMigrationRunning sees if there's any migration running right now
+// that does not have -allow-concurrent.
+// Such a running migration will, for example, prevent a new non-concurrent migration from running.
+func (e *Executor) isAnyNonConcurrentMigrationRunning() bool {
+	nonConcurrentMigrationFound := false
 
-	e.ownedRunningMigrations.Range(func(_, _ interface{}) bool {
-		migrationFound = true
-		return false // stop iteration
+	e.ownedRunningMigrations.Range(func(_, val interface{}) bool {
+		onlineDDL, ok := val.(*schema.OnlineDDL)
+		if !ok {
+			return true
+		}
+		if !e.allowConcurrentMigration(onlineDDL) {
+			// The migration may have declared itself to be --allow-concurrent, but our scheduler
+			// reserves the right to say "no, you're NOT in fact allowed to run concurrently"
+			// (as an example, think of a `gh-ost` ALTER migration that says --allow-concurrent)
+			nonConcurrentMigrationFound = true
+			return false // stop iteration, no need to review other migrations
+		}
+		return true
+	})
+
+	return nonConcurrentMigrationFound
+}
+
+// isAnyMigrationRunningOnTable sees if there's any migration running right now
+// operating on the given table.
+func (e *Executor) isAnyMigrationRunningOnTable(tableName string) bool {
+	sameTableMigrationFound := false
+	e.ownedRunningMigrations.Range(func(_, val interface{}) bool {
+		onlineDDL, ok := val.(*schema.OnlineDDL)
+		if !ok {
+			return true
+		}
+		if onlineDDL.Table == tableName {
+			sameTableMigrationFound = true
+			return false // stop iteration, no need to review other migrations
+		}
+		return true
 	})
-	return migrationFound
+	return sameTableMigrationFound
+}
+
+// isAnyConflictingMigrationRunning checks if there's any running migration that conflicts with the
+// given migration, such that they can't both run concurrently.
+func (e *Executor) isAnyConflictingMigrationRunning(onlineDDL *schema.OnlineDDL) bool {
+
+	if e.isAnyNonConcurrentMigrationRunning() && !e.allowConcurrentMigration(onlineDDL) {
+		return true
+	}
+	if e.isAnyMigrationRunningOnTable(onlineDDL.Table) {
+		return true
+	}
+
+	return false
 }
 
 func (e *Executor) ghostPanicFlagFileName(uuid string) string {
@@ -855,7 +927,7 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem
 	// make sure there's no vreplication workflow running under same name
 	_ = e.terminateVReplMigration(ctx, onlineDDL.UUID)
 
-	if e.isAnyMigrationRunning() {
+	if e.isAnyConflictingMigrationRunning(onlineDDL) {
 		return ErrExecutorMigrationAlreadyRunning
 	}
 
@@ -869,7 +941,7 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem
 	}
 	defer conn.Close()
 
-	e.ownedRunningMigrations.Store(onlineDDL.UUID, true)
+	e.ownedRunningMigrations.Store(onlineDDL.UUID, onlineDDL)
 	if err := e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted, etaSecondsUnknown, rowsCopiedUnknown); err != nil {
 		return err
 	}
@@ -953,7 +1025,7 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem
 // Validation included testing the backend MySQL server and the gh-ost binary itself
 // Execution runs first a dry run, then an actual migration
 func (e *Executor) ExecuteWithGhost(ctx context.Context, onlineDDL *schema.OnlineDDL) error {
-	if e.isAnyMigrationRunning() {
+	if e.isAnyConflictingMigrationRunning(onlineDDL) {
 		return ErrExecutorMigrationAlreadyRunning
 	}
 
@@ -1127,7 +1199,7 @@ exit $exit_code
 		return err
 	}
 
-	e.ownedRunningMigrations.Store(onlineDDL.UUID, true)
+	e.ownedRunningMigrations.Store(onlineDDL.UUID, onlineDDL)
 
 	go func() error {
 		defer e.ownedRunningMigrations.Delete(onlineDDL.UUID)
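The three predicates above compose the scheduler's full conflict rule: a candidate may start only if it is concurrent-eligible or nothing non-concurrent is running, and only if no running migration touches the same table. A standalone sketch of that decision follows, using simplified stand-in types; the real implementation ranges over a sync.Map of *schema.OnlineDDL owned by the executor.

```go
// Standalone sketch of the scheduler's conflict rule, with simplified
// stand-in types. This mirrors isAnyConflictingMigrationRunning, but is
// not the real implementation.
package main

import "fmt"

type migration struct {
	table      string
	concurrent bool // concurrent-eligible: -allow-concurrent AND a CREATE/DROP/REVERT
}

// conflicts reports whether the candidate must wait, given currently running migrations.
func conflicts(candidate migration, running []migration) bool {
	for _, r := range running {
		if !r.concurrent && !candidate.concurrent {
			return true // two non-concurrent migrations never overlap
		}
		if r.table == candidate.table {
			return true // never two migrations on the same table, concurrent or not
		}
	}
	return false
}

func main() {
	running := []migration{{table: "t1_test", concurrent: true}}
	fmt.Println(conflicts(migration{table: "t3_test", concurrent: true}, running))  // false: different table, both concurrent
	fmt.Println(conflicts(migration{table: "t1_test", concurrent: true}, running))  // true: same table
	fmt.Println(conflicts(migration{table: "t3_test", concurrent: false}, running)) // false: the running one is concurrent
}
```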
@@ -1168,7 +1240,7 @@ exit $exit_code
 // Validation included testing the backend MySQL server and the pt-online-schema-change binary itself
 // Execution runs first a dry run, then an actual migration
 func (e *Executor) ExecuteWithPTOSC(ctx context.Context, onlineDDL *schema.OnlineDDL) error {
-	if e.isAnyMigrationRunning() {
+	if e.isAnyConflictingMigrationRunning(onlineDDL) {
 		return ErrExecutorMigrationAlreadyRunning
 	}
 
@@ -1348,7 +1420,7 @@ export MYSQL_PWD
 		return err
 	}
 
-	e.ownedRunningMigrations.Store(onlineDDL.UUID, true)
+	e.ownedRunningMigrations.Store(onlineDDL.UUID, onlineDDL)
 
 	go func() error {
 		defer e.ownedRunningMigrations.Delete(onlineDDL.UUID)
@@ -1561,36 +1633,15 @@ func (e *Executor) CancelPendingMigrations(ctx context.Context, message string)
 }
 
 // scheduleNextMigration attemps to schedule a single migration to run next.
-// possibly there's no migrations to run. Possibly there's a migration running right now,
-// in which cases nothing happens.
+// Possibly there are migrations to run.
+// Its only effect is to move a migration from the 'queued' state to the 'ready' state.
 func (e *Executor) scheduleNextMigration(ctx context.Context) error {
 	e.migrationMutex.Lock()
 	defer e.migrationMutex.Unlock()
 
-	if e.isAnyMigrationRunning() {
-		return ErrExecutorMigrationAlreadyRunning
-	}
-
-	{
-		r, err := e.execQuery(ctx, sqlSelectCountReadyMigrations)
-		if err != nil {
-			return err
-		}
-
-		row := r.Named().Row()
-		countReady, err := row.ToInt64("count_ready")
-		if err != nil {
-			return err
-		}
-
-		if countReady > 0 {
-			// seems like there's already one migration that's good to go
-			return nil
-		}
-	}
 	// Cool, seems like no migration is ready. Let's try and make a single 'queued' migration 'ready'
-
+	// The query sqlScheduleSingleMigration has some business logic; in the future, we can
+	// consider moving the logic outside the query and into this function's code.
 	_, err := e.execQuery(ctx, sqlScheduleSingleMigration)
-
 	return err
 }
@@ -1661,7 +1712,7 @@ func (e *Executor) reviewQueuedMigrations(ctx context.Context) error {
 	return nil
 }
 
-func (e *Executor) validateMigrationRevertible(ctx context.Context, revertMigration *schema.OnlineDDL) (err error) {
+func (e *Executor) validateMigrationRevertible(ctx context.Context, revertMigration *schema.OnlineDDL, revertingMigrationUUID string) (err error) {
 	// Validation: migration to revert exists and is in complete state
 	action, actionStr, err := revertMigration.GetActionStr()
 	if err != nil {
@@ -1690,6 +1741,10 @@ func (e *Executor) validateMigrationRevertible(ctx context.Context, revertMigrat
 	// we identify running migrations on requested table
 	for _, row := range r.Named().Rows {
 		pendingUUID := row["migration_uuid"].ToString()
+		if pendingUUID == revertingMigrationUUID {
+			// that's fine; the migration we're looking at is the very one that's trying to issue this revert
+			continue
+		}
 		keyspace := row["keyspace"].ToString()
 		table := row["mysql_table"].ToString()
 		status := schema.OnlineDDLStatus(row["migration_status"].ToString())
@@ -1737,7 +1792,7 @@ func (e *Executor) executeRevert(ctx context.Context, onlineDDL *schema.OnlineDD
 	if err != nil {
 		return err
 	}
-	if err := e.validateMigrationRevertible(ctx, revertMigration); err != nil {
+	if err := e.validateMigrationRevertible(ctx, revertMigration, onlineDDL.UUID); err != nil {
 		return err
 	}
 	revertedActionStr := row["ddl_action"].ToString()
@@ -2230,48 +2285,68 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin
 	return nil
 }
 
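Between scheduleNextMigration above and runNextMigration below, a migration walks a small status ladder. The sketch that follows is an editorial summary of the transitions implied by this diff (statuses appear as schema.OnlineDDLStatus* constants); it is a reading aid, not an authoritative state machine from the Vitess source.

```go
// Hedged summary of the migration lifecycle as implied by this diff.
// The transition map is an editorial reading of the scheduler code,
// not an authoritative list from the Vitess source.
package main

import "fmt"

var transitions = map[string][]string{
	"queued":  {"ready", "failed"},    // scheduleNextMigration promotes one queued migration; a cancel fails it
	"ready":   {"running", "failed"},  // runNextMigration starts a non-conflicting migration, or a cancel fails it
	"running": {"complete", "failed"}, // completion may be postponed via -postpone-completion
}

func main() {
	for from, tos := range transitions {
		fmt.Printf("%s -> %v\n", from, tos)
	}
}
```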
+// runNextMigration picks up to one 'ready' migration that is able to run, and executes it.
+// Possible scenarios:
+// - no migration is in 'ready' state -- nothing to be done
+// - a migration is 'ready', but conflicts with other running migrations -- try another 'ready' migration
+// - multiple migrations are 'ready' -- we just handle one here
+// Note that per the above breakdown, and due to potential conflicts, it is possible to have one or
+// more 'ready' migrations, and still none is executed.
 func (e *Executor) runNextMigration(ctx context.Context) error {
 	e.migrationMutex.Lock()
 	defer e.migrationMutex.Unlock()
 
-	if e.isAnyMigrationRunning() {
-		return ErrExecutorMigrationAlreadyRunning
+	if !e.reviewedRunningMigrationsFlag {
+		// Since Open(), we haven't yet executed reviewRunningMigrations() successfully.
+		// This means we may not have a good picture of what is actually running. Perhaps there's
+		// a vreplication migration from a pre-PRS/ERS that we still need to learn about?
+		// We're going to be careful here, and avoid running new migrations until we have
+		// a better picture. It will likely take a couple of seconds until the next iteration.
+		// This delay only takes place shortly after Open().
+		return nil
 	}
 
-	r, err := e.execQuery(ctx, sqlSelectReadyMigration)
-	if err != nil {
-		return err
-	}
-	named := r.Named()
-	for i, row := range named.Rows {
-		onlineDDL := &schema.OnlineDDL{
-			Keyspace:       row["keyspace"].ToString(),
-			Table:          row["mysql_table"].ToString(),
-			Schema:         row["mysql_schema"].ToString(),
-			SQL:            row["migration_statement"].ToString(),
-			UUID:           row["migration_uuid"].ToString(),
-			Strategy:       schema.DDLStrategy(row["strategy"].ToString()),
-			Options:        row["options"].ToString(),
-			Status:         schema.OnlineDDLStatus(row["migration_status"].ToString()),
-			Retries:        row.AsInt64("retries", 0),
-			TabletAlias:    row["tablet"].ToString(),
-			RequestContext: row["migration_context"].ToString(),
+	// getNonConflictingMigration finds a single 'ready' migration which does not conflict with running migrations.
+	// Conflicts are:
+	// - a migration is 'ready' but is not set to run _concurrently_, and there's a running migration that is also non-concurrent
+	// - a migration is 'ready' but there's another migration 'running' on the exact same table
+	getNonConflictingMigration := func() (*schema.OnlineDDL, error) {
+		r, err := e.execQuery(ctx, sqlSelectReadyMigrations)
+		if err != nil {
+			return nil, err
 		}
-		{
-			// We strip out any VT query comments because our simplified parser doesn't work well with comments
-			ddlStmt, _, err := schema.ParseOnlineDDLStatement(onlineDDL.SQL)
-			if err == nil {
-				ddlStmt.SetComments(sqlparser.Comments{})
-				onlineDDL.SQL = sqlparser.String(ddlStmt)
+		for _, row := range r.Named().Rows {
+			uuid := row["migration_uuid"].ToString()
+			onlineDDL, _, err := e.readMigration(ctx, uuid)
+			if err != nil {
+				return nil, err
+			}
+			if !e.isAnyConflictingMigrationRunning(onlineDDL) {
+				// This migration seems good to go
+				return onlineDDL, err
 			}
 		}
-		e.executeMigration(ctx, onlineDDL)
-		// the query should only ever return a single row at the most
-		// but let's make it also explicit here that we only run a single migration
-		if i == 0 {
-			break
+		// no non-conflicting migration found...
+		// Either all ready migrations are conflicting, or there are no ready migrations...
+ return nil, nil + } + onlineDDL, err := getNonConflictingMigration() + if err != nil { + return err + } + if onlineDDL == nil { + // nothing to do + return nil + } + { + // We strip out any VT query comments because our simplified parser doesn't work well with comments + ddlStmt, _, err := schema.ParseOnlineDDLStatement(onlineDDL.SQL) + if err == nil { + ddlStmt.SetComments(sqlparser.Comments{}) + onlineDDL.SQL = sqlparser.String(ddlStmt) } } + e.executeMigration(ctx, onlineDDL) return nil } @@ -2470,14 +2545,16 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i uuidsFoundRunning := map[string]bool{} for _, row := range r.Named().Rows { uuid := row["migration_uuid"].ToString() - strategy := schema.DDLStrategy(row["strategy"].ToString()) - strategySettings := schema.NewDDLStrategySetting(strategy, row["options"].ToString()) + onlineDDL, _, err := e.readMigration(ctx, uuid) + if err != nil { + return countRunnning, cancellable, err + } postponeCompletion := row.AsBool("postpone_completion", false) elapsedSeconds := row.AsInt64("elapsed_seconds", 0) uuidsFoundRunning[uuid] = true - switch strategy { + switch onlineDDL.StrategySetting().Strategy { case schema.DDLStrategyOnline: { // We check the _vt.vreplication table @@ -2485,7 +2562,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i if err != nil { return countRunnning, cancellable, err } - isVreplicationTestSuite := strategySettings.IsVreplicationTestSuite() + isVreplicationTestSuite := onlineDDL.StrategySetting().IsVreplicationTestSuite() if isVreplicationTestSuite { e.triggerNextCheckInterval() } @@ -2494,7 +2571,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i // this executor may not own the migration _yet_. We make sure to own it. // VReplication migrations are unique in this respect: we are able to complete // a vreplicaiton migration started by another tablet. 
- e.ownedRunningMigrations.Store(uuid, true) + e.ownedRunningMigrations.Store(uuid, onlineDDL) _ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuid) _ = e.updateMigrationTablet(ctx, uuid) _ = e.updateRowsCopied(ctx, uuid, s.rowsCopied) @@ -2571,7 +2648,8 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i }) } - return countRunnning, cancellable, err + e.reviewedRunningMigrationsFlag = true + return countRunnning, cancellable, nil } // reviewStaleMigrations marks as 'failed' migrations whose status is 'running' but which have @@ -3133,6 +3211,7 @@ func (e *Executor) SubmitMigration( sqltypes.StringBindVariable(e.TabletAliasString()), sqltypes.Int64BindVariable(retainArtifactsSeconds), sqltypes.BoolBindVariable(onlineDDL.StrategySetting().IsPostponeCompletion()), + sqltypes.BoolBindVariable(e.allowConcurrentMigration(onlineDDL)), ) if err != nil { return nil, err diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index a0bb4279dd4..46693a1caff 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -68,6 +68,7 @@ const ( alterSchemaMigrationsTableRemovedNoDefaultColNames = "ALTER TABLE _vt.schema_migrations add column dropped_no_default_column_names text NOT NULL" alterSchemaMigrationsTableExpandedColNames = "ALTER TABLE _vt.schema_migrations add column expanded_column_names text NOT NULL" alterSchemaMigrationsTableRevertibleNotes = "ALTER TABLE _vt.schema_migrations add column revertible_notes text NOT NULL" + alterSchemaMigrationsTableAllowConcurrent = "ALTER TABLE _vt.schema_migrations add column allow_concurrent tinyint unsigned NOT NULL DEFAULT 0" sqlInsertMigration = `INSERT IGNORE INTO _vt.schema_migrations ( migration_uuid, @@ -84,9 +85,10 @@ const ( migration_status, tablet, retain_artifacts_seconds, - postpone_completion + postpone_completion, + allow_concurrent ) VALUES ( - %a, %a, %a, %a, %a, %a, %a, %a, %a, FROM_UNIXTIME(NOW()), %a, %a, %a, %a, %a + %a, %a, %a, %a, %a, %a, %a, %a, %a, FROM_UNIXTIME(NOW()), %a, %a, %a, %a, %a, %a )` sqlScheduleSingleMigration = `UPDATE _vt.schema_migrations @@ -255,8 +257,6 @@ const ( ` sqlSelectRunningMigrations = `SELECT migration_uuid, - strategy, - options, postpone_completion, timestampdiff(second, started_timestamp, now()) as elapsed_seconds FROM _vt.schema_migrations @@ -286,12 +286,6 @@ const ( AND migration_statement=%a LIMIT 1 ` - sqlSelectCountReadyMigrations = `SELECT - count(*) as count_ready - FROM _vt.schema_migrations - WHERE - migration_status='ready' - ` sqlSelectStaleMigrations = `SELECT migration_uuid FROM _vt.schema_migrations @@ -300,7 +294,10 @@ const ( AND liveness_timestamp < NOW() - INTERVAL %a MINUTE ` sqlSelectPendingMigrations = `SELECT - migration_uuid + migration_uuid, + keyspace, + mysql_table, + migration_status FROM _vt.schema_migrations WHERE migration_status IN ('queued', 'ready', 'running') @@ -364,37 +361,12 @@ const ( WHERE migration_uuid=%a ` - sqlSelectReadyMigration = `SELECT - id, - migration_uuid, - keyspace, - shard, - mysql_schema, - mysql_table, - migration_statement, - strategy, - options, - added_timestamp, - ready_timestamp, - started_timestamp, - liveness_timestamp, - completed_timestamp, - migration_status, - log_path, - log_file, - retries, - ddl_action, - artifacts, - tablet, - added_unique_keys, - removed_unique_keys, - migration_context, - retain_artifacts_seconds, - postpone_completion + sqlSelectReadyMigrations = `SELECT + migration_uuid FROM _vt.schema_migrations WHERE 
migration_status='ready' - LIMIT 1 + ORDER BY id ` sqlSelectPTOSCMigrationTriggers = `SELECT TRIGGER_SCHEMA as trigger_schema, @@ -583,4 +555,5 @@ var ApplyDDL = []string{ alterSchemaMigrationsTableRemovedNoDefaultColNames, alterSchemaMigrationsTableExpandedColNames, alterSchemaMigrationsTableRevertibleNotes, + alterSchemaMigrationsTableAllowConcurrent, } diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index 67528445de0..243f3c402b6 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -72,6 +72,7 @@ var ( "onlineddl_revert", "onlineddl_declarative", "onlineddl_singleton", + "onlineddl_scheduler", "onlineddl_revertible", "tabletmanager_throttler", "tabletmanager_throttler_custom_config", diff --git a/test/config.json b/test/config.json index f37499102ae..cbdc6c21f0e 100644 --- a/test/config.json +++ b/test/config.json @@ -391,6 +391,15 @@ "RetryMax": 1, "Tags": [] }, + "onlineddl_scheduler": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/onlineddl/scheduler"], + "Command": [], + "Manual": false, + "Shard": "onlineddl_scheduler", + "RetryMax": 1, + "Tags": [] + }, "pitr": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/recovery/pitr"],
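Taken together, the feature exercised by this PR can be driven entirely through vtgate. The hedged end-to-end sketch below mirrors what the endtoend test does via its helpers: the connection address and keyspace are placeholder assumptions, and reading the migration UUID from the DDL's result row matches how VtgateExecDDL consumes it in the test above.

```go
// Hedged end-to-end sketch of driving concurrent online DDL through vtgate.
// Assumptions: vtgate on 127.0.0.1:15306, keyspace "ks"; online DDL issued
// through vtgate returns a row carrying the migration's uuid, as the
// endtoend helpers above expect.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "user@tcp(127.0.0.1:15306)/ks")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Ask for online DDL that may run concurrently and that waits for an
	// explicit 'complete' command (the two flags added/tested in this PR).
	if _, err := db.Exec("set @@ddl_strategy='online -allow-concurrent -postpone-completion'"); err != nil {
		log.Fatal(err)
	}

	var uuid string
	if err := db.QueryRow("DROP TABLE IF EXISTS t3_test").Scan(&uuid); err != nil {
		log.Fatal(err)
	}
	fmt.Println("migration uuid:", uuid)

	// Later: unblock the postponed migration.
	if _, err := db.Exec(fmt.Sprintf("alter vitess_migration '%s' complete", uuid)); err != nil {
		log.Fatal(err)
	}
}
```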