Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OnlineDDL: avoid schema_migrations AUTO_INCREMENT gaps by pre-checking for existing migration #12169

Merged
merged 5 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ func TestSchemaChange(t *testing.T) {
t.Run("singleton", testSingleton)
t.Run("declarative", testDeclarative)
t.Run("foreign-keys", testForeignKeys)
t.Run("summary: validate sequential migration IDs", func(t *testing.T) {
onlineddl.ValidateSequentialMigrationIDs(t, &vtParams, shards)
})
}

func testScheduler(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,9 @@ func TestSchemaChange(t *testing.T) {
}
})
})
t.Run("summary: validate sequential migration IDs", func(t *testing.T) {
onlineddl.ValidateSequentialMigrationIDs(t, &vtParams, shards)
})
}

func insertRow(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ func TestSchemaChange(t *testing.T) {
})
})
}

t.Run("summary: validate sequential migration IDs", func(t *testing.T) {
onlineddl.ValidateSequentialMigrationIDs(t, &vtParams, shards)
})
}

func testWithInitialSchema(t *testing.T) {
Expand Down
37 changes: 37 additions & 0 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,40 @@ func WaitForThrottledTimestamp(t *testing.T, vtParams *mysql.ConnParams, uuid st
t.Error("timeout waiting for last_throttled_timestamp to have nonempty value")
return
}

// ValidateSequentialMigrationIDs validates that schem_migrations.id column, which is an AUTO_INCREMENT, does
// not have gaps
func ValidateSequentialMigrationIDs(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard) {
r := VtgateExecQuery(t, vtParams, "show vitess_migrations", "")
shardMin := map[string]uint64{}
shardMax := map[string]uint64{}
shardCount := map[string]uint64{}

for _, row := range r.Named().Rows {
id := row.AsUint64("id", 0)
require.NotZero(t, id)

shard := row.AsString("shard", "")
require.NotEmpty(t, shard)

if _, ok := shardMin[shard]; !ok {
shardMin[shard] = id
shardMax[shard] = id
}
if id < shardMin[shard] {
shardMin[shard] = id
}
if id > shardMax[shard] {
shardMax[shard] = id
}
shardCount[shard]++
}
require.NotEmpty(t, shards)
assert.Equal(t, len(shards), len(shardMin))
assert.Equal(t, len(shards), len(shardMax))
assert.Equal(t, len(shards), len(shardCount))
for shard, count := range shardCount {
assert.NotZero(t, count)
assert.Equalf(t, count, shardMax[shard]-shardMin[shard]+1, "mismatch: shared=%v, count=%v, min=%v, max=%v", shard, count, shardMin[shard], shardMax[shard])
}
}
165 changes: 94 additions & 71 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type Executor struct {

initMutex sync.Mutex
migrationMutex sync.Mutex
submitMutex sync.Mutex // used when submitting migrations
// ownedRunningMigrations lists UUIDs owned by this executor (consider this a map[string]bool)
// A UUID listed in this map stands for a migration that is executing, and that this executor can control.
// Migrations found to be running which are not listed in this map will either:
Expand Down Expand Up @@ -4586,14 +4587,62 @@ func (e *Executor) submittedMigrationConflictsWithPendingMigrationInSingletonCon
return true
}

// submitCallbackIfNonConflicting is called internally by SubmitMigration, and is given a callack to execute
// if the given migration does not conflict any terms. Specifically, this function looks for singleton or
// singleton-context conflicts.
// The call back can be an insertion of a new migration, or a retry of an existing migration, or whatnot.
func (e *Executor) submitCallbackIfNonConflicting(
ctx context.Context,
onlineDDL *schema.OnlineDDL,
callback func() (*sqltypes.Result, error),
) (
result *sqltypes.Result, err error,
) {
if !onlineDDL.StrategySetting().IsSingleton() && !onlineDDL.StrategySetting().IsSingletonContext() {
// not a singleton. No conflict
return callback()
}
// This is either singleton or singleton-context

e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

pendingUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
if err != nil {
return nil, err
}
switch {
case onlineDDL.StrategySetting().IsSingleton():
// We will reject this migration if there's any pending migration
if len(pendingUUIDs) > 0 {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton migration rejected: found pending migrations [%s]", strings.Join(pendingUUIDs, ", "))
}
case onlineDDL.StrategySetting().IsSingletonContext():
// We will reject this migration if there's any pending migration within a different context
for _, pendingUUID := range pendingUUIDs {
pendingOnlineDDL, _, err := e.readMigration(ctx, pendingUUID)
if err != nil {
return nil, vterrors.Wrapf(err, "validateSingleton() migration: %s", pendingUUID)
}
if e.submittedMigrationConflictsWithPendingMigrationInSingletonContext(ctx, onlineDDL, pendingOnlineDDL) {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton-context migration rejected: found pending migration: %s in different context: %s", pendingUUID, pendingOnlineDDL.MigrationContext)
}
// no conflict? continue looking for other pending migrations
}
}
// OK to go! We can go
return callback()
}

// SubmitMigration inserts a new migration request
func (e *Executor) SubmitMigration(
ctx context.Context,
stmt sqlparser.Statement,
) (result *sqltypes.Result, err error) {
) (*sqltypes.Result, error) {
if !e.isOpen {
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "online ddl is disabled")
}

log.Infof("SubmitMigration: request to submit migration with statement: %0.50s...", sqlparser.CanonicalString(stmt))
if ddlStmt, ok := stmt.(sqlparser.DDLStatement); ok {
// This validation should have taken place on submission. However, the query may have mutated
Expand All @@ -4607,6 +4656,43 @@ func (e *Executor) SubmitMigration(
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Error submitting migration %s: %v", sqlparser.String(stmt), err)
}

if err := e.initSchema(ctx); err != nil {
log.Error(err)
return nil, err
}

// The logic below has multiple steps. We hence protect the rest of the code with a mutex, only used by this function.
e.submitMutex.Lock()
defer e.submitMutex.Unlock()

// Is there already a migration by this same UUID?
storedMigration, _, err := e.readMigration(ctx, onlineDDL.UUID)
if err != nil && err != ErrMigrationNotFound {
return nil, vterrors.Wrapf(err, "while checking whether migration %s exists", onlineDDL.UUID)
}
if storedMigration != nil {
log.Infof("SubmitMigration: migration %s already exists with migration_context=%s, table=%s", onlineDDL.UUID, storedMigration.MigrationContext, onlineDDL.Table)
// A migration already exists with the same UUID. This is fine, we allow re-submitting migrations
// with the same UUID, as we provide idempotency.
// So we will _mostly_ ignore the request: we will not submit a new migration. However, we will do
// these things:

// 1. Check that the requested submmited migration macthes the existing one's migration-context, otherwise
// this doesn't seem right, not the idempotency we were looking for
if storedMigration.MigrationContext != onlineDDL.MigrationContext {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "migration rejected: found migration %s with different context: %s than submmitted migration's context: %s", onlineDDL.UUID, storedMigration.MigrationContext, onlineDDL.MigrationContext)
}
// 2. Possibly, the existing migration is in 'failed' or 'cancelled' state, in which case this
// resubmission should retry the migration.
return e.submitCallbackIfNonConflicting(
ctx, onlineDDL,
func() (*sqltypes.Result, error) { return e.RetryMigration(ctx, onlineDDL.UUID) },
)
}

// OK, this is a new UUID

_, actionStr, err := onlineDDL.GetActionStr()
if err != nil {
return nil, err
Expand All @@ -4616,7 +4702,7 @@ func (e *Executor) SubmitMigration(
revertedUUID, _ := onlineDDL.GetRevertUUID() // Empty value if the migration is not actually a REVERT. Safe to ignore error.
retainArtifactsSeconds := int64((retainOnlineDDLTables).Seconds())
_, allowConcurrentMigration := e.allowConcurrentMigration(onlineDDL)
query, err := sqlparser.ParseAndBind(sqlInsertMigration,
submitQuery, err := sqlparser.ParseAndBind(sqlInsertMigration,
sqltypes.StringBindVariable(onlineDDL.UUID),
sqltypes.StringBindVariable(e.keyspace),
sqltypes.StringBindVariable(e.shard),
Expand All @@ -4639,80 +4725,17 @@ func (e *Executor) SubmitMigration(
if err != nil {
return nil, err
}

if err := e.initSchema(ctx); err != nil {
log.Error(err)
return nil, err
}

if onlineDDL.StrategySetting().IsSingleton() || onlineDDL.StrategySetting().IsSingletonContext() {
// we need two wrap some logic within a mutex, so as to make sure we can't submit the migration whilst
// another query submits a conflicting migration
validateSingleton := func() error {
// We wrap everything in a function because we want the mutex released. We will need to reaquire it later on
// for other bookkeeping work.
e.migrationMutex.Lock()
defer e.migrationMutex.Unlock()

pendingUUIDs, err := e.readPendingMigrationsUUIDs(ctx)
if err != nil {
return err
}
switch {
case onlineDDL.StrategySetting().IsSingleton():
// We will reject this migration if there's any pending migration
if len(pendingUUIDs) > 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton migration rejected: found pending migrations [%s]", strings.Join(pendingUUIDs, ", "))
}
case onlineDDL.StrategySetting().IsSingletonContext():
// We will reject this migration if there's any pending migration within a different context
for _, pendingUUID := range pendingUUIDs {
pendingOnlineDDL, _, err := e.readMigration(ctx, pendingUUID)
if err != nil {
return vterrors.Wrapf(err, "validateSingleton() migration: %s", pendingUUID)
}
if e.submittedMigrationConflictsWithPendingMigrationInSingletonContext(ctx, onlineDDL, pendingOnlineDDL) {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "singleton-context migration rejected: found pending migration: %s in different context: %s", pendingUUID, pendingOnlineDDL.MigrationContext)
}
// no conflict? continue looking for other pending migrations
}
}
// OK to go! We can submit the migration:
result, err = e.execQuery(ctx, query)
return err
}
if err := validateSingleton(); err != nil {
return nil, vterrors.Wrapf(err, "SubmitMigration %v", onlineDDL.UUID)
}
// mutex aquired and released within validateSingleton(). We are now mutex free
} else {
// not a singleton. We submit the migration:
result, err = e.execQuery(ctx, query)
if err != nil {
return nil, err
}
result, err := e.submitCallbackIfNonConflicting(
ctx, onlineDDL,
func() (*sqltypes.Result, error) { return e.execQuery(ctx, submitQuery) },
)
if err != nil {
return nil, vterrors.Wrapf(err, "submitting migration %v", onlineDDL.UUID)
}
log.Infof("SubmitMigration: migration %s submitted", onlineDDL.UUID)

defer e.triggerNextCheckInterval()

// The query was a INSERT IGNORE because we allow a recurring submission of same migration.
// However, let's validate that the duplication (identified via UUID) was intentional.
storedMigration, _, err := e.readMigration(ctx, onlineDDL.UUID)
if err != nil {
return nil, vterrors.Wrapf(err, "unexpected error reading written migration %v", onlineDDL.UUID)
}
if storedMigration.MigrationContext != onlineDDL.MigrationContext {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "migration rejected: found migration %s with different context: %s than submmitted migration's context: %s", onlineDDL.UUID, storedMigration.MigrationContext, onlineDDL.MigrationContext)
}

// Finally, possibly this migration already existed, and this is a resubmission of same UUID.
// possibly, the existing migration is in 'failed' or 'cancelled' state, in which case this
// resubmission should retry the migration.
if _, err := e.RetryMigration(ctx, onlineDDL.UUID); err != nil {
return result, err
}

return result, nil
}

Expand Down