Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: support migration cut-over backoff and forced cut-over #14546

Merged
merged 39 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
57667a7
Online DDL: backoff for vitess cut-over intervals
shlomi-noach Oct 24, 2023
906e7fc
resolve conflict
shlomi-noach Nov 9, 2023
100eb76
refactor reviewVReplRunningMigration()
shlomi-noach Nov 9, 2023
0c92e95
DDL Strategy: support --force-cut-over-after=<duration> flag
shlomi-noach Nov 16, 2023
08c83b3
schema_migrations: support force_cutover column
shlomi-noach Nov 16, 2023
cb4bcf0
Updte ready_to_complete: only update ready_to_complete_timestamp on f…
shlomi-noach Nov 16, 2023
abf999f
detect (heuristic) queries running on a given table, and detect trans…
shlomi-noach Nov 16, 2023
bd2868b
validate --force-cut-over-after
shlomi-noach Nov 16, 2023
c3f3ccc
shouldCutOverAccordingToBackoff(): backoff and force-cutover logic; k…
shlomi-noach Nov 19, 2023
8e7d8ac
sqlparser: support ALTER VITESS_MIGRATION '...' FORCE_CUTOVER
shlomi-noach Nov 19, 2023
c821fc3
Implement ALTER VITESS_MIGRATION '...' FORCE_CUTOVER in query executo…
shlomi-noach Nov 19, 2023
a0d980f
endtoend: test 'ALTER VITESS_MIGRATION ... FORCE_CUTOVER' statement, …
shlomi-noach Nov 19, 2023
26d7809
endtoend: validate 'ALTER VITESS_MIGRATION ... FORCE_CUTOVER' kills t…
shlomi-noach Nov 19, 2023
1c002c5
FORCE_CUTOVER triggers scheduler next checks
shlomi-noach Nov 19, 2023
9a0a82b
proto: ForceCutOverSchemaMigration, ForceCutOverSchemaMigrationReques…
shlomi-noach Nov 19, 2023
9eac122
nuance test changes
shlomi-noach Nov 19, 2023
738cd7b
rename function
shlomi-noach Nov 19, 2023
24da375
vtctl gRPC: implement CleanupSchemaMigration
shlomi-noach Nov 19, 2023
359c6fb
implement vtctldclient force_cutover
shlomi-noach Nov 19, 2023
0589d4a
resolved conflict
shlomi-noach Nov 19, 2023
804b684
support 'ALTER VITESS_MIGRATION FORCE_CUTOVER ALL'
shlomi-noach Nov 19, 2023
769053b
force cutover only possible on MySQL 8.0
shlomi-noach Nov 19, 2023
7995987
resolved conflict
shlomi-noach Nov 26, 2023
7ed323c
resolved conflict
shlomi-noach Nov 27, 2023
5240429
Merge branch 'onlineddl-cutover-backoff' of github.com:planetscale/vi…
shlomi-noach Nov 27, 2023
baae9f2
punctuation
shlomi-noach Nov 30, 2023
c787863
validate: --force-cut-over-after is only valid in 'vitess' strategy.
shlomi-noach Nov 30, 2023
71fb82b
simplify test expectations
shlomi-noach Nov 30, 2023
7b7413d
-1 for maxrows
shlomi-noach Nov 30, 2023
7192e96
rename as killTableLockHoldersAndAccessors, add comments
shlomi-noach Nov 30, 2023
92aeac1
endtoend: use context with timeout
shlomi-noach Nov 30, 2023
478fcad
added unit test
shlomi-noach Nov 30, 2023
9c2929b
OnlineDDL command named force-cutover
shlomi-noach Nov 30, 2023
10868b9
Update go/cmd/vtctldclient/command/onlineddl.go
shlomi-noach Nov 30, 2023
2fd3850
span name
shlomi-noach Dec 4, 2023
848cafb
include UUID in log
shlomi-noach Dec 4, 2023
849d8ae
reorder proto alphabetically
shlomi-noach Dec 4, 2023
839f772
resolved conflict
shlomi-noach Dec 5, 2023
892fd23
resolved conflict
shlomi-noach Dec 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions go/cmd/vtctldclient/command/onlineddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ var (
Args: cobra.ExactArgs(2),
RunE: commandOnlineDDLUnthrottle,
}
OnlineDDLForceCutOver = &cobra.Command{
Use: "force-cutover <keyspace> <uuid|all>",
Short: "Mark a given schema migration, or all pending migrations, for forced cut over.",
Example: "OnlineDDL force-cutover test_keyspace 82fa54ac_e83e_11ea_96b7_f875a4d24e90",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(2),
RunE: commandOnlineDDLForceCutOver,
}
OnlineDDLShow = &cobra.Command{
Use: "show",
Short: "Display information about online DDL operations.",
Expand Down Expand Up @@ -184,6 +192,30 @@ func commandOnlineDDLCleanup(cmd *cobra.Command, args []string) error {
return nil
}

func commandOnlineDDLForceCutOver(cmd *cobra.Command, args []string) error {
keyspace, uuid, err := analyzeOnlineDDLCommandWithUuidOrAllArgument(cmd)
if err != nil {
return err
}
cli.FinishedParsing(cmd)

resp, err := client.ForceCutOverSchemaMigration(commandCtx, &vtctldatapb.ForceCutOverSchemaMigrationRequest{
Keyspace: keyspace,
Uuid: uuid,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp)
if err != nil {
return err
}

fmt.Printf("%s\n", data)
mattlord marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

func commandOnlineDDLComplete(cmd *cobra.Command, args []string) error {
keyspace, uuid, err := analyzeOnlineDDLCommandWithUuidOrAllArgument(cmd)
if err != nil {
Expand Down Expand Up @@ -393,6 +425,7 @@ func init() {
OnlineDDL.AddCommand(OnlineDDLRetry)
OnlineDDL.AddCommand(OnlineDDLThrottle)
OnlineDDL.AddCommand(OnlineDDLUnthrottle)
OnlineDDL.AddCommand(OnlineDDLForceCutOver)

OnlineDDLShow.Flags().BoolVar(&onlineDDLShowArgs.JSON, "json", false, "Output JSON instead of human-readable table.")
OnlineDDLShow.Flags().StringVar(&onlineDDLShowArgs.OrderStr, "order", "asc", "Sort the results by `id` property of the Schema migration.")
Expand Down
1 change: 1 addition & 0 deletions go/mysql/flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
DynamicRedoLogCapacityFlavorCapability // supported in MySQL 8.0.30 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-30.html
DisableRedoLogFlavorCapability // supported in MySQL 8.0.21 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-21.html
CheckConstraintsCapability // supported in MySQL 8.0.16 and above: https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-16.html
PerformanceSchemaDataLocksTableCapability
)

const (
Expand Down
2 changes: 2 additions & 0 deletions go/mysql/flavor_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,8 @@ func (mysqlFlavor80) supportsCapability(serverVersion string, capability FlavorC
return ServerVersionAtLeast(serverVersion, 8, 0, 21)
case CheckConstraintsCapability:
return ServerVersionAtLeast(serverVersion, 8, 0, 16)
case PerformanceSchemaDataLocksTableCapability:
return true, nil
mattlord marked this conversation as resolved.
Show resolved Hide resolved
default:
return false, nil
}
Expand Down
10 changes: 10 additions & 0 deletions go/mysql/flavor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ func TestGetFlavor(t *testing.T) {
capability: CheckConstraintsCapability,
isCapable: true,
},
{
version: "5.7.38",
capability: PerformanceSchemaDataLocksTableCapability,
isCapable: false,
},
{
version: "8.0.20",
capability: PerformanceSchemaDataLocksTableCapability,
isCapable: true,
},
}
for _, tc := range testcases {
name := fmt.Sprintf("%s %v", tc.version, tc.capability)
Expand Down
21 changes: 11 additions & 10 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,11 +449,7 @@ func (vttablet *VttabletProcess) CreateDB(keyspace string) error {

// QueryTablet lets you execute a query in this tablet and get the result
func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useDb bool) (*sqltypes.Result, error) {
if !useDb {
keyspace = ""
}
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
conn, err := vttablet.conn(&dbParams)
conn, err := vttablet.TabletConn(keyspace, useDb)
if err != nil {
return nil, err
}
Expand All @@ -464,11 +460,7 @@ func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useD
// QueryTabletMultiple lets you execute multiple queries -- without any
// results -- against the tablet.
func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace string, useDb bool) error {
if !useDb {
keyspace = ""
}
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
conn, err := vttablet.conn(&dbParams)
conn, err := vttablet.TabletConn(keyspace, useDb)
if err != nil {
return err
}
Expand All @@ -484,6 +476,15 @@ func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace
return nil
}

// TabletConn opens a MySQL connection on this tablet
func (vttablet *VttabletProcess) TabletConn(keyspace string, useDb bool) (*mysql.Conn, error) {
if !useDb {
keyspace = ""
}
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
return vttablet.conn(&dbParams)
}

func (vttablet *VttabletProcess) defaultConn(dbname string) (*mysql.Conn, error) {
dbParams := mysql.ConnParams{
Uname: "vt_dba",
Expand Down
157 changes: 157 additions & 0 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,29 @@ func waitForReadyToComplete(t *testing.T, uuid string, expected bool) {
}
}

func waitForMessage(t *testing.T, uuid string, messageSubstring string) {
ctx, cancel := context.WithTimeout(context.Background(), normalWaitTime)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row.AsString("message", "")
if strings.Contains(message, messageSubstring) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to make it case insensitive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why we should? If the test knows what to expect, then it should expect the exact string.

return
}
}
select {
case <-ticker.C:
case <-ctx.Done():
}
require.NoError(t, ctx.Err())
}
}

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
Expand Down Expand Up @@ -366,6 +389,9 @@ func testScheduler(t *testing.T) {
alterNonexistent = `
ALTER TABLE nonexistent FORCE
`
populateT1Statement = `
insert into t1_test values (1, 'new_row')
`
)

testReadTimestamp := func(t *testing.T, uuid string, timestampColumn string) (timestamp string) {
Expand Down Expand Up @@ -490,6 +516,109 @@ func testScheduler(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
})

t.Run("Postpone completion ALTER", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion", "vtgate", "", "", true)) // skip wait

t.Run("wait for t1 running", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
mattlord marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
})
t.Run("check postpone_completion", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
postponeCompletion := row.AsInt64("postpone_completion", 0)
assert.Equal(t, int64(1), postponeCompletion)
}
})
t.Run("complete", func(t *testing.T) {
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
t.Run("check no postpone_completion", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
postponeCompletion := row.AsInt64("postpone_completion", 0)
assert.Equal(t, int64(0), postponeCompletion)
}
})
})

forceCutoverCapable, err := capableOf(mysql.PerformanceSchemaDataLocksTableCapability) // 8.0
require.NoError(t, err)
if forceCutoverCapable {
t.Run("force_cutover", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime*2)
defer cancel()

t.Run("populate t1_test", func(t *testing.T) {
onlineddl.VtgateExecQuery(t, &vtParams, populateT1Statement, "")
})
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion", "vtgate", "", "", true)) // skip wait

t.Run("wait for t1 running", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
})
commitTransactionChan := make(chan any)
transactionErrorChan := make(chan error)
t.Run("locking table rows", func(t *testing.T) {
go runInTransaction(t, ctx, shards[0].Vttablets[0], "select * from t1_test for update", commitTransactionChan, transactionErrorChan)
})
t.Run("check no force_cutover", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
forceCutOver := row.AsInt64("force_cutover", 0)
assert.Equal(t, int64(0), forceCutOver) // disabled
}
})
t.Run("attempt to complete", func(t *testing.T) {
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
})
t.Run("cut-over fail due to timeout", func(t *testing.T) {
waitForMessage(t, t1uuid, "due to context deadline exceeded")
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
})
t.Run("force_cutover", func(t *testing.T) {
onlineddl.CheckForceMigrationCutOver(t, &vtParams, shards, t1uuid, true)
})
t.Run("check force_cutover", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
forceCutOver := row.AsInt64("force_cutover", 0)
assert.Equal(t, int64(1), forceCutOver) // enabled
}
})
t.Run("expect completion", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
t.Run("expect transaction failure", func(t *testing.T) {
select {
case commitTransactionChan <- true: //good
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
// Transaction will now attempt to commit. But we expect our "force_cutover" to have terminated
// the transaction's connection.
select {
case err := <-transactionErrorChan:
assert.ErrorContains(t, err, "broken pipe")
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
})
})
}
t.Run("ALTER both tables non-concurrent", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait
t2uuid = testOnlineDDLStatement(t, createParams(trivialAlterT2Statement, ddlStrategy, "vtgate", "", "", true)) // skip wait
Expand Down Expand Up @@ -2400,3 +2529,31 @@ func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName s
statement = queryResult.Rows[0][1].ToString()
return statement
}

func runInTransaction(t *testing.T, ctx context.Context, tablet *cluster.Vttablet, query string, commitTransactionChan chan any, transactionErrorChan chan error) error {
conn, err := tablet.VttabletProcess.TabletConn(keyspaceName, true)
require.NoError(t, err)
defer conn.Close()

_, err = conn.ExecuteFetch("begin", 0, false)
require.NoError(t, err)

_, err = conn.ExecuteFetch(query, 10000, false)
require.NoError(t, err)

if commitTransactionChan != nil {
// Wait for instruction to commit
select {
case <-commitTransactionChan:
// good
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
}

_, err = conn.ExecuteFetch("commit", 0, false)
if transactionErrorChan != nil {
transactionErrorChan <- err
}
return err
}
15 changes: 15 additions & 0 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,21 @@ func CheckLaunchAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCo
}
}

// CheckForceMigrationCutOver marks a migration for forced cut-over, and expects success by counting affected rows.
func CheckForceMigrationCutOver(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectPossible bool) {
query, err := sqlparser.ParseAndBind("alter vitess_migration %a force_cutover",
sqltypes.StringBindVariable(uuid),
)
require.NoError(t, err)
r := VtgateExecQuery(t, vtParams, query, "")

if expectPossible {
assert.Equal(t, len(shards), int(r.RowsAffected))
} else {
assert.Equal(t, int(0), int(r.RowsAffected))
}
}

// CheckMigrationStatus verifies that the migration indicated by given UUID has the given expected status
func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, expectStatuses ...schema.OnlineDDLStatus) bool {
query, err := sqlparser.ParseAndBind("show vitess_migrations like %a",
Expand Down
Loading
Loading