Skip to content

Commit

Permalink
Merge pull request #7345 from planetscale/rn-backport-7255
Browse files Browse the repository at this point in the history
Backport #7255: VReplication DryRun: Report current dry run results for v2 commands
  • Loading branch information
rohit-nayak-ps authored Jan 22, 2021
2 parents 4cc1eac + 3e7c0fe commit c8f5d98
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 112 deletions.
26 changes: 9 additions & 17 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,14 @@ package vreplication
var dryRunResultsSwitchWritesCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Stop writes on keyspace product, tables customer:",
"Stop writes on keyspace product, tables [customer]:",
"/ Keyspace product, Shard 0 at Position",
"Wait for VReplication on stopped streams to catchup for upto 30s",
"Create reverse replication workflow p2c_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace customer tables customer",
"Enable writes on keyspace customer tables [customer]",
"Switch routing from keyspace product to keyspace customer",
"Following rules will be deleted:",
" customer.customer@rdonly => customer.customer",
" customer.customer@replica => customer.customer",
" customer@rdonly => customer.customer",
" customer@replica => customer.customer",
" product.customer@rdonly => customer.customer",
" product.customer@replica => customer.customer",
"Following rules will be added:",
" customer => customer.customer",
" product.customer => customer.customer",
"Routing rules for tables [customer] will be updated",
"SwitchWrites completed, freeze and delete vreplication streams on:",
" tablet 200 ",
" tablet 300 ",
Expand All @@ -50,7 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables customer to keyspace customer",
"Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA]",
"Routing rules for tables [customer] will be updated",
"Unlock keyspace product",
}

Expand All @@ -65,13 +57,13 @@ var dryRunResultsSwitchWritesM2m3 = []string{
"/ Id 4 Keyspace customer Shard -80 Rules rules:<match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '80-') group by merchant_name\" > at Position ",
"/ Id 5 Keyspace customer Shard 80- Rules rules:<match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '-80') group by merchant_name\" > at Position ",
"/ Id 5 Keyspace customer Shard 80- Rules rules:<match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '80-') group by merchant_name\" > at Position ",
"Stop writes on keyspace merchant, tables /.*:",
"Stop writes on keyspace merchant, tables [/.*]:",
"/ Keyspace merchant, Shard -80 at Position",
"/ Keyspace merchant, Shard 80- at Position",
"Wait for VReplication on stopped streams to catchup for upto 30s",
"Create reverse replication workflow m2m3_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace merchant tables /.*",
"Enable writes on keyspace merchant tables [/.*]",
"Switch routing from keyspace merchant to keyspace merchant",
"IsMasterServing will be set to false for:",
" Shard -80, Tablet 400 ",
Expand Down Expand Up @@ -99,7 +91,7 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{
"Lock keyspace customer",
"Dropping these tables from the database and removing them from the vschema for keyspace product:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer",
"Blacklisted tables customer will be removed from:",
"Blacklisted tables [customer] will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete reverse vreplication streams on source:",
" Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100",
Expand All @@ -116,7 +108,7 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{
"Lock keyspace customer",
"Renaming these tables from the database and removing them from the vschema for keyspace product:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer",
"Blacklisted tables customer will be removed from:",
"Blacklisted tables [customer] will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete reverse vreplication streams on source:",
" Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100",
Expand Down
28 changes: 22 additions & 6 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,8 +1968,8 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
workflowType wrangler.VReplicationWorkflowType) error {

cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.")
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from (e.g. master, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.")
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of SwitchReads and only reports the actions to be taken")
tabletTypes := subFlags.String("tablet_types", "master,replica,rdonly", "Source tablet types to replicate from (e.g. master, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.")
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of SwitchReads and only reports the actions to be taken. -dry_run is only supported for SwitchTraffic, ReverseTraffic and Complete.")
timeout := subFlags.Duration("timeout", 30*time.Second, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on master migrations. The migration will be cancelled on a timeout.")
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication")
keepData := subFlags.Bool("keep_data", false, "Do not drop tables or shards (if true, only vreplication artifacts are cleaned up)")
Expand All @@ -1986,7 +1986,6 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla

_ = subFlags.Bool("v2", true, "")

_ = dryRun //TODO: add dry run functionality
if err := subFlags.Parse(args); err != nil {
return err
}
Expand Down Expand Up @@ -2139,6 +2138,16 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
return printDetails()

}

if *dryRun {
switch action {
case vReplicationWorkflowActionSwitchTraffic, vReplicationWorkflowActionReverseTraffic, vReplicationWorkflowActionComplete:
default:
return fmt.Errorf("-dry_run is only supported for SwitchTraffic, ReverseTraffic and Complete, not for %s", originalAction)
}
}

var dryRunResults *[]string
startState := wf.CachedState()
switch action {
case vReplicationWorkflowActionShow:
Expand Down Expand Up @@ -2207,11 +2216,11 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
}
}
case vReplicationWorkflowActionSwitchTraffic:
err = wf.SwitchTraffic(wrangler.DirectionForward)
dryRunResults, err = wf.SwitchTraffic(wrangler.DirectionForward)
case vReplicationWorkflowActionReverseTraffic:
err = wf.ReverseTraffic()
dryRunResults, err = wf.ReverseTraffic()
case vReplicationWorkflowActionComplete:
err = wf.Complete()
dryRunResults, err = wf.Complete()
case vReplicationWorkflowActionCancel:
err = wf.Cancel()
case vReplicationWorkflowActionGetState:
Expand All @@ -2224,6 +2233,13 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
log.Warningf(" %s error: %v", originalAction, wf)
return wrapError(wf, err)
}
if *dryRun {
if len(*dryRunResults) > 0 {
wr.Logger().Printf("Dry Run results for %s run at %s\nParameters: %s\n\n", time.RFC822, originalAction, strings.Join(args, " "))
wr.Logger().Printf("%s\n", strings.Join(*dryRunResults, "\n"))
return nil
}
}
wr.Logger().Printf("%s was successful\nStart State: %s\nCurrent State: %s\n\n",
originalAction, startState, wf.CurrentState())
return nil
Expand Down
40 changes: 14 additions & 26 deletions go/vt/wrangler/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,14 @@ func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string,
if direction == DirectionBackward {
ks = dr.ts.sourceKeyspace
}
dr.drLog.Log(fmt.Sprintf("Switch reads for tables %s to keyspace %s", strings.Join(dr.ts.tables, ","), ks))
var tabletTypes []string
for _, servedType := range servedTypes {
tabletTypes = append(tabletTypes, servedType.String())
}
tables := strings.Join(dr.ts.tables, ",")
dr.drLog.Log(fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [%s]",
tables, ks, strings.Join(tabletTypes, ",")))
dr.drLog.Log(fmt.Sprintf("Routing rules for tables [%s] will be updated", tables))
return nil
}

Expand All @@ -82,35 +89,16 @@ func (dr *switcherDryRun) createJournals(ctx context.Context, sourceWorkflows []
}

func (dr *switcherDryRun) allowTargetWrites(ctx context.Context) error {
dr.drLog.Log(fmt.Sprintf("Enable writes on keyspace %s tables %s", dr.ts.targetKeyspace, strings.Join(dr.ts.tables, ",")))
dr.drLog.Log(fmt.Sprintf("Enable writes on keyspace %s tables [%s]", dr.ts.targetKeyspace, strings.Join(dr.ts.tables, ",")))
return nil
}

func (dr *switcherDryRun) changeRouting(ctx context.Context) error {
rules, err := dr.ts.wr.getRoutingRules(ctx)
if err != nil {
return err
}
dr.drLog.Log(fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", dr.ts.sourceKeyspace, dr.ts.targetKeyspace))
deleteLogs := make([]string, 0)
addLogs := make([]string, 0)
var deleteLogs, addLogs []string
if dr.ts.migrationType == binlogdatapb.MigrationType_TABLES {
for _, table := range dr.ts.tables {
for _, tabletType := range []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY} {
tt := strings.ToLower(tabletType.String())
deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", table+"@"+tt, strings.Trim(rules[table+"@"+tt][0], "[]")))
deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", dr.ts.targetKeyspace+"."+table+"@"+tt, strings.Trim(rules[dr.ts.targetKeyspace+"."+table+"@"+tt][0], "[]")))
deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", dr.ts.sourceKeyspace+"."+table+"@"+tt, strings.Trim(rules[dr.ts.sourceKeyspace+"."+table+"@"+tt][0], "[]")))
}
addLogs = append(addLogs, fmt.Sprintf("\t%s => %s", table, dr.ts.targetKeyspace+"."+table))
addLogs = append(addLogs, fmt.Sprintf("\t%s => %s", dr.ts.sourceKeyspace+"."+table, dr.ts.targetKeyspace+"."+table))
}
if len(deleteLogs) > 0 {
dr.drLog.Log("Following rules will be deleted:")
dr.drLog.LogSlice(deleteLogs)
dr.drLog.Log("Following rules will be added:")
dr.drLog.LogSlice(addLogs)
}
tables := strings.Join(dr.ts.tables, ",")
dr.drLog.Log(fmt.Sprintf("Routing rules for tables [%s] will be updated", tables))
return nil
}
deleteLogs = nil
Expand Down Expand Up @@ -198,7 +186,7 @@ func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error {
logs = append(logs, fmt.Sprintf("\tKeyspace %s, Shard %s at Position %s", dr.ts.sourceKeyspace, source.si.ShardName(), position))
}
if len(logs) > 0 {
dr.drLog.Log(fmt.Sprintf("Stop writes on keyspace %s, tables %s:", dr.ts.sourceKeyspace, strings.Join(dr.ts.tables, ",")))
dr.drLog.Log(fmt.Sprintf("Stop writes on keyspace %s, tables [%s]:", dr.ts.sourceKeyspace, strings.Join(dr.ts.tables, ",")))
dr.drLog.LogSlice(logs)
}
return nil
Expand Down Expand Up @@ -319,7 +307,7 @@ func (dr *switcherDryRun) dropSourceBlacklistedTables(ctx context.Context) error
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Tablet %d", si.Keyspace(), si.ShardName(), si.MasterAlias.Uid))
}
if len(logs) > 0 {
dr.drLog.Log(fmt.Sprintf("Blacklisted tables %s will be removed from:", strings.Join(dr.ts.tables, ",")))
dr.drLog.Log(fmt.Sprintf("Blacklisted tables [%s] will be removed from:", strings.Join(dr.ts.tables, ",")))
dr.drLog.LogSlice(logs)
}
return nil
Expand Down
30 changes: 7 additions & 23 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
"Dropping these tables from the database and removing them from the vschema for keyspace ks1:",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2",
"Blacklisted tables t1,t2 will be removed from:",
"Blacklisted tables [t1,t2] will be removed from:",
" Keyspace ks1 Shard 0 Tablet 10",
"Delete reverse vreplication streams on source:",
" Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10",
Expand All @@ -888,7 +888,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
"Renaming these tables from the database and removing them from the vschema for keyspace ks1:", " " +
"Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2",
"Blacklisted tables t1,t2 will be removed from:",
"Blacklisted tables [t1,t2] will be removed from:",
" Keyspace ks1 Shard 0 Tablet 10",
"Delete reverse vreplication streams on source:",
" Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10",
Expand Down Expand Up @@ -930,37 +930,21 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) {

wantdryRunReads := []string{
"Lock keyspace ks1",
"Switch reads for tables t1,t2 to keyspace ks2",
"Switch reads for tables [t1,t2] to keyspace ks2 for tablet types [RDONLY]",
"Routing rules for tables [t1,t2] will be updated",
"Unlock keyspace ks1",
}
wantdryRunWrites := []string{
"Lock keyspace ks1",
"Lock keyspace ks2",
"Stop writes on keyspace ks1, tables t1,t2:",
"Stop writes on keyspace ks1, tables [t1,t2]:",
"\tKeyspace ks1, Shard 0 at Position MariaDB/5-456-892",
"Wait for VReplication on stopped streams to catchup for upto 1s",
"Create reverse replication workflow test_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace ks2 tables t1,t2",
"Enable writes on keyspace ks2 tables [t1,t2]",
"Switch routing from keyspace ks1 to keyspace ks2",
"Following rules will be deleted:",
" ks1.t1@rdonly => ks2.t1",
" ks1.t1@replica => ks2.t1",
" ks1.t2@rdonly => ks2.t2",
" ks1.t2@replica => ks2.t2",
" ks2.t1@rdonly => ks2.t1",
" ks2.t1@replica => ks2.t1",
" ks2.t2@rdonly => ks2.t2",
" ks2.t2@replica => ks2.t2",
" t1@rdonly => ks2.t1",
" t1@replica => ks2.t1",
" t2@rdonly => ks2.t2",
" t2@replica => ks2.t2",
"Following rules will be added:",
" ks1.t1 => ks2.t1",
" ks1.t2 => ks2.t2",
" t1 => ks2.t1",
" t2 => ks2.t2",
"Routing rules for tables [t1,t2] will be updated",
"SwitchWrites completed, freeze and delete vreplication streams on:",
" tablet 20",
" tablet 30",
Expand Down
Loading

0 comments on commit c8f5d98

Please sign in to comment.