Skip to content

Commit

Permalink
Merge pull request #7234 from planetscale/rn-wf-complete-drop-rr-upda…
Browse files Browse the repository at this point in the history
…te-vschema

MoveTables: delete routing rules and update vschema on Complete and Abort
  • Loading branch information
rohit-nayak-ps authored Jan 5, 2021
2 parents bf57b8c + 6f9acd3 commit 5b6dfb7
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 16 deletions.
8 changes: 4 additions & 4 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func reshardCustomer2to4Split(t *testing.T, cells []*Cell, sourceCellOrAlias str
func reshardMerchant2to3SplitMerge(t *testing.T) {
ksName := "merchant"
counts := map[string]int{"zone1-1600": 0, "zone1-1700": 2, "zone1-1800": 0}
reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryrunresultsswitchwritesM2m3, nil, "")
reshard(t, ksName, "merchant", "m2m3", "-80,80-", "-40,40-c0,c0-", 1600, counts, dryRunResultsSwitchWritesM2m3, nil, "")
validateCount(t, vtgateConn, ksName, "merchant", 2)
query := "insert into merchant (mname, category) values('amazon', 'electronics')"
execVtgateQuery(t, vtgateConn, ksName, query)
Expand Down Expand Up @@ -381,7 +381,7 @@ func reshardCustomer3to1Merge(t *testing.T) { //to unsharded
reshard(t, ksName, "customer", "c3c1", "-60,60-c0,c0-", "0", 1500, counts, nil, nil, "")
}

func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultswitchWrites []string, cells []*Cell, sourceCellOrAlias string) {
func reshard(t *testing.T, ksName string, tableName string, workflow string, sourceShards string, targetShards string, tabletIDBase int, counts map[string]int, dryRunResultSwitchWrites []string, cells []*Cell, sourceCellOrAlias string) {
if cells == nil {
cells = []*Cell{defaultCell}
}
Expand Down Expand Up @@ -414,8 +414,8 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
}
vdiff(t, ksWorkflow)
switchReads(t, allCellNames, ksWorkflow)
if dryRunResultswitchWrites != nil {
switchWritesDryRun(t, ksWorkflow, dryRunResultswitchWrites)
if dryRunResultSwitchWrites != nil {
switchWritesDryRun(t, ksWorkflow, dryRunResultSwitchWrites)
}
switchWrites(t, ksWorkflow, false)
dropSources(t, ksWorkflow)
Expand Down
8 changes: 5 additions & 3 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var dryRunResultsReadCustomerShard = []string{
"Unlock keyspace product",
}

var dryrunresultsswitchwritesM2m3 = []string{
var dryRunResultsSwitchWritesM2m3 = []string{
"Lock keyspace merchant",
"Stop streams on keyspace merchant",
"/ Id 2 Keyspace customer Shard -80 Rules rules:<match:\"morders\" filter:\"select * from orders where in_keyrange(mname, 'merchant.md5', '-80')\" > at Position ",
Expand Down Expand Up @@ -97,7 +97,7 @@ var dryrunresultsswitchwritesM2m3 = []string{
var dryRunResultsDropSourcesDropCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Dropping following tables:",
"Dropping following tables from the database and from the vschema for keyspace product:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer RemovalType DROP TABLE",
"Blacklisted tables customer will be removed from:",
" Keyspace product Shard 0 Tablet 100",
Expand All @@ -106,14 +106,15 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{
"Delete vreplication streams on target:",
" Keyspace customer Shard -80 Workflow p2c DbName vt_customer Tablet 200",
" Keyspace customer Shard 80- Workflow p2c DbName vt_customer Tablet 300",
"Routing rules for participating tables will be deleted",
"Unlock keyspace customer",
"Unlock keyspace product",
}

var dryRunResultsDropSourcesRenameCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Dropping following tables:",
"Dropping following tables from the database and from the vschema for keyspace product:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer RemovalType RENAME TABLE",
"Blacklisted tables customer will be removed from:",
" Keyspace product Shard 0 Tablet 100",
Expand All @@ -122,6 +123,7 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{
"Delete vreplication streams on target:",
" Keyspace customer Shard -80 Workflow p2c DbName vt_customer Tablet 200",
" Keyspace customer Shard 80- Workflow p2c DbName vt_customer Tablet 300",
"Routing rules for participating tables will be deleted",
"Unlock keyspace customer",
"Unlock keyspace product",
}
1 change: 1 addition & 0 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/wrangler/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type switcher struct {
wr *Wrangler
}

func (r *switcher) deleteRoutingRules(ctx context.Context) error {
return r.ts.deleteRoutingRules(ctx)
}

func (r *switcher) dropSourceBlacklistedTables(ctx context.Context) error {
return r.ts.dropSourceBlacklistedTables(ctx)
}
Expand Down
14 changes: 11 additions & 3 deletions go/vt/wrangler/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type switcherDryRun struct {
ts *trafficSwitcher
}

func (dr *switcherDryRun) deleteRoutingRules(ctx context.Context) error {
dr.drLog.Log("Routing rules for participating tables will be deleted")
return nil
}

func (dr *switcherDryRun) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error {
sourceShards := make([]string, 0)
targetShards := make([]string, 0)
Expand Down Expand Up @@ -230,11 +235,13 @@ func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType Ta
for _, source := range dr.ts.sources {
for _, tableName := range dr.ts.tables {
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s DbName %s Tablet %d Table %s RemovalType %s",
source.master.Keyspace, source.master.Shard, source.master.DbName(), source.master.Alias.Uid, tableName, TableRemovalType(removalType)))
source.master.Keyspace, source.master.Shard, source.master.DbName(), source.master.Alias.Uid, tableName,
removalType))
}
}
if len(logs) > 0 {
dr.drLog.Log("Dropping following tables:")
dr.drLog.Log(fmt.Sprintf("Dropping following tables from the database and from the vschema for keyspace %s:",
dr.ts.sourceKeyspace))
dr.drLog.LogSlice(logs)
}
return nil
Expand Down Expand Up @@ -328,7 +335,8 @@ func (dr *switcherDryRun) removeTargetTables(ctx context.Context) error {
}
}
if len(logs) > 0 {
dr.drLog.Log("Dropping following tables:")
dr.drLog.Log(fmt.Sprintf("Dropping following tables from the database and from the vschema for keyspace %s:",
dr.ts.targetKeyspace))
dr.drLog.LogSlice(logs)
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/switcher_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,7 @@ type iswitcher interface {
dropTargetVReplicationStreams(ctx context.Context) error
removeTargetTables(ctx context.Context) error
dropTargetShards(ctx context.Context) error
deleteRoutingRules(ctx context.Context) error

logs() *[]string
}
57 changes: 53 additions & 4 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflow s
return ts.id, sw.logs(), nil
}

// DropTargets cleans up target tables, shards and blacklisted tables after a MoveTables/Reshard is completed
// DropTargets cleans up target tables, shards and blacklisted tables if a MoveTables/Reshard is aborted
func (wr *Wrangler) DropTargets(ctx context.Context, targetKeyspace, workflow string, keepData, dryRun bool) (*[]string, error) {
ts, err := wr.buildTrafficSwitcher(ctx, targetKeyspace, workflow)
if err != nil {
Expand Down Expand Up @@ -627,6 +627,10 @@ func (wr *Wrangler) dropArtifacts(ctx context.Context, sw iswitcher) error {
if err := sw.dropTargetVReplicationStreams(ctx); err != nil {
return err
}
if err := sw.deleteRoutingRules(ctx); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -676,6 +680,7 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflow st
if err := sw.dropSourceBlacklistedTables(ctx); err != nil {
return nil, err
}

case binlogdatapb.MigrationType_SHARDS:
log.Infof("Removing shards")
if err := sw.dropSourceShards(ctx); err != nil {
Expand Down Expand Up @@ -1474,8 +1479,8 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er

}

func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType TableRemovalType) error {
return ts.forAllSources(func(source *tsSource) error {
func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType TableRemovalType) (err error) {
err = ts.forAllSources(func(source *tsSource) error {
for _, tableName := range ts.tables {
query := fmt.Sprintf("drop table %s.%s", source.master.DbName(), tableName)
if removalType == DropTable {
Expand All @@ -1495,6 +1500,22 @@ func (ts *trafficSwitcher) removeSourceTables(ctx context.Context, removalType T
}
return nil
})
if err != nil {
return err
}

return ts.dropParticipatingTablesFromKeyspace(ctx, ts.sourceKeyspace)
}

func (ts *trafficSwitcher) dropParticipatingTablesFromKeyspace(ctx context.Context, keyspace string) error {
vschema, err := ts.wr.ts.GetVSchema(ctx, keyspace)
if err != nil {
return err
}
for _, tableName := range ts.tables {
delete(vschema.Tables, tableName)
}
return ts.wr.ts.SaveVSchema(ctx, keyspace, vschema)
}

// FIXME: even after dropSourceShards there are still entries in the topo, need to research and fix
Expand Down Expand Up @@ -1546,7 +1567,7 @@ func (ts *trafficSwitcher) dropSourceReverseVReplicationStreams(ctx context.Cont
}

func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
return ts.forAllTargets(func(target *tsTarget) error {
err := ts.forAllTargets(func(target *tsTarget) error {
for _, tableName := range ts.tables {
query := fmt.Sprintf("drop table %s.%s", target.master.DbName(), tableName)
ts.wr.Logger().Infof("Dropping table %s.%s\n", target.master.DbName(), tableName)
Expand All @@ -1560,6 +1581,12 @@ func (ts *trafficSwitcher) removeTargetTables(ctx context.Context) error {
}
return nil
})
if err != nil {
return err
}

return ts.dropParticipatingTablesFromKeyspace(ctx, ts.targetKeyspace)

}

func (ts *trafficSwitcher) dropTargetShards(ctx context.Context) error {
Expand All @@ -1575,6 +1602,28 @@ func (ts *trafficSwitcher) dropTargetShards(ctx context.Context) error {
})
}

func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error {
rules, err := ts.wr.getRoutingRules(ctx)
if err != nil {
return err
}
for _, table := range ts.tables {
delete(rules, table)
delete(rules, table+"@replica")
delete(rules, table+"@rdonly")
delete(rules, ts.targetKeyspace+"."+table)
delete(rules, ts.targetKeyspace+"."+table+"@replica")
delete(rules, ts.targetKeyspace+"."+table+"@rdonly")
delete(rules, ts.sourceKeyspace+"."+table)
delete(rules, ts.sourceKeyspace+"."+table+"@replica")
delete(rules, ts.sourceKeyspace+"."+table+"@rdonly")
}
if err := ts.wr.saveRoutingRules(ctx, rules); err != nil {
return err
}
return nil
}

func (wr *Wrangler) getRoutingRules(ctx context.Context) (map[string][]string, error) {
rrs, err := wr.ts.GetRoutingRules(ctx)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
wantdryRunDropSources := []string{
"Lock keyspace ks1",
"Lock keyspace ks2",
"Dropping following tables:",
"Dropping following tables from the database and from the vschema for keyspace ks1:",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1 RemovalType DROP TABLE",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2 RemovalType DROP TABLE",
"Blacklisted tables t1,t2 will be removed from:",
Expand All @@ -868,6 +868,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
"Delete vreplication streams on target:",
" Keyspace ks2 Shard -80 Workflow test DbName vt_ks2 Tablet 20",
" Keyspace ks2 Shard 80- Workflow test DbName vt_ks2 Tablet 30",
"Routing rules for participating tables will be deleted",
"Unlock keyspace ks2",
"Unlock keyspace ks1",
}
Expand All @@ -884,7 +885,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
wantdryRunRenameSources := []string{
"Lock keyspace ks1",
"Lock keyspace ks2",
"Dropping following tables:",
"Dropping following tables from the database and from the vschema for keyspace ks1:",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1 RemovalType RENAME TABLE",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2 RemovalType RENAME TABLE",
"Blacklisted tables t1,t2 will be removed from:",
Expand All @@ -894,6 +895,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
"Delete vreplication streams on target:",
" Keyspace ks2 Shard -80 Workflow test DbName vt_ks2 Tablet 20",
" Keyspace ks2 Shard 80- Workflow test DbName vt_ks2 Tablet 30",
"Routing rules for participating tables will be deleted",
"Unlock keyspace ks2",
"Unlock keyspace ks1",
}
Expand Down
72 changes: 72 additions & 0 deletions go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package wrangler

import (
"fmt"
"testing"

"vitess.io/vitess/go/vt/topo"

"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -170,6 +173,61 @@ func TestMoveTablesV2(t *testing.T) {
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
}

func validateRoutingRuleCount(ctx context.Context, t *testing.T, ts *topo.Server, cnt int) {
rr, err := ts.GetRoutingRules(ctx)
fmt.Printf("Rules %+v\n", rr.Rules)
require.NoError(t, err)
require.NotNil(t, rr)
rules := rr.Rules
require.Equal(t, cnt, len(rules))
}

func checkIfTableExistInVSchema(ctx context.Context, t *testing.T, ts *topo.Server, keyspace string, table string) bool {
vschema, err := ts.GetVSchema(ctx, keyspace)
require.NoError(t, err)
require.NotNil(t, vschema)
_, ok := vschema.Tables[table]
return ok
}

func TestMoveTablesV2Complete(t *testing.T) {
ctx := context.Background()
p := &VReplicationWorkflowParams{
Workflow: "test",
SourceKeyspace: "ks1",
TargetKeyspace: "ks2",
Tables: "t1,t2",
Cells: "cell1,cell2",
TabletTypes: "replica,rdonly,master",
Timeout: DefaultActionTimeout,
}
tme := newTestTableMigrater(ctx, t)
defer tme.stopTablets(t)
wf, err := tme.wr.NewVReplicationWorkflow(ctx, MoveTablesWorkflow, p)
require.NoError(t, err)
require.NotNil(t, wf)
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
tme.expectNoPreviousJournals()
expectMoveTablesQueries(t, tme)
tme.expectNoPreviousJournals()
require.NoError(t, wf.SwitchTraffic(DirectionForward))
require.Equal(t, WorkflowStateAllSwitched, wf.CurrentState())

//16 rules, 8 per table t1,t2 eg: t1,t1@replica,t1@rdonly,ks1.t1,ks1.t1@replica,ks1.t1@rdonly,ks2.t1@replica,ks2.t1@rdonly
validateRoutingRuleCount(ctx, t, wf.wr.ts, 16)
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t1"))
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t2"))
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1"))
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t2"))
require.NoError(t, wf.Complete())
require.False(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t1"))
require.False(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t2"))
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1"))
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t2"))

validateRoutingRuleCount(ctx, t, wf.wr.ts, 0)
}

func TestMoveTablesV2Partial(t *testing.T) {
ctx := context.Background()
p := &VReplicationWorkflowParams{
Expand Down Expand Up @@ -246,7 +304,21 @@ func TestMoveTablesV2Abort(t *testing.T) {
require.NotNil(t, wf)
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
expectMoveTablesQueries(t, tme)
validateRoutingRuleCount(ctx, t, wf.wr.ts, 4) // rules set up by test env

require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t1"))
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t2"))
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1"))
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t2"))

require.NoError(t, wf.Abort())

validateRoutingRuleCount(ctx, t, wf.wr.ts, 0)

require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t1"))
require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t2"))
require.False(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1"))
require.False(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t2"))
}

func TestReshardV2(t *testing.T) {
Expand Down

0 comments on commit 5b6dfb7

Please sign in to comment.