Skip to content

Commit

Permalink
Merge pull request #5290 from lyonlai/ylai-20191010-multi-split-diff-in-workflow
Browse files Browse the repository at this point in the history

MultiSplitDiff support in workflow
  • Loading branch information
systay authored Oct 11, 2019
2 parents b4992f4 + 37c586e commit 0ecab46
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 90 deletions.
29 changes: 24 additions & 5 deletions go/vt/workflow/resharding/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,17 @@ func createTaskID(phase workflow.PhaseType, shardName string) string {
func (hw *horizontalReshardingWorkflow) GetTasks(phase workflow.PhaseType) []*workflowpb.Task {
var shards []string
switch phase {
case phaseCopySchema, phaseWaitForFilteredReplication, phaseDiff:
case phaseCopySchema, phaseWaitForFilteredReplication:
shards = strings.Split(hw.checkpoint.Settings["destination_shards"], ",")
case phaseClone, phaseMigrateRdonly, phaseMigrateReplica, phaseMigrateMaster:
shards = strings.Split(hw.checkpoint.Settings["source_shards"], ",")
case phaseDiff:
switch hw.checkpoint.Settings["split_diff_cmd"] {
case "SplitDiff":
shards = strings.Split(hw.checkpoint.Settings["destination_shards"], ",")
case "MultiSplitDiff":
shards = strings.Split(hw.checkpoint.Settings["source_shards"], ",")
}
default:
log.Fatalf("BUG: unknown phase type: %v", phase)
}
Expand Down Expand Up @@ -82,15 +89,17 @@ func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *wo
return err
}

args := []string{splitCmd, "--min_healthy_rdonly_tablets=" + minHealthyRdonlyTablets, sourceKeyspaceShard}
args := []string{splitCmd, "--min_healthy_rdonly_tablets=" + minHealthyRdonlyTablets}
if useConsistentSnapshot != "" {
args = append(args, "--use_consistent_snapshot")
}

if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
args = append(args, fmt.Sprintf("--exclude_tables=%s", excludeTables))
}

args = append(args, sourceKeyspaceShard)

_, err := automation.ExecuteVtworker(hw.ctx, worker, args)
return err
}
Expand All @@ -103,7 +112,9 @@ func (hw *horizontalReshardingWorkflow) runWaitForFilteredReplication(ctx contex

func (hw *horizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *workflowpb.Task) error {
keyspace := t.Attributes["keyspace"]
splitDiffCmd := t.Attributes["split_diff_cmd"]
destShard := t.Attributes["destination_shard"]
sourceShard := t.Attributes["source_shard"]
destinationTabletType := t.Attributes["dest_tablet_type"]
worker := t.Attributes["vtworker"]
useConsistentSnapshot := t.Attributes["use_consistent_snapshot"]
Expand All @@ -112,13 +123,21 @@ func (hw *horizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *wor
if _, err := automation.ExecuteVtworker(hw.ctx, worker, []string{"Reset"}); err != nil {
return err
}
args := []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=" + destinationTabletType, topoproto.KeyspaceShardString(keyspace, destShard)}
args := []string{splitDiffCmd}

if useConsistentSnapshot != "" {
args = append(args, "--use_consistent_snapshot")
}

if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
args = append(args, fmt.Sprintf("--exclude_tables=%s", excludeTables))
}

switch splitDiffCmd {
case "SplitDiff":
args = append(args, "--min_healthy_rdonly_tablets=1", "--dest_tablet_type="+destinationTabletType, topoproto.KeyspaceShardString(keyspace, destShard))
case "MultiSplitDiff":
args = append(args, "--min_healthy_tablets=1", "--tablet_type="+destinationTabletType, topoproto.KeyspaceShardString(keyspace, sourceShard))
}

_, err := automation.ExecuteVtworker(ctx, worker, args)
Expand Down
69 changes: 51 additions & 18 deletions go/vt/workflow/resharding/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
sourceShardsStr := subFlags.String("source_shards", "", "A comma-separated list of source shards")
destinationShardsStr := subFlags.String("destination_shards", "", "A comma-separated list of destination shards")
minHealthyRdonlyTablets := subFlags.String("min_healthy_rdonly_tablets", "1", "Minimum number of healthy RDONLY tablets required in source shards")
skipSplitRatioCheck := subFlags.Bool("skip_split_ratio_check", false, "Skip validation on minimum number of healthy RDONLY tablets")
splitCmd := subFlags.String("split_cmd", "SplitClone", "Split command to use to perform horizontal resharding (either SplitClone or LegacySplitClone)")
splitDiffCmd := subFlags.String("split_diff_cmd", "SplitDiff", "Split diff command to use to perform horizontal resharding (either SplitDiff or MultiSplitDiff)")
splitDiffDestTabletType := subFlags.String("split_diff_dest_tablet_type", "RDONLY", "Specifies tablet type to use in destination shards while performing SplitDiff operation")
phaseEnaableApprovalsDesc := fmt.Sprintf("Comma separated phases that require explicit approval in the UI to execute. Phase names are: %v", strings.Join(WorkflowPhases(), ","))
phaseEnableApprovalsStr := subFlags.String("phase_enable_approvals", strings.Join(WorkflowPhases(), ","), phaseEnaableApprovalsDesc)
Expand All @@ -81,7 +83,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
if err := subFlags.Parse(args); err != nil {
return err
}
if *keyspace == "" || *vtworkersStr == "" || *minHealthyRdonlyTablets == "" || *splitCmd == "" {
if *keyspace == "" || *vtworkersStr == "" || *minHealthyRdonlyTablets == "" || *splitCmd == "" || *splitDiffCmd == "" {
return fmt.Errorf("keyspace name, min healthy rdonly tablets, split command, and vtworkers information must be provided for horizontal resharding")
}

Expand All @@ -106,13 +108,13 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
useConsistentSnapshotArg = "true"
}

err := validateWorkflow(m, *keyspace, vtworkers, sourceShards, destinationShards, *minHealthyRdonlyTablets)
err := validateWorkflow(m, *keyspace, vtworkers, sourceShards, destinationShards, *minHealthyRdonlyTablets, *skipSplitRatioCheck)
if err != nil {
return err
}

w.Name = fmt.Sprintf("Reshard shards %v into shards %v of keyspace %v.", *keyspace, *sourceShardsStr, *destinationShardsStr)
checkpoint, err := initCheckpoint(*keyspace, vtworkers, excludeTables, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType, useConsistentSnapshotArg)
checkpoint, err := initCheckpoint(*keyspace, vtworkers, excludeTables, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffCmd, *splitDiffDestTabletType, useConsistentSnapshotArg)
if err != nil {
return err
}
Expand Down Expand Up @@ -161,10 +163,12 @@ func (*Factory) Instantiate(m *workflow.Manager, w *workflowpb.Workflow, rootNod
Name: "WaitForFilteredReplication",
PathName: string(phaseWaitForFilteredReplication),
}

diffUINode := &workflow.Node{
Name: "SplitDiff",
Name: checkpoint.Settings["split_diff_cmd"],
PathName: string(phaseDiff),
}

migrateRdonlyUINode := &workflow.Node{
Name: "MigrateServedTypeRDONLY",
PathName: string(phaseMigrateRdonly),
Expand Down Expand Up @@ -200,9 +204,19 @@ func (*Factory) Instantiate(m *workflow.Manager, w *workflowpb.Workflow, rootNod
if err := createUINodes(hw.rootUINode, phaseWaitForFilteredReplication, destinationShards); err != nil {
return hw, err
}
if err := createUINodes(hw.rootUINode, phaseDiff, destinationShards); err != nil {
var shardsToUseForDiff []string

switch hw.checkpoint.Settings["split_diff_cmd"] {
case "SplitDiff":
shardsToUseForDiff = destinationShards
case "MultiSplitDiff":
shardsToUseForDiff = sourceShards
}

if err := createUINodes(hw.rootUINode, phaseDiff, shardsToUseForDiff); err != nil {
return hw, err
}

if err := createUINodes(hw.rootUINode, phaseMigrateRdonly, sourceShards); err != nil {
return hw, err
}
Expand Down Expand Up @@ -233,7 +247,7 @@ func createUINodes(rootNode *workflow.Node, phaseName workflow.PhaseType, shards
}

// validateWorkflow validates that the workflow has valid input parameters.
func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets string) error {
func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets string, skipSplitRatioCheck bool) error {
if len(sourceShards) == 0 || len(destinationShards) == 0 {
return fmt.Errorf("invalid source or destination shards")
}
Expand All @@ -242,7 +256,7 @@ func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceSha
}

splitRatio := len(destinationShards) / len(sourceShards)
if minHealthyRdonlyTabletsVal, err := strconv.Atoi(minHealthyRdonlyTablets); err != nil || minHealthyRdonlyTabletsVal < splitRatio {
if minHealthyRdonlyTabletsVal, err := strconv.Atoi(minHealthyRdonlyTablets); err != nil || (!skipSplitRatioCheck && minHealthyRdonlyTabletsVal < splitRatio) {
return fmt.Errorf("there are not enough rdonly tablets in source shards. You need at least %v, it got: %v", splitRatio, minHealthyRdonlyTablets)
}

Expand Down Expand Up @@ -271,7 +285,7 @@ func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceSha
}

// initCheckpoint initializes the checkpoint for the horizontal workflow.
func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string, useConsistentSnapshot string) (*workflowpb.WorkflowCheckpoint, error) {
func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffCmd, splitDiffDestTabletType string, useConsistentSnapshot string) (*workflowpb.WorkflowCheckpoint, error) {
tasks := make(map[string]*workflowpb.Task)
initTasks(tasks, phaseCopySchema, destinationShards, func(i int, shard string) map[string]string {
return map[string]string{
Expand All @@ -298,16 +312,34 @@ func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, des
"destination_shard": shard,
}
})
initTasks(tasks, phaseDiff, destinationShards, func(i int, shard string) map[string]string {
return map[string]string{
"keyspace": keyspace,
"destination_shard": shard,
"dest_tablet_type": splitDiffDestTabletType,
"vtworker": vtworkers[i],
"use_consistent_snapshot": useConsistentSnapshot,
"exclude_tables": strings.Join(excludeTables, ","),
}
})

switch splitDiffCmd {
case "SplitDiff":
initTasks(tasks, phaseDiff, destinationShards, func(i int, shard string) map[string]string {
return map[string]string{
"keyspace": keyspace,
"destination_shard": shard,
"dest_tablet_type": splitDiffDestTabletType,
"split_diff_cmd": splitDiffCmd,
"vtworker": vtworkers[i],
"use_consistent_snapshot": useConsistentSnapshot,
"exclude_tables": strings.Join(excludeTables, ","),
}
})
case "MultiSplitDiff":
initTasks(tasks, phaseDiff, sourceShards, func(i int, shard string) map[string]string {
return map[string]string{
"keyspace": keyspace,
"source_shard": shard,
"dest_tablet_type": splitDiffDestTabletType,
"split_diff_cmd": splitDiffCmd,
"vtworker": vtworkers[i],
"use_consistent_snapshot": useConsistentSnapshot,
"exclude_tables": strings.Join(excludeTables, ","),
}
})
}

initTasks(tasks, phaseMigrateRdonly, sourceShards, func(i int, shard string) map[string]string {
return map[string]string{
"keyspace": keyspace,
Expand Down Expand Up @@ -336,6 +368,7 @@ func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, des
Settings: map[string]string{
"source_shards": strings.Join(sourceShards, ","),
"destination_shards": strings.Join(destinationShards, ","),
"split_diff_cmd": splitDiffCmd,
},
}, nil
}
Expand Down
48 changes: 35 additions & 13 deletions go/vt/workflow/resharding/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestSourceDestShards(t *testing.T) {
defer ctrl.Finish()

// Set up the fake vtworker client. It is used in the SplitClone and SplitDiff phases.
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, false, "")
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, false, "", "SplitDiff")
vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory)
defer vtworkerclient.UnregisterFactoryForTest("fake")

Expand Down Expand Up @@ -90,28 +90,32 @@ func TestSourceDestShards(t *testing.T) {

// TestHorizontalResharding runs the happy path of HorizontalReshardingWorkflow.
func TestHorizontalResharding(t *testing.T) {
testHorizontalReshardingWorkflow(t, false, "")
testHorizontalReshardingWorkflow(t, false, "", "SplitDiff")
}

// TestHorizontalReshardingWithConsistentSnapshot runs the happy path of HorizontalReshardingWorkflow with consistent snapshot.
func TestHorizontalReshardingWithConsistentSnapshot(t *testing.T) {
testHorizontalReshardingWorkflow(t, true, "")
testHorizontalReshardingWorkflow(t, true, "", "SplitDiff")
}

// TestHorizontalReshardingWithExcludedTables runs the happy path of HorizontalReshardingWorkflow with excluded tables.
func TestHorizontalReshardingWithExcludedTables(t *testing.T) {
testHorizontalReshardingWorkflow(t, true, "table_a,table_b")
testHorizontalReshardingWorkflow(t, true, "table_a,table_b", "SplitDiff")
}

func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool, excludeTables string) {
// TestHorizontalReshardingWithMultiDiffCommand runs the HorizontalReshardingWorkflow test
// with consistent snapshot and excluded tables enabled, using MultiSplitDiff as the split diff command.
func TestHorizontalReshardingWithMultiDiffCommand(t *testing.T) {
testHorizontalReshardingWorkflow(t, true, "table_a,table_b", "MultiSplitDiff")
}

func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool, excludeTables, splitDiffCommand string) {
ctx := context.Background()
// Set up the mock wrangler. It is used for the CopySchema,
// WaitForFilteredReplication, and Migrate phases.
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockWranglerInterface := setupMockWrangler(ctrl, testKeyspace)
// Set up the fake vtworker client. It is used in the SplitClone and SplitDiff phases.
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, useConsistentSnapshot, excludeTables)
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, useConsistentSnapshot, excludeTables, splitDiffCommand)
vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory)
defer vtworkerclient.UnregisterFactoryForTest("fake")
// Initialize the topology.
Expand All @@ -121,13 +125,14 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool,
wg, _, cancel := workflow.StartManager(m)
// Create the workflow.
vtworkersParameter := testVtworkers + "," + testVtworkers
args := []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_rdonly_tablets=2", "-source_shards=0", "-destination_shards=-80,80-"}
args := []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-phase_enable_approvals=", "-min_healthy_rdonly_tablets=2"}
if useConsistentSnapshot {
args = append(args, "-use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "-exclude_tables="+excludeTables)
}
args = append(args, "-source_shards=0", "-destination_shards=-80,80-", "-split_diff_cmd="+splitDiffCommand)
uuid, err := m.Create(ctx, horizontalReshardingFactoryName, args)
if err != nil {
t.Fatalf("cannot create resharding workflow: %v", err)
Expand Down Expand Up @@ -156,15 +161,22 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool,
wg.Wait()
}

func setupFakeVtworker(keyspace, vtworkers string, useConsistentSnapshot bool, excludeTables string) *fakevtworkerclient.FakeVtworkerClient {
func setupFakeVtworker(keyspace, vtworkers string, useConsistentSnapshot bool, excludeTables, splitDiffCmd string) *fakevtworkerclient.FakeVtworkerClient {
flag.Set("vtworker_client_protocol", "fake")
fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient()
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitCloneCommand(keyspace, useConsistentSnapshot, excludeTables), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "-80", useConsistentSnapshot, excludeTables), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "80-", useConsistentSnapshot, excludeTables), "", nil)

switch splitDiffCmd {
case "SplitDiff":
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "-80", useConsistentSnapshot, excludeTables, splitDiffCmd), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "80-", useConsistentSnapshot, excludeTables, splitDiffCmd), "", nil)
case "MultiSplitDiff":
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "0", useConsistentSnapshot, excludeTables, splitDiffCmd), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "0", useConsistentSnapshot, excludeTables, splitDiffCmd), "", nil)
}
return fakeVtworkerClient
}

Expand All @@ -173,24 +185,34 @@ func resetCommand() []string {
}

func splitCloneCommand(keyspace string, useConsistentSnapshot bool, excludeTables string) []string {
args := []string{"SplitClone", "--min_healthy_rdonly_tablets=2", keyspace + "/0"}
args := []string{"SplitClone", "--min_healthy_rdonly_tablets=2"}
if useConsistentSnapshot {
args = append(args, "--use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}

args = append(args, keyspace+"/0")
return args
}

func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool, excludeTables string) []string {
args := []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace + "/" + shardId}
func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool, excludeTables, splitDiffCommand string) []string {
args := []string{splitDiffCommand}
if useConsistentSnapshot {
args = append(args, "--use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}

switch splitDiffCommand {
case "SplitDiff":
args = append(args, "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace+"/"+shardId)
case "MultiSplitDiff":
args = append(args, "--min_healthy_tablets=1", "--tablet_type=RDONLY", keyspace+"/"+shardId)
}

return args
}

Expand Down
Loading

0 comments on commit 0ecab46

Please sign in to comment.