Skip to content

Commit

Permalink
OnlineDDL: avoid schema_migrations AUTO_INCREMENT gaps by pre-checkin…
Browse files Browse the repository at this point in the history
…g for existing migration (#12169)

* OnlineDDL: avoid schema_migrations AUTO_INCREMENT gaps by pre-checking for existing migration

Signed-off-by: Shlomi Noach <[email protected]>

* another refactor: extracted submitCallbackIfNonConflicting(), and the logic applies both to migration submission as well as RetryMigration()

Signed-off-by: Shlomi Noach <[email protected]>

* ValidateSequentialMigrationIDs

Signed-off-by: Shlomi Noach <[email protected]>

* refactor test location

Signed-off-by: Shlomi Noach <[email protected]>

---------

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jan 31, 2023
1 parent c28b333 commit 3f55e40
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ func TestSchemaChange(t *testing.T) {
t.Run("singleton", testSingleton)
t.Run("declarative", testDeclarative)
t.Run("foreign-keys", testForeignKeys)
t.Run("summary: validate sequential migration IDs", func(t *testing.T) {
onlineddl.ValidateSequentialMigrationIDs(t, &vtParams, shards)
})
}

func testScheduler(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,9 @@ func TestSchemaChange(t *testing.T) {
}
})
})
t.Run("summary: validate sequential migration IDs", func(t *testing.T) {
onlineddl.ValidateSequentialMigrationIDs(t, &vtParams, shards)
})
}

func insertRow(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ func TestSchemaChange(t *testing.T) {
})
})
}

t.Run("summary: validate sequential migration IDs", func(t *testing.T) {
onlineddl.ValidateSequentialMigrationIDs(t, &vtParams, shards)
})
}

func testWithInitialSchema(t *testing.T) {
Expand Down
37 changes: 37 additions & 0 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,40 @@ func WaitForThrottledTimestamp(t *testing.T, vtParams *mysql.ConnParams, uuid st
t.Error("timeout waiting for last_throttled_timestamp to have nonempty value")
return
}

// ValidateSequentialMigrationIDs validates that the schema_migrations.id column, which is an
// AUTO_INCREMENT, does not have gaps.
//
// It runs `show vitess_migrations` via VtgateExecQuery and, for each shard found in the result,
// tracks the minimal id, the maximal id, and the number of rows seen. A shard's IDs are considered
// gap-free when count == max - min + 1.
//
// The function also requires that `shards` is non-empty and asserts that the number of distinct
// shards found in the result equals len(shards).
func ValidateSequentialMigrationIDs(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard) {
	r := VtgateExecQuery(t, vtParams, "show vitess_migrations", "")
	shardMin := map[string]uint64{}
	shardMax := map[string]uint64{}
	shardCount := map[string]uint64{}

	for _, row := range r.Named().Rows {
		// A zero id or empty shard means the column was missing or unparsable (those are the
		// fallback values passed to AsUint64/AsString); abort the test right away.
		id := row.AsUint64("id", 0)
		require.NotZero(t, id)

		shard := row.AsString("shard", "")
		require.NotEmpty(t, shard)

		if _, ok := shardMin[shard]; !ok {
			// First row seen for this shard: seed both bounds with this id.
			shardMin[shard] = id
			shardMax[shard] = id
		}
		if id < shardMin[shard] {
			shardMin[shard] = id
		}
		if id > shardMax[shard] {
			shardMax[shard] = id
		}
		shardCount[shard]++
	}
	require.NotEmpty(t, shards)
	assert.Equal(t, len(shards), len(shardMin))
	assert.Equal(t, len(shards), len(shardMax))
	assert.Equal(t, len(shards), len(shardCount))
	for shard, count := range shardCount {
		assert.NotZero(t, count)
		// With no gaps, the ids of a shard form a contiguous range [min, max], hence exactly
		// max-min+1 rows.
		assert.Equalf(t, count, shardMax[shard]-shardMin[shard]+1, "mismatch: shard=%v, count=%v, min=%v, max=%v", shard, count, shardMin[shard], shardMax[shard])
	}
}
165 changes: 94 additions & 71 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type Executor struct {

initMutex sync.Mutex
migrationMutex sync.Mutex
submitMutex sync.Mutex // used when submitting migrations
// ownedRunningMigrations lists UUIDs owned by this executor (consider this a map[string]bool)
// A UUID listed in this map stands for a migration that is executing, and that this executor can control.
// Migrations found to be running which are not listed in this map will either:
Expand Down Expand Up @@ -4606,14 +4607,62 @@ func (e *Executor) submittedMigrationConflictsWithPendingMigrationInSingletonCon
return true
}

// submitCallbackIfNonConflicting is called internally by SubmitMigration, and is given a callback to
// execute if the given migration does not conflict with any pending migration. Specifically, this
// function looks for singleton or singleton-context conflicts:
//   - a singleton migration is rejected if there is any pending migration;
//   - a singleton-context migration is rejected if there is a pending migration which
//     submittedMigrationConflictsWithPendingMigrationInSingletonContext() reports as conflicting.
//
// The callback can be an insertion of a new migration, or a retry of an existing migration, or whatnot.
// It returns whatever the callback returns, or (nil, error) if the migration is rejected or the pending
// migrations cannot be read.
func (e *Executor) submitCallbackIfNonConflicting(
	ctx context.Context,
	onlineDDL *schema.OnlineDDL,
	callback func() (*sqltypes.Result, error),
) (
	result *sqltypes.Result, err error,
) {
	if !onlineDDL.StrategySetting().IsSingleton() && !onlineDDL.StrategySetting().IsSingletonContext() {
		// not a singleton. No conflict
		return callback()
	}
	// This is either singleton or singleton-context

	// The conflict check is wrapped in an anonymous function so that migrationMutex is released
	// before the callback is invoked. sync.Mutex is not reentrant: a callback that itself acquires
	// migrationMutex would deadlock if called while we still hold it.
	// NOTE(review): SubmitMigration passes e.RetryMigration as a callback, and it is believed to lock
	// migrationMutex — confirm against RetryMigration's implementation.
	// NOTE(review): atomicity of check+callback across concurrent submissions relies on the caller
	// (SubmitMigration) holding submitMutex — confirm there are no other callers.
	err = func() error {
		e.migrationMutex.Lock()
		defer e.migrationMutex.Unlock()

		pendingUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
		if err != nil {
			return err
		}
		switch {
		case onlineDDL.StrategySetting().IsSingleton():
			// We will reject this migration if there's any pending migration
			if len(pendingUUIDs) > 0 {
				return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton migration rejected: found pending migrations [%s]", strings.Join(pendingUUIDs, ", "))
			}
		case onlineDDL.StrategySetting().IsSingletonContext():
			// We will reject this migration if there's any pending migration within a different context
			for _, pendingUUID := range pendingUUIDs {
				pendingOnlineDDL, _, err := e.readMigration(ctx, pendingUUID)
				if err != nil {
					return vterrors.Wrapf(err, "validateSingleton() migration: %s", pendingUUID)
				}
				if e.submittedMigrationConflictsWithPendingMigrationInSingletonContext(ctx, onlineDDL, pendingOnlineDDL) {
					return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton-context migration rejected: found pending migration: %s in different context: %s", pendingUUID, pendingOnlineDDL.MigrationContext)
				}
				// no conflict? continue looking for other pending migrations
			}
		}
		return nil
	}()
	if err != nil {
		return nil, err
	}
	// OK to go! No conflict found, and migrationMutex is released.
	return callback()
}

// SubmitMigration inserts a new migration request
func (e *Executor) SubmitMigration(
ctx context.Context,
stmt sqlparser.Statement,
) (result *sqltypes.Result, err error) {
) (*sqltypes.Result, error) {
if !e.isOpen {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
}

log.Infof("SubmitMigration: request to submit migration with statement: %0.50s...", sqlparser.CanonicalString(stmt))
if ddlStmt, ok := stmt.(sqlparser.DDLStatement); ok {
// This validation should have taken place on submission. However, the query may have mutated
Expand All @@ -4627,6 +4676,43 @@ func (e *Executor) SubmitMigration(
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Error submitting migration %s: %v", sqlparser.String(stmt), err)
}

if err := e.initSchema(ctx); err != nil {
log.Error(err)
return nil, err
}

// The logic below has multiple steps. We hence protect the rest of the code with a mutex, only used by this function.
e.submitMutex.Lock()
defer e.submitMutex.Unlock()

// Is there already a migration by this same UUID?
storedMigration, _, err := e.readMigration(ctx, onlineDDL.UUID)
if err != nil && err != ErrMigrationNotFound {
return nil, vterrors.Wrapf(err, "while checking whether migration %s exists", onlineDDL.UUID)
}
if storedMigration != nil {
log.Infof("SubmitMigration: migration %s already exists with migration_context=%s, table=%s", onlineDDL.UUID, storedMigration.MigrationContext, onlineDDL.Table)
// A migration already exists with the same UUID. This is fine, we allow re-submitting migrations
// with the same UUID, as we provide idempotency.
// So we will _mostly_ ignore the request: we will not submit a new migration. However, we will do
// these things:

// 1. Check that the requested submitted migration matches the existing one's migration-context, otherwise
// this doesn't seem right, not the idempotency we were looking for
if storedMigration.MigrationContext != onlineDDL.MigrationContext {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "migration rejected: found migration %s with different context: %s than submmitted migration's context: %s", onlineDDL.UUID, storedMigration.MigrationContext, onlineDDL.MigrationContext)
}
// 2. Possibly, the existing migration is in 'failed' or 'cancelled' state, in which case this
// resubmission should retry the migration.
return e.submitCallbackIfNonConflicting(
ctx, onlineDDL,
func() (*sqltypes.Result, error) { return e.RetryMigration(ctx, onlineDDL.UUID) },
)
}

// OK, this is a new UUID

_, actionStr, err := onlineDDL.GetActionStr()
if err != nil {
return nil, err
Expand All @@ -4636,7 +4722,7 @@ func (e *Executor) SubmitMigration(
revertedUUID, _ := onlineDDL.GetRevertUUID() // Empty value if the migration is not actually a REVERT. Safe to ignore error.
retainArtifactsSeconds := int64((retainOnlineDDLTables).Seconds())
_, allowConcurrentMigration := e.allowConcurrentMigration(onlineDDL)
query, err := sqlparser.ParseAndBind(sqlInsertMigration,
submitQuery, err := sqlparser.ParseAndBind(sqlInsertMigration,
sqltypes.StringBindVariable(onlineDDL.UUID),
sqltypes.StringBindVariable(e.keyspace),
sqltypes.StringBindVariable(e.shard),
Expand All @@ -4659,80 +4745,17 @@ func (e *Executor) SubmitMigration(
if err != nil {
return nil, err
}

if err := e.initSchema(ctx); err != nil {
log.Error(err)
return nil, err
}

if onlineDDL.StrategySetting().IsSingleton() || onlineDDL.StrategySetting().IsSingletonContext() {
// we need two wrap some logic within a mutex, so as to make sure we can't submit the migration whilst
// another query submits a conflicting migration
validateSingleton := func() error {
// We wrap everything in a function because we want the mutex released. We will need to reaquire it later on
// for other bookkeeping work.
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

pendingUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
if err != nil {
return err
}
switch {
case onlineDDL.StrategySetting().IsSingleton():
// We will reject this migration if there's any pending migration
if len(pendingUUIDs) > 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton migration rejected: found pending migrations [%s]", strings.Join(pendingUUIDs, ", "))
}
case onlineDDL.StrategySetting().IsSingletonContext():
// We will reject this migration if there's any pending migration within a different context
for _, pendingUUID := range pendingUUIDs {
pendingOnlineDDL, _, err := e.readMigration(ctx, pendingUUID)
if err != nil {
return vterrors.Wrapf(err, "validateSingleton() migration: %s", pendingUUID)
}
if e.submittedMigrationConflictsWithPendingMigrationInSingletonContext(ctx, onlineDDL, pendingOnlineDDL) {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton-context migration rejected: found pending migration: %s in different context: %s", pendingUUID, pendingOnlineDDL.MigrationContext)
}
// no conflict? continue looking for other pending migrations
}
}
// OK to go! We can submit the migration:
result, err = e.execQuery(ctx, query)
return err
}
if err := validateSingleton(); err != nil {
return nil, vterrors.Wrapf(err, "SubmitMigration %v", onlineDDL.UUID)
}
// mutex aquired and released within validateSingleton(). We are now mutex free
} else {
// not a singleton. We submit the migration:
result, err = e.execQuery(ctx, query)
if err != nil {
return nil, err
}
result, err := e.submitCallbackIfNonConflicting(
ctx, onlineDDL,
func() (*sqltypes.Result, error) { return e.execQuery(ctx, submitQuery) },
)
if err != nil {
return nil, vterrors.Wrapf(err, "submitting migration %v", onlineDDL.UUID)
}
log.Infof("SubmitMigration: migration %s submitted", onlineDDL.UUID)

defer e.triggerNextCheckInterval()

// The query was a INSERT IGNORE because we allow a recurring submission of same migration.
// However, let's validate that the duplication (identified via UUID) was intentional.
storedMigration, _, err := e.readMigration(ctx, onlineDDL.UUID)
if err != nil {
return nil, vterrors.Wrapf(err, "unexpected error reading written migration %v", onlineDDL.UUID)
}
if storedMigration.MigrationContext != onlineDDL.MigrationContext {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "migration rejected: found migration %s with different context: %s than submmitted migration's context: %s", onlineDDL.UUID, storedMigration.MigrationContext, onlineDDL.MigrationContext)
}

// Finally, possibly this migration already existed, and this is a resubmission of same UUID.
// possibly, the existing migration is in 'failed' or 'cancelled' state, in which case this
// resubmission should retry the migration.
if _, err := e.RetryMigration(ctx, onlineDDL.UUID); err != nil {
return result, err
}

return result, nil
}

Expand Down

0 comments on commit 3f55e40

Please sign in to comment.