diff --git a/go/test/endtoend/onlineddl/singleton/onlineddl_singleton_test.go b/go/test/endtoend/onlineddl/singleton/onlineddl_singleton_test.go index 71db3ff4fe7..84a0ed91f38 100644 --- a/go/test/endtoend/onlineddl/singleton/onlineddl_singleton_test.go +++ b/go/test/endtoend/onlineddl/singleton/onlineddl_singleton_test.go @@ -39,13 +39,14 @@ var ( clusterInstance *cluster.LocalProcessCluster vtParams mysql.ConnParams - hostname = "localhost" - keyspaceName = "ks" - cell = "zone1" - schemaChangeDirectory = "" - tableName = `stress_test` - onlineDDLStrategy = "online -singleton" - createStatement = ` + hostname = "localhost" + keyspaceName = "ks" + cell = "zone1" + schemaChangeDirectory = "" + tableName = `stress_test` + onlineSingletonDDLStrategy = "online -singleton" + onlineSingletonContextDDLStrategy = "online -singleton-context" + createStatement = ` CREATE TABLE stress_test ( id bigint(20) not null, rand_val varchar(32) null default '', @@ -61,6 +62,11 @@ var ( alterTableThrottlingStatement = ` ALTER TABLE stress_test DROP COLUMN created_timestamp ` + multiAlterTableThrottlingStatement = ` + ALTER TABLE stress_test ENGINE=InnoDB; + ALTER TABLE stress_test ENGINE=InnoDB; + ALTER TABLE stress_test ENGINE=InnoDB; + ` // A trivial statement which must succeed and does not change the schema alterTableTrivialStatement = ` ALTER TABLE stress_test ENGINE=InnoDB @@ -68,6 +74,7 @@ var ( dropStatement = ` DROP TABLE stress_test ` + multiDropStatements = `DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; DROP TABLE IF EXISTS t3;` ) func TestMain(m *testing.M) { @@ -145,7 +152,7 @@ func TestSchemaChange(t *testing.T) { // CREATE t.Run("CREATE TABLE", func(t *testing.T) { // The table does not exist - uuid := testOnlineDDLStatement(t, createStatement, onlineDDLStrategy, "vtgate", "", "", false) + uuid := testOnlineDDLStatement(t, createStatement, onlineSingletonDDLStrategy, "vtgate", "", "", false) uuids = append(uuids, uuid) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) checkTable(t, tableName, true) @@ -202,7 +209,7 @@ func TestSchemaChange(t *testing.T) { }) t.Run("successful online alter, vtgate", func(t *testing.T) { - uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, onlineDDLStrategy, "vtgate", "hint_col", "", false) + uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, onlineSingletonDDLStrategy, "vtgate", "hint_col", "", false) uuids = append(uuids, uuid) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) @@ -217,10 +224,45 @@ func TestSchemaChange(t *testing.T) { checkTable(t, tableName, true) }) + var throttledUUIDs []string + // singleton-context + t.Run("throttled migrations, singleton-context", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, multiAlterTableThrottlingStatement, "gh-ost -singleton-context --max-load=Threads_running=1", "vtctl", "hint_col", "", false) + throttledUUIDs = strings.Split(uuidList, "\n") + assert.Equal(t, 3, len(throttledUUIDs)) + for _, uuid := range throttledUUIDs { + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning, schema.OnlineDDLStatusQueued) + } + }) + t.Run("failed migrations, singleton-context", func(t *testing.T) { + _ = testOnlineDDLStatement(t, multiAlterTableThrottlingStatement, "gh-ost -singleton-context --max-load=Threads_running=1", "vtctl", "hint_col", "rejected", false) + }) + t.Run("terminate throttled migrations", func(t *testing.T) { + for _, uuid := range throttledUUIDs { + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning, schema.OnlineDDLStatusQueued) + onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true) + } + time.Sleep(2 * time.Second) + for _, uuid := range throttledUUIDs { + uuid = strings.TrimSpace(uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + } + }) + + t.Run("successful multiple statement, singleton-context, vtctl", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, multiDropStatements, onlineSingletonContextDDLStrategy, "vtctl", "", "", false) + uuidSlice := strings.Split(uuidList, "\n") + assert.Equal(t, 3, len(uuidSlice)) + for _, uuid := range uuidSlice { + uuid = strings.TrimSpace(uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + } + }) + //DROP t.Run("online DROP TABLE", func(t *testing.T) { - uuid := testOnlineDDLStatement(t, dropStatement, onlineDDLStrategy, "vtgate", "", "", false) + uuid := testOnlineDDLStatement(t, dropStatement, onlineSingletonDDLStrategy, "vtgate", "", "", false) uuids = append(uuids, uuid) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) checkTable(t, tableName, false) @@ -283,7 +325,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str func testRevertMigration(t *testing.T, revertUUID string, executeStrategy string, expectError string, skipWait bool) (uuid string) { revertQuery := fmt.Sprintf("revert vitess_migration '%s'", revertUUID) if executeStrategy == "vtgate" { - result := onlineddl.VtgateExecDDL(t, &vtParams, onlineDDLStrategy, revertQuery, expectError) + result := onlineddl.VtgateExecDDL(t, &vtParams, onlineSingletonDDLStrategy, revertQuery, expectError) if result != nil { row := result.Named().Row() if row != nil { @@ -291,7 +333,7 @@ func testRevertMigration(t *testing.T, revertUUID string, executeStrategy string } } } else { - output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, revertQuery, cluster.VtctlClientParams{DDLStrategy: onlineDDLStrategy, SkipPreflight: true}) + output, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, revertQuery, cluster.VtctlClientParams{DDLStrategy: onlineSingletonDDLStrategy, SkipPreflight: true}) if expectError == "" { assert.NoError(t, err) uuid = output diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index 833f155ec8f..0c1f3db23e6 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -107,7 +107,7 @@ func CheckCancelAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCo } // CheckMigrationStatus verifies that the migration indicated by given UUID has the given expected status -func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatus schema.OnlineDDLStatus) { +func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) { showQuery := fmt.Sprintf("show vitess_migrations like '%s'", uuid) r := VtgateExecQuery(t, vtParams, showQuery, "") fmt.Printf("# output for `%s`:\n", showQuery) @@ -115,8 +115,14 @@ func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []clu count := 0 for _, row := range r.Named().Rows { - if row["migration_uuid"].ToString() == uuid && row["migration_status"].ToString() == string(expectStatus) { - count++ + if row["migration_uuid"].ToString() != uuid { + continue + } + for _, expectStatus := range expectStatuses { + if row["migration_status"].ToString() == string(expectStatus) { + count++ + break + } } } assert.Equal(t, len(shards), count) diff --git a/go/vt/schema/ddl_strategy.go b/go/vt/schema/ddl_strategy.go index 0b2fb8fada0..a05d5eabc52 100644 --- a/go/vt/schema/ddl_strategy.go +++ b/go/vt/schema/ddl_strategy.go @@ -28,9 +28,10 @@ var ( ) const ( - declarativeFlag = "declarative" - skipTopoFlag = "skip-topo" - singletonFlag = "singleton" + declarativeFlag = "declarative" + skipTopoFlag = "skip-topo" + singletonFlag = "singleton" + singletonContextFlag = "singleton-context" ) // DDLStrategy suggests how an ALTER TABLE should run (e.g. "direct", "online", "gh-ost" or "pt-osc") @@ -123,6 +124,11 @@ func (setting *DDLStrategySetting) IsSingleton() bool { return setting.hasFlag(singletonFlag) } +// IsSingletonContext checks if strategy options include -singleton-context +func (setting *DDLStrategySetting) IsSingletonContext() bool { + return setting.hasFlag(singletonContextFlag) +} + // RuntimeOptions returns the options used as runtime flags for given strategy, removing any internal hint options func (setting *DDLStrategySetting) RuntimeOptions() []string { opts, _ := shlex.Split(setting.Options) @@ -132,6 +138,7 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string { case isFlag(opt, declarativeFlag): case isFlag(opt, skipTopoFlag): case isFlag(opt, singletonFlag): + case isFlag(opt, singletonContextFlag): default: validOpts = append(validOpts, opt) } @@ -142,7 +149,7 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string { // IsSkipTopo suggests that DDL should apply to tables bypassing global topo request func (setting *DDLStrategySetting) IsSkipTopo() bool { switch { - case setting.IsSingleton(): + case setting.IsSingleton(), setting.IsSingletonContext(): return true case setting.hasFlag(skipTopoFlag): return true diff --git a/go/vt/schemamanager/tablet_executor.go b/go/vt/schemamanager/tablet_executor.go index be7354de8c9..a6419bb4c0a 100644 --- a/go/vt/schemamanager/tablet_executor.go +++ b/go/vt/schemamanager/tablet_executor.go @@ -250,7 +250,9 @@ func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, execResu for _, onlineDDL := range onlineDDLs { if exec.ddlStrategySetting.IsSkipTopo() { exec.executeOnAllTablets(ctx, execResult, onlineDDL.SQL, true) - exec.wr.Logger().Printf("%s\n", onlineDDL.UUID) + if len(execResult.SuccessShards) > 0 { + exec.wr.Logger().Printf("%s\n", onlineDDL.UUID) + } } else { exec.executeOnlineDDL(ctx, execResult, onlineDDL) } diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 5e7d806b3a6..ab3db8bb475 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -1167,16 +1167,17 @@ func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *s return nil, nil, ErrMigrationNotFound } onlineDDL = &schema.OnlineDDL{ - Keyspace: row["keyspace"].ToString(), - Table: row["mysql_table"].ToString(), - Schema: row["mysql_schema"].ToString(), - SQL: row["migration_statement"].ToString(), - UUID: row["migration_uuid"].ToString(), - Strategy: schema.DDLStrategy(row["strategy"].ToString()), - Options: row["options"].ToString(), - Status: schema.OnlineDDLStatus(row["migration_status"].ToString()), - Retries: row.AsInt64("retries", 0), - TabletAlias: row["tablet"].ToString(), + Keyspace: row["keyspace"].ToString(), + Table: row["mysql_table"].ToString(), + Schema: row["mysql_schema"].ToString(), + SQL: row["migration_statement"].ToString(), + UUID: row["migration_uuid"].ToString(), + Strategy: schema.DDLStrategy(row["strategy"].ToString()), + Options: row["options"].ToString(), + Status: schema.OnlineDDLStatus(row["migration_status"].ToString()), + Retries: row.AsInt64("retries", 0), + TabletAlias: row["tablet"].ToString(), + RequestContext: row["migration_context"].ToString(), } return onlineDDL, row, nil } @@ -2518,7 +2519,7 @@ func (e *Executor) SubmitMigration( onlineDDL, err := schema.OnlineDDLFromCommentedStatement(stmt) if err != nil { - return nil, fmt.Errorf("Error submitting migration %s: %v", sqlparser.String(stmt), err) + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Error submitting migration %s: %v", sqlparser.String(stmt), err) } _, actionStr, err := onlineDDL.GetActionStr() if err != nil { @@ -2543,16 +2544,36 @@ func (e *Executor) SubmitMigration( return nil, err } - if onlineDDL.StrategySetting().IsSingleton() { + if err := e.initSchema(ctx); err != nil { + log.Error(err) + return nil, err + } + + if onlineDDL.StrategySetting().IsSingleton() || onlineDDL.StrategySetting().IsSingletonContext() { e.migrationMutex.Lock() defer e.migrationMutex.Unlock() - uuids, err := e.readPendingMigrationsUUIDs(ctx) + pendingUUIDs, err := e.readPendingMigrationsUUIDs(ctx) if err != nil { - return result, err + return nil, err } - if len(uuids) > 0 { - return result, fmt.Errorf("singleton migration rejected: found pending migrations [%s]", strings.Join(uuids, ", ")) + switch { + case onlineDDL.StrategySetting().IsSingleton(): + // We will reject this migration if there's any pending migration + if len(pendingUUIDs) > 0 { + return result, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton migration rejected: found pending migrations [%s]", strings.Join(pendingUUIDs, ", ")) + } + case onlineDDL.StrategySetting().IsSingletonContext(): + // We will reject this migration if there's any pending migration within a different context + for _, pendingUUID := range pendingUUIDs { + pendingOnlineDDL, _, err := e.readMigration(ctx, pendingUUID) + if err != nil { + return nil, err + } + if pendingOnlineDDL.RequestContext != onlineDDL.RequestContext { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton migration rejected: found pending migration: %s in different context: %s", pendingUUID, pendingOnlineDDL.RequestContext) + } + } } } diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 90adc05cd76..05717e10c94 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -248,7 +248,8 @@ const ( retries, ddl_action, artifacts, - tablet + tablet, + migration_context FROM _vt.schema_migrations WHERE migration_uuid=%a @@ -273,7 +274,8 @@ const ( retries, ddl_action, artifacts, - tablet + tablet, + migration_context FROM _vt.schema_migrations WHERE migration_status='ready'