From 0426aed3e4fc1a4e5d0f8136f841995e58f3cc8f Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 1 Jan 2021 12:55:02 +0100 Subject: [PATCH 1/4] Delete routing rules on workflow Complete Signed-off-by: Rohit Nayak --- .../vreplication/vreplication_test.go | 8 ++-- .../vreplication/vreplication_test_env.go | 4 +- go/vt/wrangler/materializer.go | 1 + go/vt/wrangler/switcher.go | 4 ++ go/vt/wrangler/switcher_dry_run.go | 5 +++ go/vt/wrangler/switcher_interface.go | 2 + go/vt/wrangler/traffic_switcher.go | 27 ++++++++++++ go/vt/wrangler/traffic_switcher_test.go | 2 + go/vt/wrangler/workflow_test.go | 43 +++++++++++++++++++ 9 files changed, 91 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index adfdd93730e..5aef52fc141 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -319,7 +319,7 @@ func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias str func reshardMerchant2to3SplitMerge(t *testing.T) { ksName := "merchant" counts := map[string]int{"zone1-1600": 0, "zone1-1700": 2, "zone1-1800": 0} - reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryrunresultsswitchwritesM2m3, nil, "") + reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryRunResultsSwitchWritesM2m3, nil, "") validateCount(t, vtgateConn, ksName, "merchant", 2) query := "insert into merchant (mname, category) values('amazon', 'electronics')" execVtgateQuery(t, vtgateConn, ksName, query) @@ -381,7 +381,7 @@ func reshardCustomer3to1Merge(t *testing.T) { //to unsharded reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0", 1500, counts, nil, nil, "") } -func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultswitchWrites []string, cells []*Cell, sourceCellOrAlias string) { +func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultSwitchWrites []string, cells []*Cell, sourceCellOrAlias string) { if cells == nil { cells = []*Cell{defaultCell} } @@ -414,8 +414,8 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou } vdiff(t, ksWorkflow) switchReads(t, allCellNames, ksWorkflow) - if dryRunResultswitchWrites != nil { - switchWritesDryRun(t, ksWorkflow, dryRunResultswitchWrites) + if dryRunResultSwitchWrites != nil { + switchWritesDryRun(t, ksWorkflow, dryRunResultSwitchWrites) } switchWrites(t, ksWorkflow, false) dropSources(t, ksWorkflow) diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index 5d9f5951036..4d2b3ec3065 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -54,7 +54,7 @@ var dryRunResultsReadCustomerShard = []string{ "Unlock keyspace product", } -var dryrunresultsswitchwritesM2m3 = []string{ +var dryRunResultsSwitchWritesM2m3 = []string{ "Lock keyspace merchant", "Stop streams on keyspace merchant", "/ Id 2 Keyspace customer Shard -80 Rules rules: at Position ", @@ -106,6 +106,7 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{ "Delete vreplication streams on target:", " Keyspace customer Shard -80 Workflow p2c DbName vt_customer Tablet 200", " Keyspace customer Shard 80- Workflow p2c DbName vt_customer Tablet 300", + "Routing rules for participating tables will be deleted", "Unlock keyspace customer", "Unlock keyspace product", } @@ -122,6 +123,7 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{ "Delete vreplication streams on target:", " Keyspace customer Shard -80 Workflow p2c DbName vt_customer Tablet 200", " Keyspace customer Shard 80- Workflow p2c DbName vt_customer Tablet 300", + "Routing rules for participating tables will be deleted", "Unlock keyspace customer", "Unlock keyspace product", } diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 9a19c918b16..1458a716b43 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -162,6 +162,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta rules[targetKeyspace+"."+table] = toSource rules[targetKeyspace+"."+table+"@replica"] = toSource rules[targetKeyspace+"."+table+"@rdonly"] = toSource + rules[targetKeyspace+"."+table] = toSource rules[sourceKeyspace+"."+table+"@replica"] = toSource rules[sourceKeyspace+"."+table+"@rdonly"] = toSource } diff --git a/go/vt/wrangler/switcher.go b/go/vt/wrangler/switcher.go index a44909146d0..1f895dd583e 100644 --- a/go/vt/wrangler/switcher.go +++ b/go/vt/wrangler/switcher.go @@ -31,6 +31,10 @@ type switcher struct { wr *Wrangler } +func (r *switcher) deleteRoutingRules(ctx context.Context) error { + return r.ts.deleteRoutingRules(ctx) +} + func (r *switcher) dropSourceBlacklistedTables(ctx context.Context) error { return r.ts.dropSourceBlacklistedTables(ctx) } diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index a2d37c65447..9820eef2945 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -37,6 +37,11 @@ type switcherDryRun struct { ts *trafficSwitcher } +func (dr *switcherDryRun) deleteRoutingRules(ctx context.Context) error { + dr.drLog.Log("Routing rules for participating tables will be deleted") + return nil +} + func (dr *switcherDryRun) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { sourceShards := make([]string, 0) targetShards := make([]string, 0) diff --git a/go/vt/wrangler/switcher_interface.go b/go/vt/wrangler/switcher_interface.go index 6b532ce8425..71b6104d3c1 100644 --- a/go/vt/wrangler/switcher_interface.go +++ b/go/vt/wrangler/switcher_interface.go @@ -48,5 +48,7 @@ type iswitcher interface { dropTargetVReplicationStreams(ctx context.Context) error removeTargetTables(ctx context.Context) error dropTargetShards(ctx context.Context) error + deleteRoutingRules(ctx context.Context) error + logs() *[]string } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 6d909f535f7..923d61d50bf 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -627,6 +627,10 @@ func (wr *Wrangler) dropArtifacts(ctx context.Context, sw iswitcher) error { if err := sw.dropTargetVReplicationStreams(ctx); err != nil { return err } + if err := sw.deleteRoutingRules(ctx); err != nil { + return err + } + return nil } @@ -676,6 +680,7 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflow st if err := sw.dropSourceBlacklistedTables(ctx); err != nil { return nil, err } + case binlogdatapb.MigrationType_SHARDS: log.Infof("Removing shards") if err := sw.dropSourceShards(ctx); err != nil { @@ -1575,6 +1580,28 @@ func (ts *trafficSwitcher) dropTargetShards(ctx context.Context) error { }) } +func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error { + rules, err := ts.wr.getRoutingRules(ctx) + if err != nil { + return err + } + for _, table := range ts.tables { + delete(rules, table) + delete(rules, table+"@replica") + delete(rules, table+"@rdonly") + delete(rules, ts.targetKeyspace+"."+table) + delete(rules, ts.targetKeyspace+"."+table+"@replica") + delete(rules, ts.targetKeyspace+"."+table+"@rdonly") + delete(rules, ts.sourceKeyspace+"."+table) + delete(rules, ts.sourceKeyspace+"."+table+"@replica") + delete(rules, ts.sourceKeyspace+"."+table+"@rdonly") + } + if err := ts.wr.saveRoutingRules(ctx, rules); err != nil { + return err + } + return nil +} + func (wr *Wrangler) getRoutingRules(ctx context.Context) (map[string][]string, error) { rrs, err := wr.ts.GetRoutingRules(ctx) if err != nil { diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 043d08230b6..49f412aa1c7 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -868,6 +868,7 @@ func TestTableMigrateOneToMany(t *testing.T) { "Delete vreplication streams on target:", " Keyspace ks2 Shard -80 Workflow test DbName vt_ks2 Tablet 20", " Keyspace ks2 Shard 80- Workflow test DbName vt_ks2 Tablet 30", + "Routing rules for participating tables will be deleted", "Unlock keyspace ks2", "Unlock keyspace ks1", } @@ -894,6 +895,7 @@ func TestTableMigrateOneToMany(t *testing.T) { "Delete vreplication streams on target:", " Keyspace ks2 Shard -80 Workflow test DbName vt_ks2 Tablet 20", " Keyspace ks2 Shard 80- Workflow test DbName vt_ks2 Tablet 30", + "Routing rules for participating tables will be deleted", "Unlock keyspace ks2", "Unlock keyspace ks1", } diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index 71cd2d9133f..0798e9d19c0 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -17,8 +17,11 @@ limitations under the License. package wrangler import ( + "fmt" "testing" + "vitess.io/vitess/go/vt/topo" + "github.com/stretchr/testify/require" "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" @@ -170,6 +173,46 @@ func TestMoveTablesV2(t *testing.T) { require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) } +func validateRoutingRuleCount(ctx context.Context, t *testing.T, ts *topo.Server, cnt int) { + rr, err := ts.GetRoutingRules(ctx) + fmt.Printf("Rules %+v\n", rr.Rules) + require.NoError(t, err) + require.NotNil(t, rr) + rules := rr.Rules + require.Equal(t, cnt, len(rules)) +} + +func TestMoveTablesV2Complete(t *testing.T) { + ctx := context.Background() + p := &VReplicationWorkflowParams{ + Workflow: "test", + SourceKeyspace: "ks1", + TargetKeyspace: "ks2", + Tables: "t1,t2", + Cells: "cell1,cell2", + TabletTypes: "replica,rdonly,master", + Timeout: DefaultActionTimeout, + } + tme := newTestTableMigrater(ctx, t) + defer tme.stopTablets(t) + wf, err := tme.wr.NewVReplicationWorkflow(ctx, MoveTablesWorkflow, p) + require.NoError(t, err) + require.NotNil(t, wf) + require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) + tme.expectNoPreviousJournals() + expectMoveTablesQueries(t, tme) + tme.expectNoPreviousJournals() + require.NoError(t, wf.SwitchTraffic(DirectionForward)) + require.Equal(t, WorkflowStateAllSwitched, wf.CurrentState()) + + //16 rules, 8 per table t1,t2 eg: t1,t1@replica,t1@rdonly,ks1.t1,ks1.t1@replica,ks1.t1@rdonly,ks2.t1@replica,ks2.t1@rdonly + validateRoutingRuleCount(ctx, t, wf.wr.ts, 16) + + require.NoError(t, wf.Complete()) + + validateRoutingRuleCount(ctx, t, wf.wr.ts, 0) +} + func TestMoveTablesV2Partial(t *testing.T) { ctx := context.Background() p := &VReplicationWorkflowParams{ From 7d2e87f03e03948975a1219734ffbd00552b1e0f Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 1 Jan 2021 13:02:47 +0100 Subject: [PATCH 2/4] Add test for Abort Signed-off-by: Rohit Nayak --- go/vt/wrangler/workflow_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index 0798e9d19c0..f2ee1c3df1f 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -289,7 +289,10 @@ func TestMoveTablesV2Abort(t *testing.T) { require.NotNil(t, wf) require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) expectMoveTablesQueries(t, tme) + validateRoutingRuleCount(ctx, t, wf.wr.ts, 4) // rules set up by test env + require.NoError(t, wf.Abort()) + validateRoutingRuleCount(ctx, t, wf.wr.ts, 0) } func TestReshardV2(t *testing.T) { From 39887a04b607e51f7ce1e41dcb3db9ee9dc9d01c Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 1 Jan 2021 16:38:22 +0100 Subject: [PATCH 3/4] Remove source tables from vschema on workflow Complete Signed-off-by: Rohit Nayak --- .../vreplication/vreplication_test_env.go | 4 ++-- go/vt/wrangler/switcher_dry_run.go | 6 ++++-- go/vt/wrangler/traffic_switcher.go | 19 ++++++++++++++++--- go/vt/wrangler/traffic_switcher_test.go | 4 ++-- go/vt/wrangler/workflow_test.go | 17 ++++++++++++++++- 5 files changed, 40 insertions(+), 10 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index 4d2b3ec3065..f004142c2e2 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -97,7 +97,7 @@ var dryRunResultsSwitchWritesM2m3 = []string{ var dryRunResultsDropSourcesDropCustomerShard = []string{ "Lock keyspace product", "Lock keyspace customer", - "Dropping following tables:", + "Dropping following tables from the database and from the vschema for keyspace product:", " Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer RemovalType DROP TABLE", "Blacklisted tables customer will be removed from:", " Keyspace product Shard 0 Tablet 100", @@ -114,7 +114,7 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{ var dryRunResultsDropSourcesRenameCustomerShard = []string{ "Lock keyspace product", "Lock keyspace customer", - "Dropping following tables:", + "Dropping following tables from the database and from the vschema for keyspace product:", " Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer RemovalType RENAME TABLE", "Blacklisted tables customer will be removed from:", " Keyspace product Shard 0 Tablet 100", diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index 9820eef2945..cb34d95c8ba 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -235,11 +235,13 @@ func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType Ta for _, source := range dr.ts.sources { for _, tableName := range dr.ts.tables { logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s DbName %s Tablet %d Table %s RemovalType %s", - source.master.Keyspace, source.master.Shard, source.master.DbName(), source.master.Alias.Uid, tableName, TableRemovalType(removalType))) + source.master.Keyspace, source.master.Shard, source.master.DbName(), source.master.Alias.Uid, tableName, + removalType)) } } if len(logs) > 0 { - dr.drLog.Log("Dropping following tables:") + dr.drLog.Log(fmt.Sprintf("Dropping following tables from the database and from the vschema for keyspace %s:", + dr.ts.sourceKeyspace)) dr.drLog.LogSlice(logs) } return nil diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 923d61d50bf..14668b4d1ef 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -567,7 +567,7 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflow s return ts.id, sw.logs(), nil } -// DropTargets cleans up target tables, shards and blacklisted tables after a MoveTables/Reshard is completed +// DropTargets cleans up target tables, shards and blacklisted tables if a MoveTables/Reshard is aborted func (wr *Wrangler) DropTargets(ctx context.Context, targetKeyspace, workflow string, keepData, dryRun bool) (*[]string, error) { ts, err := wr.buildTrafficSwitcher(ctx, targetKeyspace, workflow) if err != nil { @@ -1479,8 +1479,9 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er } -func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType TableRemovalType) error { - return ts.forAllSources(func(source *tsSource) error { +func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType TableRemovalType) (err error) { + var vschema *vschemapb.Keyspace + err = ts.forAllSources(func(source *tsSource) error { for _, tableName := range ts.tables { query := fmt.Sprintf("drop table %s.%s", source.master.DbName(), tableName) if removalType == DropTable { @@ -1500,6 +1501,18 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType T } return nil }) + if err != nil { + return err + } + + vschema, err = ts.wr.ts.GetVSchema(ctx, ts.sourceKeyspace) + if err != nil { + return err + } + for _, tableName := range ts.tables { + delete(vschema.Tables, tableName) + } + return ts.wr.ts.SaveVSchema(ctx, ts.sourceKeyspace, vschema) } // FIXME: even after dropSourceShards there are still entries in the topo, need to research and fix diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 49f412aa1c7..6a2747788fa 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -858,7 +858,7 @@ func TestTableMigrateOneToMany(t *testing.T) { wantdryRunDropSources := []string{ "Lock keyspace ks1", "Lock keyspace ks2", - "Dropping following tables:", + "Dropping following tables from the database and from the vschema for keyspace ks1:", " Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1 RemovalType DROP TABLE", " Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2 RemovalType DROP TABLE", "Blacklisted tables t1,t2 will be removed from:", @@ -885,7 +885,7 @@ func TestTableMigrateOneToMany(t *testing.T) { wantdryRunRenameSources := []string{ "Lock keyspace ks1", "Lock keyspace ks2", - "Dropping following tables:", + "Dropping following tables from the database and from the vschema for keyspace ks1:", " Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1 RemovalType RENAME TABLE", " Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2 RemovalType RENAME TABLE", "Blacklisted tables t1,t2 will be removed from:", diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index f2ee1c3df1f..ab1c6740f78 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -182,6 +182,14 @@ func validateRoutingRuleCount(ctx context.Context, t *testing.T, ts *topo.Server require.Equal(t, cnt, len(rules)) } +func checkIfTableExistInVSchema(ctx context.Context, t *testing.T, ts *topo.Server, keyspace string, table string) bool { + vschema, err := ts.GetVSchema(ctx, keyspace) + require.NoError(t, err) + require.NotNil(t, vschema) + _, ok := vschema.Tables[table] + return ok +} + func TestMoveTablesV2Complete(t *testing.T) { ctx := context.Background() p := &VReplicationWorkflowParams{ @@ -207,8 +215,15 @@ func TestMoveTablesV2Complete(t *testing.T) { //16 rules, 8 per table t1,t2 eg: t1,t1@replica,t1@rdonly,ks1.t1,ks1.t1@replica,ks1.t1@rdonly,ks2.t1@replica,ks2.t1@rdonly validateRoutingRuleCount(ctx, t, wf.wr.ts, 16) - + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t1")) + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t2")) + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1")) + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t2")) require.NoError(t, wf.Complete()) + require.False(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t1")) + require.False(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t2")) + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1")) + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t2")) validateRoutingRuleCount(ctx, t, wf.wr.ts, 0) } From 6f9acd34ac956df2ab3d6588377fb0ff07bea4ba Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 1 Jan 2021 19:07:57 +0100 Subject: [PATCH 4/4] Update target vschema on MoveTables Abort Signed-off-by: Rohit Nayak --- go/vt/wrangler/switcher_dry_run.go | 3 ++- go/vt/wrangler/traffic_switcher.go | 17 +++++++++++++---- go/vt/wrangler/workflow_test.go | 11 +++++++++++ 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index cb34d95c8ba..e74bbaf9ca0 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -335,7 +335,8 @@ func (dr *switcherDryRun) removeTargetTables(ctx context.Context) error { } } if len(logs) > 0 { - dr.drLog.Log("Dropping following tables:") + dr.drLog.Log(fmt.Sprintf("Dropping following tables from the database and from the vschema for keyspace %s:", + dr.ts.targetKeyspace)) dr.drLog.LogSlice(logs) } return nil diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 14668b4d1ef..cffab056a67 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -1480,7 +1480,6 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er } func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType TableRemovalType) (err error) { - var vschema *vschemapb.Keyspace err = ts.forAllSources(func(source *tsSource) error { for _, tableName := range ts.tables { query := fmt.Sprintf("drop table %s.%s", source.master.DbName(), tableName) @@ -1505,14 +1504,18 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType T return err } - vschema, err = ts.wr.ts.GetVSchema(ctx, ts.sourceKeyspace) + return ts.dropParticipatingTablesFromKeyspace(ctx, ts.sourceKeyspace) +} + +func (ts *trafficSwitcher) dropParticipatingTablesFromKeyspace(ctx context.Context, keyspace string) error { + vschema, err := ts.wr.ts.GetVSchema(ctx, keyspace) if err != nil { return err } for _, tableName := range ts.tables { delete(vschema.Tables, tableName) } - return ts.wr.ts.SaveVSchema(ctx, ts.sourceKeyspace, vschema) + return ts.wr.ts.SaveVSchema(ctx, keyspace, vschema) } // FIXME: even after dropSourceShards there are still entries in the topo, need to research and fix @@ -1564,7 +1567,7 @@ func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Cont } func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error { - return ts.forAllTargets(func(target *tsTarget) error { + err := ts.forAllTargets(func(target *tsTarget) error { for _, tableName := range ts.tables { query := fmt.Sprintf("drop table %s.%s", target.master.DbName(), tableName) ts.wr.Logger().Infof("Dropping table %s.%s\n", target.master.DbName(), tableName) @@ -1578,6 +1581,12 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error { } return nil }) + if err != nil { + return err + } + + return ts.dropParticipatingTablesFromKeyspace(ctx, ts.targetKeyspace) + } func (ts *trafficSwitcher) dropTargetShards(ctx context.Context) error { diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index ab1c6740f78..93603a2c613 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -306,8 +306,19 @@ func TestMoveTablesV2Abort(t *testing.T) { expectMoveTablesQueries(t, tme) validateRoutingRuleCount(ctx, t, wf.wr.ts, 4) // rules set up by test env + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t1")) + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t2")) + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1")) + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t2")) + require.NoError(t, wf.Abort()) + validateRoutingRuleCount(ctx, t, wf.wr.ts, 0) + + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t1")) + require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t2")) + require.False(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1")) + require.False(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t2")) } func TestReshardV2(t *testing.T) {