From ecf0dba4a32c67f53d9589c3bfccaae1e602adf0 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 9 Aug 2021 10:38:14 +0300 Subject: [PATCH 1/8] OnlineDDL: better scheduling/cancellation logic Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/executor.go | 187 +++++++++++++++------------ 1 file changed, 106 insertions(+), 81 deletions(-) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 167479a4865..56b936e9078 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -150,19 +150,25 @@ type Executor struct { shard string dbName string - initMutex sync.Mutex - migrationMutex sync.Mutex - vreplMigrationRunning int64 - ghostMigrationRunning int64 - ptoscMigrationRunning int64 - lastMigrationUUID string - tickReentranceFlag int64 + initMutex sync.Mutex + migrationMutex sync.Mutex + ownedRunningMigrations sync.Map // UUIDs owned by this executor + tickReentranceFlag int64 ticks *timer.Timer isOpen bool schemaInitialized bool } +type cancellableMigration struct { + uuid string + message string +} + +func newCancellableMigration(uuid string, message string) *cancellableMigration { + return &cancellableMigration{uuid: uuid, message: message} +} + // GhostBinaryFileName returns the full path+name of the gh-ost binary func GhostBinaryFileName() (fileName string, isOverride bool) { if *ghostOverridePath != "" { @@ -296,16 +302,13 @@ func (e *Executor) triggerNextCheckInterval() { // isAnyMigrationRunning sees if there's any migration running right now func (e *Executor) isAnyMigrationRunning() bool { - if atomic.LoadInt64(&e.vreplMigrationRunning) > 0 { - return true - } - if atomic.LoadInt64(&e.ghostMigrationRunning) > 0 { - return true - } - if atomic.LoadInt64(&e.ptoscMigrationRunning) > 0 { - return true - } - return false + migrationFound := false + + e.ownedRunningMigrations.Range(func(_, _ interface{}) bool { + migrationFound = true + return false // stop iteration + }) + return migrationFound } func (e *Executor) ghostPanicFlagFileName(uuid string) string { @@ -675,6 +678,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er } } } + e.ownedRunningMigrations.Delete(onlineDDL.UUID) go func() { // Tables are swapped! 
Let's take the opportunity to ReloadSchema now @@ -781,8 +785,7 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem } defer conn.Close() - atomic.StoreInt64(&e.vreplMigrationRunning, 1) - e.lastMigrationUUID = onlineDDL.UUID + e.ownedRunningMigrations.Store(onlineDDL.UUID, true) if err := e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusRunning, false, progressPctStarted, etaSecondsUnknown, rowsCopiedUnknown); err != nil { return err } @@ -1020,11 +1023,10 @@ exit $exit_code return err } - atomic.StoreInt64(&e.ghostMigrationRunning, 1) - e.lastMigrationUUID = onlineDDL.UUID + e.ownedRunningMigrations.Store(onlineDDL.UUID, true) go func() error { - defer atomic.StoreInt64(&e.ghostMigrationRunning, 0) + defer e.ownedRunningMigrations.Delete(onlineDDL.UUID) defer e.dropOnlineDDLUser(ctx) defer e.gcArtifacts(ctx) @@ -1241,11 +1243,10 @@ export MYSQL_PWD return err } - atomic.StoreInt64(&e.ptoscMigrationRunning, 1) - e.lastMigrationUUID = onlineDDL.UUID + e.ownedRunningMigrations.Store(onlineDDL.UUID, true) go func() error { - defer atomic.StoreInt64(&e.ptoscMigrationRunning, 0) + defer e.ownedRunningMigrations.Delete(onlineDDL.UUID) defer e.dropOnlineDDLUser(ctx) defer e.gcArtifacts(ctx) @@ -1329,7 +1330,12 @@ func (e *Executor) readPendingMigrationsUUIDs(ctx context.Context) (uuids []stri } // terminateMigration attempts to interrupt and hard-stop a running migration -func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, lastMigrationUUID string) (foundRunning bool, err error) { +func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (foundRunning bool, err error) { + // It's possible the killing the migratoin fails for whatever reason, in which case + // the logic will retry killing it later on. + // Whatever happens in this function, this executor stops owning the given migration. + defer e.ownedRunningMigrations.Delete(onlineDDL.UUID) + switch onlineDDL.Strategy { case schema.DDLStrategyOnline: // migration could have started by a different tablet. We need to actively verify if it is running @@ -1358,24 +1364,22 @@ func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.Onl } } case schema.DDLStrategyGhost: - if atomic.LoadInt64(&e.ghostMigrationRunning) > 0 { - // double check: is the running migration the very same one we wish to cancel? - if onlineDDL.UUID == lastMigrationUUID { - // assuming all goes well in next steps, we can already report that there has indeed been a migration - foundRunning = true - } + // double check: is the running migration the very same one we wish to cancel? + if _, ok := e.ownedRunningMigrations.Load(onlineDDL.UUID); ok { + // assuming all goes well in next steps, we can already report that there has indeed been a migration + foundRunning = true } // gh-ost migrations are easy to kill: just touch their specific panic flag files. We trust // gh-ost to terminate. No need to KILL it. And there's no trigger cleanup. 
if err := e.createGhostPanicFlagFile(onlineDDL.UUID); err != nil { - return foundRunning, fmt.Errorf("Error cancelling migration, flag file error: %+v", err) + return foundRunning, fmt.Errorf("Error cancelling gh-ost migration, flag file error: %+v", err) } } return foundRunning, nil } // CancelMigration attempts to abort a scheduled or a running migration -func (e *Executor) CancelMigration(ctx context.Context, uuid string, terminateRunningMigration bool, message string) (result *sqltypes.Result, err error) { +func (e *Executor) CancelMigration(ctx context.Context, uuid string, message string) (result *sqltypes.Result, err error) { if !e.isOpen { return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled") } @@ -1400,30 +1404,27 @@ func (e *Executor) CancelMigration(ctx context.Context, uuid string, terminateRu rowsAffected = 1 } - if terminateRunningMigration { - migrationFound, err := e.terminateMigration(ctx, onlineDDL, e.lastMigrationUUID) - defer e.updateMigrationMessage(ctx, onlineDDL.UUID, message) + migrationFound, err := e.terminateMigration(ctx, onlineDDL) + defer e.updateMigrationMessage(ctx, onlineDDL.UUID, message) - if migrationFound { - rowsAffected = 1 - } - if err != nil { - return result, err - } + if migrationFound { + rowsAffected = 1 + } + if err != nil { + return result, err } result = &sqltypes.Result{ RowsAffected: rowsAffected, } - return result, nil } // cancelMigrations attempts to abort a list of migrations -func (e *Executor) cancelMigrations(ctx context.Context, uuids []string, message string) (err error) { - for _, uuid := range uuids { - log.Infof("cancelMigrations: cancelling %s", uuid) - if _, err := e.CancelMigration(ctx, uuid, true, message); err != nil { +func (e *Executor) cancelMigrations(ctx context.Context, cancellable []*cancellableMigration) (err error) { + for _, migration := range cancellable { + log.Infof("cancelMigrations: cancelling %s", migration.uuid) + if _, err := e.CancelMigration(ctx, migration.uuid, migration.message); err != nil { return err } } @@ -1445,7 +1446,7 @@ func (e *Executor) CancelPendingMigrations(ctx context.Context, message string) result = &sqltypes.Result{} for _, uuid := range uuids { log.Infof("CancelPendingMigrations: cancelling %s", uuid) - res, err := e.CancelMigration(ctx, uuid, true, message) + res, err := e.CancelMigration(ctx, uuid, message) if err != nil { return result, err } @@ -2223,7 +2224,7 @@ func (e *Executor) isVReplMigrationRunning(ctx context.Context, uuid string) (is // reviewRunningMigrations iterates migrations in 'running' state. Normally there's only one running, which was // spawned by this tablet; but vreplication migrations could also resume from failure. 
-func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning int, cancellable []string, err error) { +func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning int, cancellable []*cancellableMigration, err error) { e.migrationMutex.Lock() defer e.migrationMutex.Unlock() @@ -2231,14 +2232,15 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i if err != nil { return countRunnning, cancellable, err } - // we identify running vreplication migrations in this function - atomic.StoreInt64(&e.vreplMigrationRunning, 0) + uuidsFoundRunning := map[string]bool{} for _, row := range r.Named().Rows { uuid := row["migration_uuid"].ToString() strategy := schema.DDLStrategy(row["strategy"].ToString()) strategySettings := schema.NewDDLStrategySetting(strategy, row["options"].ToString()) elapsedSeconds := row.AsInt64("elapsed_seconds", 0) + uuidsFoundRunning[uuid] = true + switch strategy { case schema.DDLStrategyOnline: { @@ -2253,9 +2255,10 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i } if running { // This VRepl migration may have started from outside this tablet, so - // vreplMigrationRunning could be zero. Whatever the case is, we're under - // migrationMutex lock and it's now safe to ensure vreplMigrationRunning is 1 - atomic.StoreInt64(&e.vreplMigrationRunning, 1) + // this executor may not own the migration _yet_. We make sure to own it. + // VReplication migrations are unique in this respect: we are able to complete + // a vreplicaiton migration started by another tablet. + e.ownedRunningMigrations.Store(uuid, true) _ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuid) _ = e.updateRowsCopied(ctx, uuid, s.rowsCopied) @@ -2291,30 +2294,48 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i if running { _ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuid) } - if uuid != e.lastMigrationUUID { - // This executor can only spawn one migration at a time. And that - // migration is identified by e.lastMigrationUUID. - // If we find a _running_ migration that does not have this UUID, it _must_ - // mean the migration was started by a former vttablet (ie vttablet crashed and restarted) - cancellable = append(cancellable, uuid) + if _, ok := e.ownedRunningMigrations.Load(uuid); !ok { + // Ummm, the migration is running but we don't own it. This means the migration + // is rogue. Maybe executed by another tablet. Anyway, if we don't own it, we can't + // complete the migration. Even if it runs, the logic around announcing it as complete + // is missing. So we may as well cancel it. + message := fmt.Sprintf("cancelling a pt-osc running migration %s which is not owned (not started, or is assumed to be terminated) by this executor", uuid) + cancellable = append(cancellable, newCancellableMigration(uuid, message)) + } + } + case schema.DDLStrategyGhost: + { + if _, ok := e.ownedRunningMigrations.Load(uuid); !ok { + // Ummm, the migration is running but we don't own it. This means the migration + // is rogue. Maybe executed by another tablet. Anyway, if we don't own it, we can't + // complete the migration. Even if it runs, the logic around announcing it as complete + // is missing. So we may as well cancel it. 
+ message := fmt.Sprintf("cancelling a gh-ost running migration %s which is not owned (not started, or is assumed to be terminated) by this executor", uuid) + cancellable = append(cancellable, newCancellableMigration(uuid, message)) } } } countRunnning++ - - if uuid != e.lastMigrationUUID { - // This executor can only run one migration at a time. And that - // migration is identified by e.lastMigrationUUID. - // If we find a _running_ migration that does not have this UUID, it _must_ - // mean the migration was started by a former vttablet (ie vttablet crashed and restarted) - cancellable = append(cancellable, uuid) - } } + { + // now, let's look at UUIDs we own and _think_ should be running, and see which of tham _isn't_ actually running... + e.ownedRunningMigrations.Range(func(k, _ interface{}) bool { + uuid, ok := k.(string) + if !ok { + return true + } + if !uuidsFoundRunning[uuid] { + e.ownedRunningMigrations.Delete(uuid) + } + return true + }) + } + return countRunnning, cancellable, err } // reviewStaleMigrations marks as 'failed' migrations whose status is 'running' but which have -// shown no liveness in past X minutes +// shown no liveness in past X minutes. It also attempts to terminate them func (e *Executor) reviewStaleMigrations(ctx context.Context) error { e.migrationMutex.Lock() defer e.migrationMutex.Unlock() @@ -2336,27 +2357,31 @@ func (e *Executor) reviewStaleMigrations(ctx context.Context) error { if err != nil { return err } - // If this is pt-osc migration, then it may have crashed without having its triggers cleaned up. - // make sure to drop them. - if onlineDDL.Strategy == schema.DDLStrategyPTOSC { - if err := e.dropPTOSCMigrationTriggers(ctx, onlineDDL); err != nil { - return err - } - } + message := fmt.Sprintf("stale migration %s: found running but indicates no liveness", onlineDDL.UUID) if onlineDDL.TabletAlias != e.TabletAliasString() { // This means another tablet started the migration, and the migration has failed due to the tablet failure (e.g. master failover) if err := e.updateTabletFailure(ctx, onlineDDL.UUID); err != nil { return err } + message = fmt.Sprintf("%s; executed by different tablet %s", message, onlineDDL.TabletAlias) + } + if _, err := e.terminateMigration(ctx, onlineDDL); err != nil { + message = fmt.Sprintf("error terminating migration (%v): %v", message, err) + e.updateMigrationMessage(ctx, onlineDDL.UUID, message) + continue // we still want to handle rest of migrations + } + if err := e.updateMigrationMessage(ctx, onlineDDL.UUID, message); err != nil { + return err } if err := e.updateMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusFailed); err != nil { return err } _ = e.updateMigrationStartedTimestamp(ctx, uuid) - if err := e.updateMigrationTimestamp(ctx, "completed_timestamp", uuid); err != nil { + // Because the migration is stale, it may not update completed_timestamp. 
It is essential to set completed_timestamp + // as this is then used when cleaning artifacts + if err := e.updateMigrationTimestamp(ctx, "completed_timestamp", onlineDDL.UUID); err != nil { return err } - _ = e.updateMigrationMessage(ctx, onlineDDL.UUID, "stale migration") } return nil @@ -2488,7 +2513,7 @@ func (e *Executor) onMigrationCheckTick() { } if _, cancellable, err := e.reviewRunningMigrations(ctx); err != nil { log.Error(err) - } else if err := e.cancelMigrations(ctx, cancellable, "auto cancel"); err != nil { + } else if err := e.cancelMigrations(ctx, cancellable); err != nil { log.Error(err) } if err := e.reviewStaleMigrations(ctx); err != nil { @@ -3010,7 +3035,7 @@ func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *queryp if !schema.IsOnlineDDLUUID(uuid) { return nil, fmt.Errorf("Not an Online DDL UUID: %s", uuid) } - return response(e.CancelMigration(ctx, uuid, true, "cancel by user")) + return response(e.CancelMigration(ctx, uuid, "cancel by user")) case cancelAllMigrationHint: uuid, _ := vx.ColumnStringVal(vx.WhereCols, "migration_uuid") if uuid != "" { From 5d1f23727743be6712110d2950cb8ebdb1942bda Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 9 Aug 2021 10:57:21 +0300 Subject: [PATCH 2/8] a bit more comments Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/executor.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 56b936e9078..3937fa41fc9 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -150,9 +150,15 @@ type Executor struct { shard string dbName string - initMutex sync.Mutex - migrationMutex sync.Mutex - ownedRunningMigrations sync.Map // UUIDs owned by this executor + initMutex sync.Mutex + migrationMutex sync.Mutex + // ownedRunningMigrations lists UUIDs owned by this executor (consider this a map[string]bool) + // A UUID listed in this map stands for a migration that is executing, and that this executor can control. + // Migrations found to be running which are not listed in this map will either: + // - be adopted by this executor (possible for vreplication migrations), or + // - be terminated (example: pt-osc migration gone rogue, process still running even as the migration failed) + // The Executor auto-reviews the map and cleans up migrations thought to be running which are not running. 
+ ownedRunningMigrations sync.Map tickReentranceFlag int64 ticks *timer.Timer From 9a27d024d41184209b41284aacce0e255a5dfa9c Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 9 Aug 2021 13:19:30 +0300 Subject: [PATCH 3/8] empty commit to kick CI Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> From 3ec69000f3840fa70e7e9aa33fba9e09e7250c5e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 9 Aug 2021 14:01:46 +0300 Subject: [PATCH 4/8] fix call to CancelMigration Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/query_executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 6a0ae556a5d..bf35c2467e9 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -838,7 +838,7 @@ func (qre *QueryExecutor) execAlterMigration() (*sqltypes.Result, error) { case sqlparser.CompleteMigrationType: return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ALTER VITESS_MIGRATION COMPLETE is not implemented yet") case sqlparser.CancelMigrationType: - return qre.tsv.onlineDDLExecutor.CancelMigration(qre.ctx, alterMigration.UUID, true, "CANCEL issued by user") + return qre.tsv.onlineDDLExecutor.CancelMigration(qre.ctx, alterMigration.UUID, "CANCEL issued by user") case sqlparser.CancelAllMigrationType: return qre.tsv.onlineDDLExecutor.CancelPendingMigrations(qre.ctx, "CANCEL ALL issued by user") } From d58b02c38dd5f599a02d8bb94ddcd206d0bcfda8 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 11 Aug 2021 08:59:26 +0300 Subject: [PATCH 5/8] updateMigrationTablet: executor to adopt vreplication started in another tablet, updating to its own tablet Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/executor.go | 15 ++++++++++++++- go/vt/vttablet/onlineddl/schema.go | 5 +++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 1ac28920761..e17a7164cc5 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -2266,7 +2266,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i // a vreplicaiton migration started by another tablet. 
e.ownedRunningMigrations.Store(uuid, true) _ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuid) - + _ = e.updateMigrationTablet(ctx, uuid) _ = e.updateRowsCopied(ctx, uuid, s.rowsCopied) _ = e.updateMigrationProgressByRowsCopied(ctx, uuid, s.rowsCopied) _ = e.updateMigrationETASecondsByProgress(ctx, uuid) @@ -2599,6 +2599,19 @@ func (e *Executor) clearArtifacts(ctx context.Context, uuid string) error { return err } +// updateMigrationTablet sets 'tablet' column to be this executor's tablet alias for given migration +func (e *Executor) updateMigrationTablet(ctx context.Context, uuid string) error { + query, err := sqlparser.ParseAndBind(sqlUpdateTablet, + sqltypes.StringBindVariable(e.TabletAliasString()), + sqltypes.StringBindVariable(uuid), + ) + if err != nil { + return err + } + _, err = e.execQuery(ctx, query) + return err +} + // updateTabletFailure marks a given migration as "tablet_failed" func (e *Executor) updateTabletFailure(ctx context.Context, uuid string) error { parsed := sqlparser.BuildParsedQuery(sqlUpdateTabletFailure, diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 64473525108..dfeb836eab4 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -141,6 +141,11 @@ const ( WHERE migration_uuid=%a ` + sqlUpdateTablet = `UPDATE _vt.schema_migrations + SET tablet=%a + WHERE + migration_uuid=%a + ` sqlUpdateTabletFailure = `UPDATE _vt.schema_migrations SET tablet_failure=1 WHERE From a0f005a0ded7ba58a231648db8346878df8ef387 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 11 Aug 2021 09:02:59 +0300 Subject: [PATCH 6/8] WIP: Adding endtoend test to validate vrepl migrations survives PRS Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl/vrepl/onlineddl_vrepl_test.go | 111 +++++++++++++++--- go/test/endtoend/onlineddl/vttablet_util.go | 66 +++++++++++ 2 files changed, 160 insertions(+), 17 deletions(-) create mode 100644 go/test/endtoend/onlineddl/vttablet_util.go diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index dd64a83150b..7e955b09c10 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -241,16 +241,19 @@ func TestSchemaChange(t *testing.T) { shards = clusterInstance.Keyspaces[0].Shards assert.Equal(t, 2, len(shards)) + for _, shard := range shards { + assert.Equal(t, 2, len(shard.Vttablets)) + } testWithInitialSchema(t) t.Run("alter non_online", func(t *testing.T) { - _ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online") + _ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online", false) insertRows(t, 2) testRows(t) }) t.Run("successful online alter, vtgate", func(t *testing.T) { insertRows(t, 2) - uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "online", "vtgate", "vrepl_col") + uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "online", "vtgate", "vrepl_col", false) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) testRows(t) testMigrationRowCount(t, uuid) @@ -259,7 +262,7 @@ func TestSchemaChange(t *testing.T) { }) t.Run("successful online alter, vtctl", func(t *testing.T) { insertRows(t, 2) - uuid := testOnlineDDLStatement(t, 
alterTableTrivialStatement, "online", "vtctl", "vrepl_col") + uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtctl", "vrepl_col", false) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) testRows(t) testMigrationRowCount(t, uuid) @@ -268,12 +271,12 @@ func TestSchemaChange(t *testing.T) { }) t.Run("throttled migration", func(t *testing.T) { insertRows(t, 2) - for i := range clusterInstance.Keyspaces[0].Shards { - throttleApp(clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], throttlerAppName) - defer unthrottleApp(clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], throttlerAppName) + for i := range shards { + throttleApp(shards[i].Vttablets[0], throttlerAppName) + defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName) } - uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", "vtgate", "vrepl_col") - onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning) + uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", "vtgate", "vrepl_col", true) + _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusRunning) testRows(t) onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true) time.Sleep(2 * time.Second) @@ -281,7 +284,7 @@ func TestSchemaChange(t *testing.T) { }) t.Run("failed migration", func(t *testing.T) { insertRows(t, 2) - uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "online", "vtgate", "vrepl_col") + uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "online", "vtgate", "vrepl_col", false) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) testRows(t) onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) @@ -305,26 +308,26 @@ func TestSchemaChange(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _ = testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", "vtgate", "vrepl_col") + _ = testOnlineDDLStatement(t, alterTableThrottlingStatement, "online", "vtgate", "vrepl_col", false) }() } wg.Wait() onlineddl.CheckCancelAllMigrations(t, &vtParams, len(shards)*count) }) t.Run("Online DROP, vtctl", func(t *testing.T) { - uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtctl", "") + uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtctl", "", false) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false) }) t.Run("Online CREATE, vtctl", func(t *testing.T) { - uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "online", "vtctl", "online_ddl_create_col") + uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "online", "vtctl", "online_ddl_create_col", false) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false) }) t.Run("Online DROP TABLE IF EXISTS, vtgate", func(t *testing.T) { - uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "online", "vtgate", "") + uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "online", "vtgate", "", false) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) onlineddl.CheckCancelMigration(t, 
&vtParams, shards, uuid, false) onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false) @@ -332,7 +335,7 @@ func TestSchemaChange(t *testing.T) { checkTables(t, schema.OnlineDDLToGCUUID(uuid), 1) }) t.Run("Online DROP TABLE IF EXISTS for nonexistent table, vtgate", func(t *testing.T) { - uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "online", "vtgate", "") + uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "online", "vtgate", "", false) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false) @@ -340,11 +343,82 @@ func TestSchemaChange(t *testing.T) { checkTables(t, schema.OnlineDDLToGCUUID(uuid), 0) }) t.Run("Online DROP TABLE for nonexistent table, expect error, vtgate", func(t *testing.T) { - uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtgate", "") + uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtgate", "", false) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, true) }) + t.Run("PlannedReparentShard via throttling", func(t *testing.T) { + insertRows(t, 2) + for i := range shards { + _, body, err := throttleApp(shards[i].Vttablets[0], throttlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, throttlerAppName) + + defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName) + } + uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtgate", "vrepl_col", true) + time.Sleep(2 * time.Second) + t.Run("verify running status", func(t *testing.T) { + _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusRunning) + vreplStatus := onlineddl.WaitForVReplicationStatus(t, &vtParams, shards, uuid, 20*time.Second, "Copying") + require.Equal(t, "Copying", vreplStatus) + testRows(t) + }) + + t.Run("Check tablet", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + shard := row["shard"].ToString() + tablet := row["tablet"].ToString() + + switch shard { + case "-80": + require.Equal(t, shards[0].Vttablets[0].Alias, tablet) + case "80-": + require.Equal(t, shards[1].Vttablets[0].Alias, tablet) + default: + require.NoError(t, fmt.Errorf("unexpected shard name: %s", shard)) + } + } + }) + t.Run("PRS", func(t *testing.T) { + // migration has started and is throttled. We now run PRS + err := clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "-keyspace_shard", keyspaceName+"/-80", "-new_master", shards[0].Vttablets[1].Alias) + require.NoError(t, err, "failed PRS: %v", err) + }) + + t.Run("unthrottle and wait for completion", func(t *testing.T) { + // unthrottle. The shard with new primary is unaffected by throttling, but the untouched shard us. We wish to let it proceeed. 
+ for i := range shards { + _, body, err := unthrottleApp(shards[i].Vttablets[0], throttlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, throttlerAppName) + } + _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 130*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("Check tablet post PRS", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + shard := row["shard"].ToString() + tablet := row["tablet"].ToString() + + switch shard { + case "-80": + require.Equal(t, shards[0].Vttablets[1].Alias, tablet) + case "80-": + require.Equal(t, shards[1].Vttablets[0].Alias, tablet) + default: + require.NoError(t, fmt.Errorf("unexpected shard name: %s", shard)) + } + + } + }) + + }) } func insertRow(t *testing.T) { @@ -406,7 +480,7 @@ func testWithInitialSchema(t *testing.T) { } // testOnlineDDLStatement runs an online DDL, ALTER statement -func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectHint string) (uuid string) { +func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectHint string, skipWait bool) (uuid string) { tableName := fmt.Sprintf("vt_onlineddl_test_%02d", 3) sqlQuery := fmt.Sprintf(alterStatement, tableName) if executeStrategy == "vtgate" { @@ -426,7 +500,10 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str strategySetting, err := schema.ParseDDLStrategy(ddlStrategy) assert.NoError(t, err) - if !strategySetting.Strategy.IsDirect() { + if strategySetting.Strategy.IsDirect() { + skipWait = true + } + if !skipWait { status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) } diff --git a/go/test/endtoend/onlineddl/vttablet_util.go b/go/test/endtoend/onlineddl/vttablet_util.go new file mode 100644 index 00000000000..ec0a1e9d8d1 --- /dev/null +++ b/go/test/endtoend/onlineddl/vttablet_util.go @@ -0,0 +1,66 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package onlineddl + +import ( + "testing" + "time" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/sqlparser" + + "vitess.io/vitess/go/test/endtoend/cluster" + + "github.com/stretchr/testify/require" +) + +// WaitForVReplicationStatus waits for a vreplication stream to be in one of given states, or timeout +func WaitForVReplicationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...string) (status string) { + + query, err := sqlparser.ParseAndBind("select workflow, state from _vt.vreplication where workflow=%a", + sqltypes.StringBindVariable(uuid), + ) + require.NoError(t, err) + + statusesMap := map[string]bool{} + for _, status := range expectStatuses { + statusesMap[status] = true + } + startTime := time.Now() + lastKnownStatus := "" + for time.Since(startTime) < timeout { + countMatchedShards := 0 + + for _, shard := range shards { + r, err := shard.Vttablets[0].VttabletProcess.QueryTablet(query, "", false) + require.NoError(t, err) + + for _, row := range r.Named().Rows { + lastKnownStatus = row["state"].ToString() + if row["workflow"].ToString() == uuid && statusesMap[lastKnownStatus] { + countMatchedShards++ + } + } + } + if countMatchedShards == len(shards) { + return lastKnownStatus + } + time.Sleep(1 * time.Second) + } + return lastKnownStatus +} From b7709e8af4ce8a7c4f633a1e67ec019fddde0a7f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 12 Aug 2021 09:22:11 +0300 Subject: [PATCH 7/8] working endtoend tests with 2*PRS Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl/vrepl/onlineddl_vrepl_test.go | 206 ++++++++++++------ 1 file changed, 135 insertions(+), 71 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index 7e955b09c10..f8e2c072451 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -85,7 +85,7 @@ var ( CREATE TABLE %s ( id bigint NOT NULL, test_val bigint unsigned NOT NULL DEFAULT 0, - online_ddl_create_col INT NOT NULL, + online_ddl_create_col INT NOT NULL DEFAULT 0, PRIMARY KEY (id) ) ENGINE=InnoDB;` onlineDDLDropTableStatement = ` @@ -282,6 +282,29 @@ func TestSchemaChange(t *testing.T) { time.Sleep(2 * time.Second) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) }) + + t.Run("throttled and unthrottled migration", func(t *testing.T) { + insertRows(t, 2) + for i := range shards { + _, body, err := throttleApp(shards[i].Vttablets[0], throttlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, throttlerAppName) + + defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName) + } + uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtgate", "test_val", true) + _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusRunning) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning) + testRows(t) + for i := range shards { + _, body, err := unthrottleApp(shards[i].Vttablets[0], throttlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, throttlerAppName) + } + _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + 
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("failed migration", func(t *testing.T) { insertRows(t, 2) uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "online", "vtgate", "vrepl_col", false) @@ -314,6 +337,112 @@ func TestSchemaChange(t *testing.T) { wg.Wait() onlineddl.CheckCancelAllMigrations(t, &vtParams, len(shards)*count) }) + + // reparent shard -80 to replica + // and then reparent it back to original state + // (two pretty much identical tests, the point is to end up with original state) + for currentPrimaryTabletIndex, reparentTabletIndex := range []int{1, 0} { + t.Run(fmt.Sprintf("PlannedReparentShard via throttling %d/2", (currentPrimaryTabletIndex+1)), func(t *testing.T) { + // resetRowCount() + insertRows(t, 2) + for i := range shards { + var body string + var err error + switch i { + case 0: + // this is the shard where we run PRS + _, body, err = throttleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName) + defer unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName) + case 1: + // no PRS on this shard + _, body, err = throttleApp(shards[i].Vttablets[0], throttlerAppName) + defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName) + } + assert.NoError(t, err) + assert.Contains(t, body, throttlerAppName) + } + uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtgate", "test_val", true) + + t.Run("wait for migration and vreplication to run", func(t *testing.T) { + _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusRunning) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning) + time.Sleep(5 * time.Second) // wait for _vt.vreplication to be created + vreplStatus := onlineddl.WaitForVReplicationStatus(t, &vtParams, shards, uuid, 20*time.Second, "Copying") + require.Equal(t, "Copying", vreplStatus) + // again see that we're still 'running' + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning) + testRows(t) + }) + + t.Run("Check tablet", func(t *testing.T) { + // onlineddl.Executor marks this migration with its tablet alias + // reminder that onlineddl.Executor runs on the primary tablet. + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + shard := row["shard"].ToString() + tablet := row["tablet"].ToString() + + switch shard { + case "-80": + require.Equal(t, shards[0].Vttablets[currentPrimaryTabletIndex].Alias, tablet) + case "80-": + require.Equal(t, shards[1].Vttablets[0].Alias, tablet) + default: + require.NoError(t, fmt.Errorf("unexpected shard name: %s", shard)) + } + } + }) + t.Run("PRS shard -80", func(t *testing.T) { + // migration has started and is throttled. 
We now run PRS + err := clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "-keyspace_shard", keyspaceName+"/-80", "-new_master", shards[0].Vttablets[reparentTabletIndex].Alias) + require.NoError(t, err, "failed PRS: %v", err) + }) + + t.Run("unthrottle and expect completion", func(t *testing.T) { + for i := range shards { + var body string + var err error + switch i { + case 0: + // this is the shard where we run PRS + _, body, err = unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName) + case 1: + // no PRS on this shard + _, body, err = unthrottleApp(shards[i].Vttablets[0], throttlerAppName) + } + assert.NoError(t, err) + assert.Contains(t, body, throttlerAppName) + } + + _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + }) + + t.Run("Check tablet post PRS", func(t *testing.T) { + // onlineddl.Executor will find that a vrepl migration started in a different tablet. + // it will own the tablet and will update 'tablet' column in _vt.schema_migrations with its own + // (promoted primary) tablet alias. + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + shard := row["shard"].ToString() + tablet := row["tablet"].ToString() + + switch shard { + case "-80": + // PRS for this tablet, we promoted tablet[1] + require.Equal(t, shards[0].Vttablets[reparentTabletIndex].Alias, tablet) + case "80-": + // No PRS for this tablet + require.Equal(t, shards[1].Vttablets[0].Alias, tablet) + default: + require.NoError(t, fmt.Errorf("unexpected shard name: %s", shard)) + } + } + }) + }) + } t.Run("Online DROP, vtctl", func(t *testing.T) { uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", "vtctl", "", false) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) @@ -348,76 +477,11 @@ func TestSchemaChange(t *testing.T) { onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, true) }) - t.Run("PlannedReparentShard via throttling", func(t *testing.T) { - insertRows(t, 2) - for i := range shards { - _, body, err := throttleApp(shards[i].Vttablets[0], throttlerAppName) - assert.NoError(t, err) - assert.Contains(t, body, throttlerAppName) - - defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName) - } - uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "online", "vtgate", "vrepl_col", true) - time.Sleep(2 * time.Second) - t.Run("verify running status", func(t *testing.T) { - _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusRunning) - vreplStatus := onlineddl.WaitForVReplicationStatus(t, &vtParams, shards, uuid, 20*time.Second, "Copying") - require.Equal(t, "Copying", vreplStatus) - testRows(t) - }) - - t.Run("Check tablet", func(t *testing.T) { - rs := onlineddl.ReadMigrations(t, &vtParams, uuid) - require.NotNil(t, rs) - for _, row := range rs.Named().Rows { - shard := row["shard"].ToString() - tablet := row["tablet"].ToString() - - switch shard { - case "-80": - require.Equal(t, shards[0].Vttablets[0].Alias, tablet) - case "80-": - require.Equal(t, shards[1].Vttablets[0].Alias, tablet) - default: - require.NoError(t, fmt.Errorf("unexpected shard name: %s", shard)) - } - } - }) - t.Run("PRS", func(t *testing.T) { - // 
migration has started and is throttled. We now run PRS - err := clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "-keyspace_shard", keyspaceName+"/-80", "-new_master", shards[0].Vttablets[1].Alias) - require.NoError(t, err, "failed PRS: %v", err) - }) - - t.Run("unthrottle and wait for completion", func(t *testing.T) { - // unthrottle. The shard with new primary is unaffected by throttling, but the untouched shard us. We wish to let it proceeed. - for i := range shards { - _, body, err := unthrottleApp(shards[i].Vttablets[0], throttlerAppName) - assert.NoError(t, err) - assert.Contains(t, body, throttlerAppName) - } - _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 130*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) - onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) - }) - t.Run("Check tablet post PRS", func(t *testing.T) { - rs := onlineddl.ReadMigrations(t, &vtParams, uuid) - require.NotNil(t, rs) - for _, row := range rs.Named().Rows { - shard := row["shard"].ToString() - tablet := row["tablet"].ToString() - - switch shard { - case "-80": - require.Equal(t, shards[0].Vttablets[1].Alias, tablet) - case "80-": - require.Equal(t, shards[1].Vttablets[0].Alias, tablet) - default: - require.NoError(t, fmt.Errorf("unexpected shard name: %s", shard)) - } - - } - }) - + t.Run("Online CREATE, vtctl", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "online", "vtctl", "online_ddl_create_col", false) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) + onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false) }) } From 5dc7e03c4018335028f844a7dd0b10fbd89dd81a Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 17 Aug 2021 09:25:54 +0300 Subject: [PATCH 8/8] typo Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/onlineddl/executor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index e17a7164cc5..e4a2d566c34 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -1337,7 +1337,7 @@ func (e *Executor) readPendingMigrationsUUIDs(ctx context.Context) (uuids []stri // terminateMigration attempts to interrupt and hard-stop a running migration func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (foundRunning bool, err error) { - // It's possible the killing the migratoin fails for whatever reason, in which case + // It's possible the killing the migration fails for whatever reason, in which case // the logic will retry killing it later on. // Whatever happens in this function, this executor stops owning the given migration. defer e.ownedRunningMigrations.Delete(onlineDDL.UUID)