Skip to content

Commit

Permalink
Merge pull request #9537 from Phanatic/grpc-apply-schema
Browse files Browse the repository at this point in the history
RFC: Add ApplySchema to vtctlservice
  • Loading branch information
deepthi authored Feb 1, 2022
2 parents 67ee4c4 + 93c5d34 commit f906693
Show file tree
Hide file tree
Showing 36 changed files with 4,698 additions and 2,657 deletions.
4 changes: 2 additions & 2 deletions go/cmd/vtctld/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ func initSchema() {
}
ctx := context.Background()
wr := wrangler.New(logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
err = schemamanager.Run(
_, err = schemamanager.Run(
ctx,
controller,
schemamanager.NewTabletExecutor("vtctld/schema", wr, *schemaChangeReplicasTimeout),
schemamanager.NewTabletExecutor("vtctld/schema", wr.TopoServer(), wr.TabletManagerClient(), wr.Logger(), *schemaChangeReplicasTimeout),
)
if err != nil {
log.Errorf("Schema change failed, error: %v", err)
Expand Down
5 changes: 5 additions & 0 deletions go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type VtctlClientParams struct {
MigrationContext string
SkipPreflight bool
UUIDList string
CallerId string
}

// InitShardPrimary executes vtctlclient command to make specified tablet the primary for the shard.
Expand Down Expand Up @@ -88,6 +89,10 @@ func (vtctlclient *VtctlClientProcess) ApplySchemaWithOutput(Keyspace string, SQ
if params.SkipPreflight {
args = append(args, "-skip_preflight")
}

if params.CallerId != "" {
args = append(args, "-caller_id", params.CallerId)
}
args = append(args, Keyspace)
return vtctlclient.ExecuteCommandWithOutput(args...)
}
Expand Down
74 changes: 58 additions & 16 deletions go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,48 @@ func TestMain(m *testing.M) {
"-ddl_strategy", "gh-ost",
}

primaryTablet := *clusterInstance.Keyspaces[0].Shards[0].Vttablets[0]
replicaTablet := *clusterInstance.Keyspaces[0].Shards[0].Vttablets[1]

tableACLConfigJSON := path.Join("/tmp", "table_acl_config.json")
f, _ := os.Create(tableACLConfigJSON)
vtStaticACLJSON := `{
"table_groups":
[
{
"name": "vitess user groups",
"table_names_or_prefixes":
[
"%"
],
"readers":
[
"vitess-reader",
"vitess-writer",
"vitess-admin"
],
"writers":
[
"vitess-writer",
"vitess-writer-only",
"vitess-admin"
],
"admins":
[
"vitess-admin"
]
}
]
}`
f.WriteString(vtStaticACLJSON)
f.Close()

// start the tablets
for _, tablet := range []cluster.Vttablet{primaryTablet, replicaTablet} {
tablet.VttabletProcess.ExtraArgs = append(tablet.VttabletProcess.ExtraArgs, "-table-acl-config", tableACLConfigJSON, "-queryserver-config-strict-table-acl")
tablet.VttabletProcess.Setup()
}

if err := clusterInstance.StartTopo(); err != nil {
return 1, err
}
Expand Down Expand Up @@ -220,10 +262,10 @@ func TestSchemaChange(t *testing.T) {
assert.Equal(t, 2, len(shards))
testWithInitialSchema(t)
t.Run("create non_online", func(t *testing.T) {
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online")
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online", "")
})
t.Run("successful online alter, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "gh-ost", "vtgate", "ghost_col")
uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "gh-ost", "vtgate", "ghost_col", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
Expand All @@ -248,13 +290,13 @@ func TestSchemaChange(t *testing.T) {

})
t.Run("successful online alter, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost", "vtctl", "ghost_col")
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost", "vtctl", "ghost_col", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("successful online alter, postponed, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost -postpone-completion", "vtgate", "ghost_col")
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost -postpone-completion", "vtgate", "ghost_col", "")
// Should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
// Issue a complete and wait for successful completion
Expand All @@ -268,14 +310,14 @@ func TestSchemaChange(t *testing.T) {
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("throttled migration", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col")
uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true)
time.Sleep(2 * time.Second)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
})
t.Run("failed migration", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "gh-ost", "vtgate", "ghost_col")
uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "gh-ost", "vtgate", "ghost_col", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, true)
Expand All @@ -294,54 +336,54 @@ func TestSchemaChange(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_ = testOnlineDDLStatement(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col")
_ = testOnlineDDLStatement(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col", "")
}()
}
wg.Wait()
onlineddl.CheckCancelAllMigrations(t, &vtParams, len(shards)*count)
})
t.Run("Online DROP, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "gh-ost", "vtctl", "")
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "gh-ost", "vtctl", "", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("Online CREATE, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "gh-ost", "vtctl", "online_ddl_create_col")
uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "gh-ost", "vtctl", "online_ddl_create_col", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("Online DROP TABLE IF EXISTS, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "gh-ost", "vtgate", "")
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "gh-ost", "vtgate", "", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
// this table existed
checkTables(t, schema.OnlineDDLToGCUUID(uuid), 1)
})
t.Run("Online DROP TABLE IF EXISTS for nonexistent table, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "gh-ost", "vtgate", "")
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "gh-ost", "vtgate", "", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
// this table did not exist
checkTables(t, schema.OnlineDDLToGCUUID(uuid), 0)
})
t.Run("Online DROP TABLE for nonexistent table, expect error, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "gh-ost", "vtgate", "")
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "gh-ost", "vtgate", "", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, true)
})
t.Run("Online CREATE no PK table, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, noPKCreateTableStatement, "gh-ost", "vtgate", "online_ddl_create_col")
uuid := testOnlineDDLStatement(t, noPKCreateTableStatement, "gh-ost", "vtgate", "online_ddl_create_col", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("Fail ALTER for no PK table, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost", "vtgate", "")
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost", "vtgate", "", "")
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)

expectedMessage := "No PRIMARY nor UNIQUE key found"
Expand Down Expand Up @@ -383,7 +425,7 @@ func testWithInitialSchema(t *testing.T) {
}

// testOnlineDDLStatement runs an online DDL, ALTER statement
func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectHint string) (uuid string) {
func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectHint string, callerId string) (uuid string) {
tableName := fmt.Sprintf("vt_onlineddl_test_%02d", 3)
sqlQuery := fmt.Sprintf(alterStatement, tableName)
if executeStrategy == "vtgate" {
Expand All @@ -393,7 +435,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
}
} else {
var err error
uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, sqlQuery, cluster.VtctlClientParams{DDLStrategy: ddlStrategy})
uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, sqlQuery, cluster.VtctlClientParams{DDLStrategy: ddlStrategy, CallerId: callerId})
assert.NoError(t, err)
}
uuid = strings.TrimSpace(uuid)
Expand Down
Loading

0 comments on commit f906693

Please sign in to comment.