Skip to content

Commit

Permalink
Merge pull request #5213 from lyonlai/ylai-190923-exclude-tables-in-w…
Browse files Browse the repository at this point in the history
…orkflow

Added exclude_tables flag in workflow and pass it down to vtworker
  • Loading branch information
tirsen authored Sep 24, 2019
2 parents 909d0c8 + b600193 commit 0459a0c
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 86 deletions.
15 changes: 14 additions & 1 deletion go/vt/workflow/resharding/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func (hw *horizontalReshardingWorkflow) runCopySchema(ctx context.Context, t *wo
keyspace := t.Attributes["keyspace"]
sourceShard := t.Attributes["source_shard"]
destShard := t.Attributes["destination_shard"]
return hw.wr.CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true, /*includeViews*/
excludeTables := strings.Split(t.Attributes["exclude_tables"], ",")
return hw.wr.CopySchemaShardFromShard(ctx, nil /* tableArray*/, excludeTables /* excludeTableArray */, true, /*includeViews*/
keyspace, sourceShard, keyspace, destShard, wrangler.DefaultWaitSlaveTimeout)
}

Expand All @@ -74,6 +75,7 @@ func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *wo
useConsistentSnapshot := t.Attributes["use_consistent_snapshot"]

sourceKeyspaceShard := topoproto.KeyspaceShardString(keyspace, sourceShard)
excludeTables := t.Attributes["exclude_tables"]
// Reset the vtworker to avoid error if vtworker command has been called elsewhere.
// This is because vtworker class doesn't cleanup the environment after execution.
if _, err := automation.ExecuteVtworker(ctx, worker, []string{"Reset"}); err != nil {
Expand All @@ -84,6 +86,11 @@ func (hw *horizontalReshardingWorkflow) runSplitClone(ctx context.Context, t *wo
if useConsistentSnapshot != "" {
args = append(args, "--use_consistent_snapshot")
}

if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}

_, err := automation.ExecuteVtworker(hw.ctx, worker, args)
return err
}
Expand All @@ -100,6 +107,7 @@ func (hw *horizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *wor
destinationTabletType := t.Attributes["dest_tablet_type"]
worker := t.Attributes["vtworker"]
useConsistentSnapshot := t.Attributes["use_consistent_snapshot"]
excludeTables := t.Attributes["exclude_tables"]

if _, err := automation.ExecuteVtworker(hw.ctx, worker, []string{"Reset"}); err != nil {
return err
Expand All @@ -108,6 +116,11 @@ func (hw *horizontalReshardingWorkflow) runSplitDiff(ctx context.Context, t *wor
if useConsistentSnapshot != "" {
args = append(args, "--use_consistent_snapshot")
}

if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}

_, err := automation.ExecuteVtworker(ctx, worker, args)
return err
}
Expand Down
9 changes: 7 additions & 2 deletions go/vt/workflow/resharding/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
subFlags := flag.NewFlagSet(horizontalReshardingFactoryName, flag.ContinueOnError)
keyspace := subFlags.String("keyspace", "", "Name of keyspace to perform horizontal resharding")
vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses")
excludeTablesStr := subFlags.String("exclude_tables", "", "A comma-separated list of tables to exclude")
sourceShardsStr := subFlags.String("source_shards", "", "A comma-separated list of source shards")
destinationShardsStr := subFlags.String("destination_shards", "", "A comma-separated list of destination shards")
minHealthyRdonlyTablets := subFlags.String("min_healthy_rdonly_tablets", "1", "Minimum number of healthy RDONLY tablets required in source shards")
Expand All @@ -85,6 +86,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
}

vtworkers := strings.Split(*vtworkersStr, ",")
excludeTables := strings.Split(*excludeTablesStr, ",")
sourceShards := strings.Split(*sourceShardsStr, ",")
destinationShards := strings.Split(*destinationShardsStr, ",")
phaseEnableApprovals := parsePhaseEnableApprovals(*phaseEnableApprovalsStr)
Expand All @@ -110,7 +112,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
}

w.Name = fmt.Sprintf("Reshard shards %v into shards %v of keyspace %v.", *keyspace, *sourceShardsStr, *destinationShardsStr)
checkpoint, err := initCheckpoint(*keyspace, vtworkers, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType, useConsistentSnapshotArg)
checkpoint, err := initCheckpoint(*keyspace, vtworkers, excludeTables, sourceShards, destinationShards, *minHealthyRdonlyTablets, *splitCmd, *splitDiffDestTabletType, useConsistentSnapshotArg)
if err != nil {
return err
}
Expand Down Expand Up @@ -269,13 +271,14 @@ func validateWorkflow(m *workflow.Manager, keyspace string, vtworkers, sourceSha
}

// initCheckpoint initialize the checkpoint for the horizontal workflow.
func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string, useConsistentSnapshot string) (*workflowpb.WorkflowCheckpoint, error) {
func initCheckpoint(keyspace string, vtworkers, excludeTables, sourceShards, destinationShards []string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType string, useConsistentSnapshot string) (*workflowpb.WorkflowCheckpoint, error) {
tasks := make(map[string]*workflowpb.Task)
initTasks(tasks, phaseCopySchema, destinationShards, func(i int, shard string) map[string]string {
return map[string]string{
"keyspace": keyspace,
"source_shard": sourceShards[0],
"destination_shard": shard,
"exclude_tables": strings.Join(excludeTables, ","),
}
})
initTasks(tasks, phaseClone, sourceShards, func(i int, shard string) map[string]string {
Expand All @@ -286,6 +289,7 @@ func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards
"split_cmd": splitCmd,
"vtworker": vtworkers[i],
"use_consistent_snapshot": useConsistentSnapshot,
"exclude_tables": strings.Join(excludeTables, ","),
}
})
initTasks(tasks, phaseWaitForFilteredReplication, destinationShards, func(i int, shard string) map[string]string {
Expand All @@ -301,6 +305,7 @@ func initCheckpoint(keyspace string, vtworkers, sourceShards, destinationShards
"dest_tablet_type": splitDiffDestTabletType,
"vtworker": vtworkers[i],
"use_consistent_snapshot": useConsistentSnapshot,
"exclude_tables": strings.Join(excludeTables, ","),
}
})
initTasks(tasks, phaseMigrateRdonly, sourceShards, func(i int, shard string) map[string]string {
Expand Down
40 changes: 27 additions & 13 deletions go/vt/workflow/resharding/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestSourceDestShards(t *testing.T) {
defer ctrl.Finish()

// Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase.
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, false)
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, false, "")
vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory)
defer vtworkerclient.UnregisterFactoryForTest("fake")

Expand Down Expand Up @@ -90,23 +90,28 @@ func TestSourceDestShards(t *testing.T) {

// TestHorizontalResharding runs the happy path of HorizontalReshardingWorkflow.
func TestHorizontalResharding(t *testing.T) {
testHorizontalReshardingWorkflow(t, false)
testHorizontalReshardingWorkflow(t, false, "")
}

// TestHorizontalReshardingWithConsistentSnapshot runs the happy path of HorizontalReshardingWorkflow with consistent snapshot.
func TestHorizontalReshardingWithConsistentSnapshot(t *testing.T) {
testHorizontalReshardingWorkflow(t, true)
testHorizontalReshardingWorkflow(t, true, "")
}

func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool) {
// TestHorizontalReshardingWithExcludedTables runs the happy path of HorizontalReshardingWorkflow with excluded tables.
func TestHorizontalReshardingWithExcludedTables(t *testing.T) {
testHorizontalReshardingWorkflow(t, true, "table_a,table_b")
}

func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool, excludeTables string) {
ctx := context.Background()
// Set up the mock wrangler. It is used for the CopySchema,
// WaitforFilteredReplication and Migrate phase.
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockWranglerInterface := setupMockWrangler(ctrl, testKeyspace)
// Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase.
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, useConsistentSnapshot)
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers, useConsistentSnapshot, excludeTables)
vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory)
defer vtworkerclient.UnregisterFactoryForTest("fake")
// Initialize the topology.
Expand All @@ -120,6 +125,9 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool)
if useConsistentSnapshot {
args = append(args, "-use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "-exclude_tables="+excludeTables)
}
uuid, err := m.Create(ctx, horizontalReshardingFactoryName, args)
if err != nil {
t.Fatalf("cannot create resharding workflow: %v", err)
Expand Down Expand Up @@ -148,43 +156,49 @@ func testHorizontalReshardingWorkflow(t *testing.T, useConsistentSnapshot bool)
wg.Wait()
}

func setupFakeVtworker(keyspace, vtworkers string, useConsistentSnapshot bool) *fakevtworkerclient.FakeVtworkerClient {
func setupFakeVtworker(keyspace, vtworkers string, useConsistentSnapshot bool, excludeTables string) *fakevtworkerclient.FakeVtworkerClient {
flag.Set("vtworker_client_protocol", "fake")
fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient()
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitCloneCommand(keyspace, useConsistentSnapshot), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitCloneCommand(keyspace, useConsistentSnapshot, excludeTables), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "-80", useConsistentSnapshot), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "-80", useConsistentSnapshot, excludeTables), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, resetCommand(), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "80-", useConsistentSnapshot), "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, splitDiffCommand(keyspace, "80-", useConsistentSnapshot, excludeTables), "", nil)
return fakeVtworkerClient
}

func resetCommand() []string {
return []string{"Reset"}
}

func splitCloneCommand(keyspace string, useConsistentSnapshot bool) []string {
func splitCloneCommand(keyspace string, useConsistentSnapshot bool, excludeTables string) []string {
args := []string{"SplitClone", "--min_healthy_rdonly_tablets=2", keyspace + "/0"}
if useConsistentSnapshot {
args = append(args, "--use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}
return args
}

func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool) []string {
func splitDiffCommand(keyspace string, shardId string, useConsistentSnapshot bool, excludeTables string) []string {
args := []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "--dest_tablet_type=RDONLY", keyspace + "/" + shardId}
if useConsistentSnapshot {
args = append(args, "--use_consistent_snapshot")
}
if excludeTables != "" {
args = append(args, "--exclude_tables="+excludeTables)
}
return args
}

func setupMockWrangler(ctrl *gomock.Controller, keyspace string) *MockReshardingWrangler {
mockWranglerInterface := NewMockReshardingWrangler(ctrl)
// Set the expected behaviors for mock wrangler.
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, gomock.Any() /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(gomock.Any(), nil /* tableArray*/, gomock.Any() /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil)

mockWranglerInterface.EXPECT().WaitForFilteredReplication(gomock.Any(), keyspace, "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
mockWranglerInterface.EXPECT().WaitForFilteredReplication(gomock.Any(), keyspace, "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
Expand Down
7 changes: 6 additions & 1 deletion go/vt/workflow/reshardingworkflowgen/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
subFlags := flag.NewFlagSet(keyspaceReshardingFactoryName, flag.ContinueOnError)
keyspace := subFlags.String("keyspace", "", "Name of keyspace to perform horizontal resharding")
vtworkersStr := subFlags.String("vtworkers", "", "A comma-separated list of vtworker addresses")
excludeTablesStr := subFlags.String("exclude_tables", "", "A comma-separated list of tables to exclude")
minHealthyRdonlyTablets := subFlags.String("min_healthy_rdonly_tablets", "1", "Minimum number of healthy RDONLY tablets required in source shards")
splitCmd := subFlags.String("split_cmd", "SplitClone", "Split command to use to perform horizontal resharding (either SplitClone or LegacySplitClone)")
splitDiffDestTabletType := subFlags.String("split_diff_dest_tablet_type", "RDONLY", "Specifies tablet type to use in destination shards while performing SplitDiff operation")
Expand All @@ -74,6 +75,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
}

vtworkers := strings.Split(*vtworkersStr, ",")
excludeTables := strings.Split(*excludeTablesStr, ",")

w.Name = fmt.Sprintf("Keyspace reshard on %s", *keyspace)
shardsToSplit, err := findSourceAndDestinationShards(m.TopoServer(), *keyspace)
Expand All @@ -84,6 +86,7 @@ func (*Factory) Init(m *workflow.Manager, w *workflowpb.Workflow, args []string)
checkpoint, err := initCheckpoint(
*keyspace,
vtworkers,
excludeTables,
shardsToSplit,
*minHealthyRdonlyTablets,
*splitCmd,
Expand Down Expand Up @@ -191,7 +194,7 @@ func findSourceAndDestinationShards(ts *topo.Server, keyspace string) ([][][]str
}

// initCheckpoint initialize the checkpoint for keyspace reshard
func initCheckpoint(keyspace string, vtworkers []string, shardsToSplit [][][]string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType, phaseEnableApprovals string, skipStartWorkflows bool, useConsistentSnapshot bool) (*workflowpb.WorkflowCheckpoint, error) {
func initCheckpoint(keyspace string, vtworkers, excludeTables []string, shardsToSplit [][][]string, minHealthyRdonlyTablets, splitCmd, splitDiffDestTabletType, phaseEnableApprovals string, skipStartWorkflows bool, useConsistentSnapshot bool) (*workflowpb.WorkflowCheckpoint, error) {
sourceShards := 0
destShards := 0
for _, shardToSplit := range shardsToSplit {
Expand Down Expand Up @@ -238,6 +241,7 @@ func initCheckpoint(keyspace string, vtworkers []string, shardsToSplit [][][]str
"workflows_count": fmt.Sprintf("%v", len(shardsToSplit)),
"keyspace": keyspace,
"use_consistent_snapshot": fmt.Sprintf("%v", useConsistentSnapshot),
"exclude_tables": fmt.Sprintf("%v", strings.Join(excludeTables, ",")),
},
}, nil
}
Expand Down Expand Up @@ -301,6 +305,7 @@ func (hw *reshardingWorkflowGen) workflowCreator(ctx context.Context, task *work
horizontalReshardingParams := []string{
"-keyspace=" + hw.keyspaceParam,
"-vtworkers=" + task.Attributes["vtworkers"],
"-exclude_tables=" + task.Attributes["exclude_tables"],
"-split_cmd=" + hw.splitCmdParam,
"-split_diff_dest_tablet_type=" + hw.splitDiffDestTabletTypeParam,
"-min_healthy_rdonly_tablets=" + hw.minHealthyRdonlyTabletsParam,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestWorfklowGenerator(t *testing.T) {
workflow.StartManager(m)
// Create the workflow.
vtworkersParameter := testVtworkers + "," + testVtworkers
uuid, err := m.Create(ctx, keyspaceReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-min_healthy_rdonly_tablets=2", "-use_consistent_snapshot=true"})
uuid, err := m.Create(ctx, keyspaceReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + vtworkersParameter, "-min_healthy_rdonly_tablets=2", "-use_consistent_snapshot=true", "-exclude_tables=table_a,table_b"})
if err != nil {
t.Fatalf("cannot create resharding workflow: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion web/vtctld2/app/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@
</head>
<body class="flex-column">
<vt-app-root class="flex-column flex-grow">Loading...</vt-app-root>
<script type="text/javascript" src="inline.js"></script><script type="text/javascript" src="styles.38b88af69dfd283498eb.bundle.js"></script><script type="text/javascript" src="main.f08922949ce1705e18fe.bundle.js"></script></body>
<script type="text/javascript" src="inline.js"></script><script type="text/javascript" src="styles.38b88af69dfd283498eb.bundle.js"></script><script type="text/javascript" src="main.38a1a560f8f628e31552.bundle.js"></script></body>
</html>
Loading

0 comments on commit 0459a0c

Please sign in to comment.