Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: introducing ddl_strategy -singleton-context flag #7946

Merged
66 changes: 54 additions & 12 deletions go/test/endtoend/onlineddl/singleton/onlineddl_singleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams

hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
schemaChangeDirectory = ""
tableName = `stress_test`
onlineDDLStrategy = "online -singleton"
createStatement = `
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
schemaChangeDirectory = ""
tableName = `stress_test`
onlineSingletonDDLStrategy = "online -singleton"
onlineSingletonContextDDLStrategy = "online -singleton-context"
createStatement = `
CREATE TABLE stress_test (
id bigint(20) not null,
rand_val varchar(32) null default '',
Expand All @@ -61,13 +62,19 @@ var (
alterTableThrottlingStatement = `
ALTER TABLE stress_test DROP COLUMN created_timestamp
`
multiAlterTableThrottlingStatement = `
ALTER TABLE stress_test ENGINE=InnoDB;
ALTER TABLE stress_test ENGINE=InnoDB;
ALTER TABLE stress_test ENGINE=InnoDB;
`
// A trivial statement which must succeed and does not change the schema
alterTableTrivialStatement = `
ALTER TABLE stress_test ENGINE=InnoDB
`
dropStatement = `
DROP TABLE stress_test
`
multiDropStatements = `DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; DROP TABLE IF EXISTS t3;`
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -145,7 +152,7 @@ func TestSchemaChange(t *testing.T) {
// CREATE
t.Run("CREATE TABLE", func(t *testing.T) {
// The table does not exist
uuid := testOnlineDDLStatement(t, createStatement, onlineDDLStrategy, "vtgate", "", "", false)
uuid := testOnlineDDLStatement(t, createStatement, onlineSingletonDDLStrategy, "vtgate", "", "", false)
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
checkTable(t, tableName, true)
Expand Down Expand Up @@ -202,7 +209,7 @@ func TestSchemaChange(t *testing.T) {
})

t.Run("successful online alter, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, onlineDDLStrategy, "vtgate", "hint_col", "", false)
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, onlineSingletonDDLStrategy, "vtgate", "hint_col", "", false)
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
Expand All @@ -217,10 +224,45 @@ func TestSchemaChange(t *testing.T) {
checkTable(t, tableName, true)
})

var throttledUUIDs []string
// singleton-context
t.Run("throttled migrations, singleton-context", func(t *testing.T) {
uuidList := testOnlineDDLStatement(t, multiAlterTableThrottlingStatement, "gh-ost -singleton-context --max-load=Threads_running=1", "vtctl", "hint_col", "", false)
throttledUUIDs = strings.Split(uuidList, "\n")
assert.Equal(t, 3, len(throttledUUIDs))
for _, uuid := range throttledUUIDs {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning, schema.OnlineDDLStatusQueued)
}
})
t.Run("failed migrations, singleton-context", func(t *testing.T) {
_ = testOnlineDDLStatement(t, multiAlterTableThrottlingStatement, "gh-ost -singleton-context --max-load=Threads_running=1", "vtctl", "hint_col", "rejected", false)
})
t.Run("terminate throttled migrations", func(t *testing.T) {
for _, uuid := range throttledUUIDs {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning, schema.OnlineDDLStatusQueued)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true)
}
time.Sleep(2 * time.Second)
for _, uuid := range throttledUUIDs {
uuid = strings.TrimSpace(uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
}
})

t.Run("successful multiple statement, singleton-context, vtctl", func(t *testing.T) {
uuidList := testOnlineDDLStatement(t, multiDropStatements, onlineSingletonContextDDLStrategy, "vtctl", "", "", false)
uuidSlice := strings.Split(uuidList, "\n")
assert.Equal(t, 3, len(uuidSlice))
for _, uuid := range uuidSlice {
uuid = strings.TrimSpace(uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
}
})

//DROP

t.Run("online DROP TABLE", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, dropStatement, onlineDDLStrategy, "vtgate", "", "", false)
uuid := testOnlineDDLStatement(t, dropStatement, onlineSingletonDDLStrategy, "vtgate", "", "", false)
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
checkTable(t, tableName, false)
Expand Down Expand Up @@ -283,15 +325,15 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
func testRevertMigration(t *testing.T, revertUUID string, executeStrategy string, expectError string, skipWait bool) (uuid string) {
revertQuery := fmt.Sprintf("revert vitess_migration '%s'", revertUUID)
if executeStrategy == "vtgate" {
result := onlineddl.VtgateExecDDL(t, &vtParams, onlineDDLStrategy, revertQuery, expectError)
result := onlineddl.VtgateExecDDL(t, &vtParams, onlineSingletonDDLStrategy, revertQuery, expectError)
if result != nil {
row := result.Named().Row()
if row != nil {
uuid = row.AsString("uuid", "")
}
}
} else {
output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, revertQuery, cluster.VtctlClientParams{DDLStrategy: onlineDDLStrategy, SkipPreflight: true})
output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, revertQuery, cluster.VtctlClientParams{DDLStrategy: onlineSingletonDDLStrategy, SkipPreflight: true})
if expectError == "" {
assert.NoError(t, err)
uuid = output
Expand Down
12 changes: 9 additions & 3 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,22 @@ func CheckCancelAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCo
}

// CheckMigrationStatus verifies that the migration indicated by given UUID has the given expected status
func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatus schema.OnlineDDLStatus) {
func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) {
showQuery := fmt.Sprintf("show vitess_migrations like '%s'", uuid)
r := VtgateExecQuery(t, vtParams, showQuery, "")
fmt.Printf("# output for `%s`:\n", showQuery)
PrintQueryResult(os.Stdout, r)

count := 0
for _, row := range r.Named().Rows {
if row["migration_uuid"].ToString() == uuid && row["migration_status"].ToString() == string(expectStatus) {
count++
if row["migration_uuid"].ToString() != uuid {
continue
}
for _, expectStatus := range expectStatuses {
if row["migration_status"].ToString() == string(expectStatus) {
count++
break
}
}
}
assert.Equal(t, len(shards), count)
Expand Down
15 changes: 11 additions & 4 deletions go/vt/schema/ddl_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ var (
)

const (
declarativeFlag = "declarative"
skipTopoFlag = "skip-topo"
singletonFlag = "singleton"
declarativeFlag = "declarative"
skipTopoFlag = "skip-topo"
singletonFlag = "singleton"
singletonContextFlag = "singleton-context"
)

// DDLStrategy suggests how an ALTER TABLE should run (e.g. "direct", "online", "gh-ost" or "pt-osc")
Expand Down Expand Up @@ -123,6 +124,11 @@ func (setting *DDLStrategySetting) IsSingleton() bool {
return setting.hasFlag(singletonFlag)
}

// IsSingletonContext checks if strategy options include -singleton-context
func (setting *DDLStrategySetting) IsSingletonContext() bool {
return setting.hasFlag(singletonContextFlag)
}

// RuntimeOptions returns the options used as runtime flags for given strategy, removing any internal hint options
func (setting *DDLStrategySetting) RuntimeOptions() []string {
opts, _ := shlex.Split(setting.Options)
Expand All @@ -132,6 +138,7 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string {
case isFlag(opt, declarativeFlag):
case isFlag(opt, skipTopoFlag):
case isFlag(opt, singletonFlag):
case isFlag(opt, singletonContextFlag):
default:
validOpts = append(validOpts, opt)
}
Expand All @@ -142,7 +149,7 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string {
// IsSkipTopo suggests that DDL should apply to tables bypassing global topo request
func (setting *DDLStrategySetting) IsSkipTopo() bool {
switch {
case setting.IsSingleton():
case setting.IsSingleton(), setting.IsSingletonContext():
return true
case setting.hasFlag(skipTopoFlag):
return true
Expand Down
4 changes: 3 additions & 1 deletion go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, execResu
for _, onlineDDL := range onlineDDLs {
if exec.ddlStrategySetting.IsSkipTopo() {
exec.executeOnAllTablets(ctx, execResult, onlineDDL.SQL, true)
exec.wr.Logger().Printf("%s\n", onlineDDL.UUID)
if len(execResult.SuccessShards) > 0 {
exec.wr.Logger().Printf("%s\n", onlineDDL.UUID)
}
} else {
exec.executeOnlineDDL(ctx, execResult, onlineDDL)
}
Expand Down
51 changes: 36 additions & 15 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,16 +1167,17 @@ func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *s
return nil, nil, ErrMigrationNotFound
}
onlineDDL = &schema.OnlineDDL{
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: schema.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Retries: row.AsInt64("retries", 0),
TabletAlias: row["tablet"].ToString(),
Keyspace: row["keyspace"].ToString(),
Table: row["mysql_table"].ToString(),
Schema: row["mysql_schema"].ToString(),
SQL: row["migration_statement"].ToString(),
UUID: row["migration_uuid"].ToString(),
Strategy: schema.DDLStrategy(row["strategy"].ToString()),
Options: row["options"].ToString(),
Status: schema.OnlineDDLStatus(row["migration_status"].ToString()),
Retries: row.AsInt64("retries", 0),
TabletAlias: row["tablet"].ToString(),
RequestContext: row["migration_context"].ToString(),
}
return onlineDDL, row, nil
}
Expand Down Expand Up @@ -2543,16 +2544,36 @@ func (e *Executor) SubmitMigration(
return nil, err
}

if onlineDDL.StrategySetting().IsSingleton() {
if err := e.initSchema(ctx); err != nil {
log.Error(err)
return nil, err
}

if onlineDDL.StrategySetting().IsSingleton() || onlineDDL.StrategySetting().IsSingletonContext() {
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

uuids, err := e.readPendingMigrationsUUIDs(ctx)
pendingUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
if err != nil {
return result, err
return nil, err
}
if len(uuids) > 0 {
return result, fmt.Errorf("singleton migration rejected: found pending migrations [%s]", strings.Join(uuids, ", "))
switch {
case onlineDDL.StrategySetting().IsSingleton():
// We will reject this migration if there's any pending migration
if len(pendingUUIDs) > 0 {
return result, fmt.Errorf("singleton migration rejected: found pending migrations [%s]", strings.Join(pendingUUIDs, ", "))
Copy link
Collaborator

@systay systay Apr 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vterrors should be used, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right 😛

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}
case onlineDDL.StrategySetting().IsSingletonContext():
// We will reject this migration if there's any pending migration within a different context
for _, pendingUUID := range pendingUUIDs {
pendingOnlineDDL, _, err := e.readMigration(ctx, pendingUUID)
if err != nil {
return nil, err
}
if pendingOnlineDDL.RequestContext != onlineDDL.RequestContext {
return nil, fmt.Errorf("singleton migration rejected: found pending migration: %s in different context: %s", pendingUUID, pendingOnlineDDL.RequestContext)
}
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ const (
retries,
ddl_action,
artifacts,
tablet
tablet,
migration_context
FROM _vt.schema_migrations
WHERE
migration_uuid=%a
Expand All @@ -273,7 +274,8 @@ const (
retries,
ddl_action,
artifacts,
tablet
tablet,
migration_context
FROM _vt.schema_migrations
WHERE
migration_status='ready'
Expand Down