Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OnlineDDL: better scheduling/cancellation logic #8603

Merged
177 changes: 159 additions & 18 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ var (
CREATE TABLE %s (
id bigint NOT NULL,
test_val bigint unsigned NOT NULL DEFAULT 0,
online_ddl_create_col INT NOT NULL,
online_ddl_create_col INT NOT NULL DEFAULT 0,
PRIMARY KEY (id)
) ENGINE=InnoDB;`
onlineDDLDropTableStatement = `
Expand Down Expand Up @@ -241,16 +241,19 @@ func TestSchemaChange(t *testing.T) {

shards = clusterInstance.Keyspaces[0].Shards
assert.Equal(t, 2, len(shards))
for _, shard := range shards {
assert.Equal(t, 2, len(shard.Vttablets))
}

testWithInitialSchema(t)
t.Run("alter non_online", func(t *testing.T) {
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online")
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online", false)
insertRows(t, 2)
testRows(t)
})
t.Run("successful online alter, vtgate", func(t *testing.T) {
insertRows(t, 2)
uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "online", "vtgate", "vrepl_col")
uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "online", "vtgate", "vrepl_col", false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
testRows(t)
testMigrationRowCount(t, uuid)
Expand All @@ -259,7 +262,7 @@ func TestSchemaChange(t *testing.T) {
})
t.Run("successful online alter, vtctl", func(t *testing.T) {
insertRows(t, 2)
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtctl", "vrepl_col")
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtctl", "vrepl_col", false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
testRows(t)
testMigrationRowCount(t, uuid)
Expand All @@ -268,20 +271,43 @@ func TestSchemaChange(t *testing.T) {
})
t.Run("throttled migration", func(t *testing.T) {
insertRows(t, 2)
for i := range clusterInstance.Keyspaces[0].Shards {
throttleApp(clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], throttlerAppName)
defer unthrottleApp(clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], throttlerAppName)
for i := range shards {
throttleApp(shards[i].Vttablets[0], throttlerAppName)
defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName)
}
uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", "vtgate", "vrepl_col")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", "vtgate", "vrepl_col", true)
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusRunning)
testRows(t)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true)
time.Sleep(2 * time.Second)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
})

t.Run("throttled and unthrottled migration", func(t *testing.T) {
insertRows(t, 2)
for i := range shards {
_, body, err := throttleApp(shards[i].Vttablets[0], throttlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)

defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName)
}
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtgate", "test_val", true)
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
testRows(t)
for i := range shards {
_, body, err := unthrottleApp(shards[i].Vttablets[0], throttlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
}
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
})

t.Run("failed migration", func(t *testing.T) {
insertRows(t, 2)
uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "online", "vtgate", "vrepl_col")
uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "online", "vtgate", "vrepl_col", false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
testRows(t)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
Expand All @@ -305,46 +331,158 @@ func TestSchemaChange(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_ = testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", "vtgate", "vrepl_col")
_ = testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", "vtgate", "vrepl_col", false)
}()
}
wg.Wait()
onlineddl.CheckCancelAllMigrations(t, &vtParams, len(shards)*count)
})

// reparent shard -80 to replica
// and then reparent it back to original state
// (two pretty much identical tests, the point is to end up with original state)
for currentPrimaryTabletIndex, reparentTabletIndex := range []int{1, 0} {
t.Run(fmt.Sprintf("PlannedReparentShard via throttling %d/2", (currentPrimaryTabletIndex+1)), func(t *testing.T) {
// resetRowCount()
insertRows(t, 2)
for i := range shards {
var body string
var err error
switch i {
case 0:
// this is the shard where we run PRS
_, body, err = throttleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName)
defer unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName)
case 1:
// no PRS on this shard
_, body, err = throttleApp(shards[i].Vttablets[0], throttlerAppName)
defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName)
}
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
}
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtgate", "test_val", true)

t.Run("wait for migration and vreplication to run", func(t *testing.T) {
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
time.Sleep(5 * time.Second) // wait for _vt.vreplication to be created
vreplStatus := onlineddl.WaitForVReplicationStatus(t, &vtParams, shards, uuid, 20*time.Second, "Copying")
require.Equal(t, "Copying", vreplStatus)
// again see that we're still 'running'
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
testRows(t)
})

t.Run("Check tablet", func(t *testing.T) {
// onlineddl.Executor marks this migration with its tablet alias
// reminder that onlineddl.Executor runs on the primary tablet.
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
shard := row["shard"].ToString()
tablet := row["tablet"].ToString()

switch shard {
case "-80":
require.Equal(t, shards[0].Vttablets[currentPrimaryTabletIndex].Alias, tablet)
case "80-":
require.Equal(t, shards[1].Vttablets[0].Alias, tablet)
default:
require.NoError(t, fmt.Errorf("unexpected shard name: %s", shard))
}
}
})
t.Run("PRS shard -80", func(t *testing.T) {
// migration has started and is throttled. We now run PRS
err := clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "-keyspace_shard", keyspaceName+"/-80", "-new_master", shards[0].Vttablets[reparentTabletIndex].Alias)
require.NoError(t, err, "failed PRS: %v", err)
})

t.Run("unthrottle and expect completion", func(t *testing.T) {
for i := range shards {
var body string
var err error
switch i {
case 0:
// this is the shard where we run PRS
_, body, err = unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName)
case 1:
// no PRS on this shard
_, body, err = unthrottleApp(shards[i].Vttablets[0], throttlerAppName)
}
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
}

_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
})

t.Run("Check tablet post PRS", func(t *testing.T) {
// onlineddl.Executor will find that a vrepl migration started in a different tablet.
// it will own the tablet and will update 'tablet' column in _vt.schema_migrations with its own
// (promoted primary) tablet alias.
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
shard := row["shard"].ToString()
tablet := row["tablet"].ToString()

switch shard {
case "-80":
// PRS for this tablet, we promoted tablet[1]
require.Equal(t, shards[0].Vttablets[reparentTabletIndex].Alias, tablet)
case "80-":
// No PRS for this tablet
require.Equal(t, shards[1].Vttablets[0].Alias, tablet)
default:
require.NoError(t, fmt.Errorf("unexpected shard name: %s", shard))
}
}
})
})
}
t.Run("Online DROP, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtctl", "")
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtctl", "", false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("Online CREATE, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "online", "vtctl", "online_ddl_create_col")
uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "online", "vtctl", "online_ddl_create_col", false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("Online DROP TABLE IF EXISTS, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "online", "vtgate", "")
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "online", "vtgate", "", false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
// this table existed
checkTables(t, schema.OnlineDDLToGCUUID(uuid), 1)
})
t.Run("Online DROP TABLE IF EXISTS for nonexistent table, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "online", "vtgate", "")
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "online", "vtgate", "", false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
// this table did not exist
checkTables(t, schema.OnlineDDLToGCUUID(uuid), 0)
})
t.Run("Online DROP TABLE for nonexistent table, expect error, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtgate", "")
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtgate", "", false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, true)
})
t.Run("Online CREATE, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "online", "vtctl", "online_ddl_create_col", false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
}

func insertRow(t *testing.T) {
Expand Down Expand Up @@ -406,7 +544,7 @@ func testWithInitialSchema(t *testing.T) {
}

// testOnlineDDLStatement runs an online DDL, ALTER statement
func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectHint string) (uuid string) {
func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectHint string, skipWait bool) (uuid string) {
tableName := fmt.Sprintf("vt_onlineddl_test_%02d", 3)
sqlQuery := fmt.Sprintf(alterStatement, tableName)
if executeStrategy == "vtgate" {
Expand All @@ -426,7 +564,10 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
strategySetting, err := schema.ParseDDLStrategy(ddlStrategy)
assert.NoError(t, err)

if !strategySetting.Strategy.IsDirect() {
if strategySetting.Strategy.IsDirect() {
skipWait = true
}
if !skipWait {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
}
Expand Down
66 changes: 66 additions & 0 deletions go/test/endtoend/onlineddl/vttablet_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package onlineddl

import (
"testing"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/sqlparser"

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/stretchr/testify/require"
)

// WaitForVReplicationStatus waits for a vreplication stream to be in one of given states, or timeout
func WaitForVReplicationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...string) (status string) {

query, err := sqlparser.ParseAndBind("select workflow, state from _vt.vreplication where workflow=%a",
sqltypes.StringBindVariable(uuid),
)
require.NoError(t, err)

statusesMap := map[string]bool{}
for _, status := range expectStatuses {
statusesMap[status] = true
}
startTime := time.Now()
lastKnownStatus := ""
for time.Since(startTime) < timeout {
countMatchedShards := 0

for _, shard := range shards {
r, err := shard.Vttablets[0].VttabletProcess.QueryTablet(query, "", false)
require.NoError(t, err)

for _, row := range r.Named().Rows {
lastKnownStatus = row["state"].ToString()
if row["workflow"].ToString() == uuid && statusesMap[lastKnownStatus] {
countMatchedShards++
}
}
}
if countMatchedShards == len(shards) {
return lastKnownStatus
}
time.Sleep(1 * time.Second)
}
return lastKnownStatus
}
Loading