Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MultiSplitDiff support in workflow #5290

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions go/vt/workflow/resharding/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,17 @@ func createTaskID(phase workflow.PhaseType, shardName string) string {
func (hw *horizontalReshardingWorkflow) GetTasks(phase workflow.PhaseType) []*workflowpb.Task {
var shards []string
switch phase {
case phaseCopySchema, phaseWaitForFilteredReplication, phaseDiff:
case phaseCopySchema, phaseWaitForFilteredReplication:
shards = strings.Split(hw.checkpoint.Settings["destination_shards"], ",")
case phaseClone, phaseMigrateRdonly, phaseMigrateReplica, phaseMigrateMaster:
shards = strings.Split(hw.checkpoint.Settings["source_shards"], ",")
case phaseDiff:
switch hw.checkpoint.Settings["split_diff_cmd"] {
case "SplitDiff":
shards = strings.Split(hw.checkpoint.Settings["destination_shards"], ",")
case "MultiSplitDiff":
shards = strings.Split(hw.checkpoint.Settings["source_shards"], ",")
}
default:
log.Fatalf("BUG: unknown phase type: %v", phase)
}
Expand Down Expand Up @@ -82,15 +89,17 @@ func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *wo
return err
}

args := []string{splitCmd, "--min_healthy_rdonly_tablets=" + minHealthyRdonlyTablets, sourceKeyspaceShard}
args := []string{splitCmd, "--min_healthy_rdonly_tablets=" + minHealthyRdonlyTablets}
if useConsistentSnapshot != "" {
args = append(args, "--use_consistent_snapshot")
}

if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
args = append(args, fmt.Sprintf("--exclude_tables=%s", excludeTables))
}

args = append(args, sourceKeyspaceShard)

_, err := automation.ExecuteVtworker(hw.ctx, worker, args)
return err
}
Expand All @@ -103,7 +112,9 @@ func (hw *horizontalReshardingWorkflow) runWaitForFilteredReplication(ctx contex

func (hw *horizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *workflowpb.Task) error {
keyspace := t.Attributes["keyspace"]
splitDiffCmd := t.Attributes["split_diff_cmd"]
destShard := t.Attributes["destination_shard"]
sourceShard := t.Attributes["source_shard"]
destinationTabletType := t.Attributes["dest_tablet_type"]
worker := t.Attributes["vtworker"]
useConsistentSnapshot := t.Attributes["use_consistent_snapshot"]
Expand All @@ -112,13 +123,21 @@ func (hw *horizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *wor
if _, err := automation.ExecuteVtworker(hw.ctx, worker, []string{"Reset"}); err != nil {
return err
}
args := []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=" + destinationTabletType, topoproto.KeyspaceShardString(keyspace, destShard)}
args := []string{splitDiffCmd}

if useConsistentSnapshot != "" {
args = append(args, "--use_consistent_snapshot")
}

if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
args = append(args, fmt.Sprintf("--exclude_tables=%s", excludeTables))
}

switch splitDiffCmd {
case "SplitDiff":
args = append(args, "--min_healthy_rdonly_tablets=1", "--dest_tablet_type="+destinationTabletType, topoproto.KeyspaceShardString(keyspace, destShard))
case "MultiSplitDiff":
args = append(args, "--min_healthy_tablets=1", "--tablet_type="+destinationTabletType, topoproto.KeyspaceShardString(keyspace, sourceShard))
}

_, err := automation.ExecuteVtworker(ctx, worker, args)
Expand Down
69 changes: 51 additions & 18 deletions go/vt/workflow/resharding/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
sourceShardsStr := subFlags.String("source_shards", "", "A comma-separated list of source shards")
destinationShardsStr := subFlags.String("destination_shards", "", "A comma-separated list of destination shards")
minHealthyRdonlyTablets := subFlags.String("min_healthy_rdonly_tablets", "1", "Minimum number of healthy RDONLY tablets required in source shards")
skipSplitRatioCheck := subFlags.Bool("skip_split_ratio_check", false, "Skip validation on minimum number of healthy RDONLY tablets")
splitCmd := subFlags.String("split_cmd", "SplitClone", "Split command to use to perform horizontal resharding (either SplitClone or LegacySplitClone)")
splitDiffCmd := subFlags.String("split_diff_cmd", "SplitDiff", "Split diff command to use to perform horizontal resharding (either SplitDiff or MultiSplitDiff)")
splitDiffDestTabletType := subFlags.String("split_diff_dest_tablet_type", "RDONLY", "Specifies tablet type to use in destination shards while performing SplitDiff operation")
phaseEnaableApprovalsDesc := fmt.Sprintf("Comma separated phases that require explicit approval in the UI to execute. Phase names are: %v", strings.Join(WorkflowPhases(), ","))
phaseEnableApprovalsStr := subFlags.String("phase_enable_approvals", strings.Join(WorkflowPhases(), ","), phaseEnaableApprovalsDesc)
Expand All @@ -81,7 +83,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
if err := subFlags.Parse(args); err != nil {
return err
}
if *keyspace == "" || *vtworkersStr == "" || *minHealthyRdonlyTablets == "" || *splitCmd == "" {
if *keyspace == "" || *vtworkersStr == "" || *minHealthyRdonlyTablets == "" || *splitCmd == "" || *splitDiffCmd == "" {
return fmt.Errorf("keyspace name, min healthy rdonly tablets, split command, and vtworkers information must be provided for horizontal resharding")
}

Expand All @@ -106,13 +108,13 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
useConsistentSnapshotArg = "true"
}

err := validateWorkflow(m, *keyspace, vtworkers, sourceShards, destinationShards, *minHealthyRdonlyTablets)
err := validateWorkflow(m, *keyspace, vtworkers, sourceShards, destinationShards, *minHealthyRdonlyTablets, *skipSplitRatioCheck)
if err != nil {
return err
}

w.Name = fmt.Sprintf("Reshard shards %v into shards %v of keyspace %v.", *keyspace, *sourceShardsStr, *destinationShardsStr)
checkpoint, err := initCheckpoint(*keyspace, vtworkers, excludeTables, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType, useConsistentSnapshotArg)
checkpoint, err := initCheckpoint(*keyspace, vtworkers, excludeTables, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffCmd, *splitDiffDestTabletType, useConsistentSnapshotArg)
if err != nil {
return err
}
Expand Down Expand Up @@ -161,10 +163,12 @@ func (*Factory) Instantiate(m *workflow.Manager, w *workflowpb.Workflow, rootNod
Name: "WaitForFilteredReplication",
PathName: string(phaseWaitForFilteredReplication),
}

diffUINode := &workflow.Node{
Name: "SplitDiff",
Name: checkpoint.Settings["split_diff_cmd"],
PathName: string(phaseDiff),
}

migrateRdonlyUINode := &workflow.Node{
Name: "MigrateServedTypeRDONLY",
PathName: string(phaseMigrateRdonly),
Expand Down Expand Up @@ -200,9 +204,19 @@ func (*Factory) Instantiate(m *workflow.Manager, w *workflowpb.Workflow, rootNod
if err := createUINodes(hw.rootUINode, phaseWaitForFilteredReplication, destinationShards); err != nil {
return hw, err
}
if err := createUINodes(hw.rootUINode, phaseDiff, destinationShards); err != nil {
var shardsToUseForDiff []string

switch hw.checkpoint.Settings["split_diff_cmd"] {
case "SplitDiff":
shardsToUseForDiff = destinationShards
case "MultiSplitDiff":
shardsToUseForDiff = sourceShards
}

if err := createUINodes(hw.rootUINode, phaseDiff, shardsToUseForDiff); err != nil {
return hw, err
}

if err := createUINodes(hw.rootUINode, phaseMigrateRdonly, sourceShards); err != nil {
return hw, err
}
Expand Down Expand Up @@ -233,7 +247,7 @@ func createUINodes(rootNode *workflow.Node, phaseName workflow.PhaseType, shards
}

// validateWorkflow validates that workflow has valid input parameters.
func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets string) error {
func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets string, skipSplitRatioCheck bool) error {
if len(sourceShards) == 0 || len(destinationShards) == 0 {
return fmt.Errorf("invalid source or destination shards")
}
Expand All @@ -242,7 +256,7 @@ func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceSha
}

splitRatio := len(destinationShards) / len(sourceShards)
if minHealthyRdonlyTabletsVal, err := strconv.Atoi(minHealthyRdonlyTablets); err != nil || minHealthyRdonlyTabletsVal < splitRatio {
if minHealthyRdonlyTabletsVal, err := strconv.Atoi(minHealthyRdonlyTablets); err != nil || (!skipSplitRatioCheck && minHealthyRdonlyTabletsVal < splitRatio) {
return fmt.Errorf("there are not enough rdonly tablets in source shards. You need at least %v, it got: %v", splitRatio, minHealthyRdonlyTablets)
}

Expand Down Expand Up @@ -271,7 +285,7 @@ func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceSha
}

// initCheckpoint initialize the checkpoint for the horizontal workflow.
func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string, useConsistentSnapshot string) (*workflowpb.WorkflowCheckpoint, error) {
func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffCmd, splitDiffDestTabletType string, useConsistentSnapshot string) (*workflowpb.WorkflowCheckpoint, error) {
tasks := make(map[string]*workflowpb.Task)
initTasks(tasks, phaseCopySchema, destinationShards, func(i int, shard string) map[string]string {
return map[string]string{
Expand All @@ -298,16 +312,34 @@ func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, des
"destination_shard": shard,
}
})
initTasks(tasks, phaseDiff, destinationShards, func(i int, shard string) map[string]string {
return map[string]string{
"keyspace": keyspace,
"destination_shard": shard,
"dest_tablet_type": splitDiffDestTabletType,
"vtworker": vtworkers[i],
"use_consistent_snapshot": useConsistentSnapshot,
"exclude_tables": strings.Join(excludeTables, ","),
}
})

switch splitDiffCmd {
case "SplitDiff":
initTasks(tasks, phaseDiff, destinationShards, func(i int, shard string) map[string]string {
return map[string]string{
"keyspace": keyspace,
"destination_shard": shard,
"dest_tablet_type": splitDiffDestTabletType,
"split_diff_cmd": splitDiffCmd,
"vtworker": vtworkers[i],
"use_consistent_snapshot": useConsistentSnapshot,
"exclude_tables": strings.Join(excludeTables, ","),
}
})
case "MultiSplitDiff":
initTasks(tasks, phaseDiff, sourceShards, func(i int, shard string) map[string]string {
return map[string]string{
"keyspace": keyspace,
"source_shard": shard,
"dest_tablet_type": splitDiffDestTabletType,
"split_diff_cmd": splitDiffCmd,
"vtworker": vtworkers[i],
"use_consistent_snapshot": useConsistentSnapshot,
"exclude_tables": strings.Join(excludeTables, ","),
}
})
}

initTasks(tasks, phaseMigrateRdonly, sourceShards, func(i int, shard string) map[string]string {
return map[string]string{
"keyspace": keyspace,
Expand Down Expand Up @@ -336,6 +368,7 @@ func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, des
Settings: map[string]string{
"source_shards": strings.Join(sourceShards, ","),
"destination_shards": strings.Join(destinationShards, ","),
"split_diff_cmd": splitDiffCmd,
},
}, nil
}
Expand Down
48 changes: 35 additions & 13 deletions go/vt/workflow/resharding/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestSourceDestShards(t *testing.T) {
defer ctrl.Finish()

// Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase.
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, false, "")
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, false, "", "SplitDiff")
vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory)
defer vtworkerclient.UnregisterFactoryForTest("fake")

Expand Down Expand Up @@ -90,28 +90,32 @@ func TestSourceDestShards(t *testing.T) {

// TestHorizontalResharding runs the happy path of HorizontalReshardingWorkflow.
func TestHorizontalResharding(t *testing.T) {
testHorizontalReshardingWorkflow(t, false, "")
testHorizontalReshardingWorkflow(t, false, "", "SplitDiff")
}

// TestHorizontalReshardingWithConsistentSnapshot runs the happy path of HorizontalReshardingWorkflow with consistent snapshot.
func TestHorizontalReshardingWithConsistentSnapshot(t *testing.T) {
testHorizontalReshardingWorkflow(t, true, "")
testHorizontalReshardingWorkflow(t, true, "", "SplitDiff")
}

// TestHorizontalReshardingWithExcludedTables runs the happy path of HorizontalReshardingWorkflow with excluded tables.
func TestHorizontalReshardingWithExcludedTables(t *testing.T) {
testHorizontalReshardingWorkflow(t, true, "table_a,table_b")
testHorizontalReshardingWorkflow(t, true, "table_a,table_b", "SplitDiff")
}

func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool, excludeTables string) {
func TestHorizontalReshardingWithMultiDiffCommand(t *testing.T) {
testHorizontalReshardingWorkflow(t, true, "table_a,table_b", "MultiSplitDiff")
}

func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool, excludeTables, splitDiffCommand string) {
ctx := context.Background()
// Set up the mock wrangler. It is used for the CopySchema,
// WaitforFilteredReplication and Migrate phase.
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockWranglerInterface := setupMockWrangler(ctrl, testKeyspace)
// Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase.
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, useConsistentSnapshot, excludeTables)
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, useConsistentSnapshot, excludeTables, splitDiffCommand)
vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory)
defer vtworkerclient.UnregisterFactoryForTest("fake")
// Initialize the topology.
Expand All @@ -121,13 +125,14 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool,
wg, _, cancel := workflow.StartManager(m)
// Create the workflow.
vtworkersParameter := testVtworkers + "," + testVtworkers
args := []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_rdonly_tablets=2", "-source_shards=0", "-destination_shards=-80,80-"}
args := []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_rdonly_tablets=2"}
if useConsistentSnapshot {
args = append(args, "-use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "-exclude_tables="+excludeTables)
}
args = append(args, "-source_shards=0", "-destination_shards=-80,80-", "-split_diff_cmd="+splitDiffCommand)
uuid, err := m.Create(ctx, horizontalReshardingFactoryName, args)
if err != nil {
t.Fatalf("cannot create resharding workflow: %v", err)
Expand Down Expand Up @@ -156,15 +161,22 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool,
wg.Wait()
}

func setupFakeVtworker(keyspace, vtworkers string, useConsistentSnapshot bool, excludeTables string) *fakevtworkerclient.FakeVtworkerClient {
func setupFakeVtworker(keyspace, vtworkers string, useConsistentSnapshot bool, excludeTables, splitDiffCmd string) *fakevtworkerclient.FakeVtworkerClient {
flag.Set("vtworker_client_protocol", "fake")
fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient()
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitCloneCommand(keyspace, useConsistentSnapshot, excludeTables), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "-80", useConsistentSnapshot, excludeTables), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "80-", useConsistentSnapshot, excludeTables), "", nil)

switch splitDiffCmd {
case "SplitDiff":
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "-80", useConsistentSnapshot, excludeTables, splitDiffCmd), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "80-", useConsistentSnapshot, excludeTables, splitDiffCmd), "", nil)
case "MultiSplitDiff":
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "0", useConsistentSnapshot, excludeTables, splitDiffCmd), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "0", useConsistentSnapshot, excludeTables, splitDiffCmd), "", nil)
}
return fakeVtworkerClient
}

Expand All @@ -173,24 +185,34 @@ func resetCommand() []string {
}

func splitCloneCommand(keyspace string, useConsistentSnapshot bool, excludeTables string) []string {
args := []string{"SplitClone", "--min_healthy_rdonly_tablets=2", keyspace + "/0"}
args := []string{"SplitClone", "--min_healthy_rdonly_tablets=2"}
if useConsistentSnapshot {
args = append(args, "--use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}

args = append(args, keyspace+"/0")
return args
}

func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool, excludeTables string) []string {
args := []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace + "/" + shardId}
func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool, excludeTables, splitDiffCommand string) []string {
args := []string{splitDiffCommand}
if useConsistentSnapshot {
args = append(args, "--use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}

switch splitDiffCommand {
case "SplitDiff":
args = append(args, "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace+"/"+shardId)
case "MultiSplitDiff":
args = append(args, "--min_healthy_tablets=1", "--tablet_type=RDONLY", keyspace+"/"+shardId)
}

return args
}

Expand Down
Loading