Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication DryRun: Report current dry run results for v2 commands #7255

Merged
merged 3 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,14 @@ package vreplication
var dryRunResultsSwitchWritesCustomerShard = []string{
"Lock keyspace product",
"Lock keyspace customer",
"Stop writes on keyspace product, tables customer:",
"Stop writes on keyspace product, tables [customer]:",
"/ Keyspace product, Shard 0 at Position",
"Wait for VReplication on stopped streams to catchup for upto 30s",
"Create reverse replication workflow p2c_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace customer tables customer",
"Enable writes on keyspace customer tables [customer]",
"Switch routing from keyspace product to keyspace customer",
"Following rules will be deleted:",
" customer.customer@rdonly => customer.customer",
" customer.customer@replica => customer.customer",
" customer@rdonly => customer.customer",
" customer@replica => customer.customer",
" product.customer@rdonly => customer.customer",
" product.customer@replica => customer.customer",
"Following rules will be added:",
" customer => customer.customer",
" product.customer => customer.customer",
"Routing rules for tables [customer] will be updated",
"SwitchWrites completed, freeze and delete vreplication streams on:",
" tablet 200 ",
" tablet 300 ",
Expand All @@ -50,7 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables customer to keyspace customer",
"Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA]",
"Routing rules for tables [customer] will be updated",
"Unlock keyspace product",
}

Expand All @@ -65,13 +57,13 @@ var dryRunResultsSwitchWritesM2m3 = []string{
"/ Id 4 Keyspace customer Shard -80 Rules rules:<match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '80-') group by merchant_name\" > at Position ",
"/ Id 5 Keyspace customer Shard 80- Rules rules:<match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '-80') group by merchant_name\" > at Position ",
"/ Id 5 Keyspace customer Shard 80- Rules rules:<match:\"msales\" filter:\"select mname as merchant_name, count(*) as kount, sum(price) as amount from orders where in_keyrange(mname, 'merchant.md5', '80-') group by merchant_name\" > at Position ",
"Stop writes on keyspace merchant, tables /.*:",
"Stop writes on keyspace merchant, tables [/.*]:",
"/ Keyspace merchant, Shard -80 at Position",
"/ Keyspace merchant, Shard 80- at Position",
"Wait for VReplication on stopped streams to catchup for upto 30s",
"Create reverse replication workflow m2m3_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace merchant tables /.*",
"Enable writes on keyspace merchant tables [/.*]",
"Switch routing from keyspace merchant to keyspace merchant",
"IsMasterServing will be set to false for:",
" Shard -80, Tablet 400 ",
Expand Down Expand Up @@ -99,7 +91,7 @@ var dryRunResultsDropSourcesDropCustomerShard = []string{
"Lock keyspace customer",
"Dropping these tables from the database and removing them from the vschema for keyspace product:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer",
"Blacklisted tables customer will be removed from:",
"Blacklisted tables [customer] will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete reverse vreplication streams on source:",
" Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100",
Expand All @@ -116,7 +108,7 @@ var dryRunResultsDropSourcesRenameCustomerShard = []string{
"Lock keyspace customer",
"Renaming these tables from the database and removing them from the vschema for keyspace product:",
" Keyspace product Shard 0 DbName vt_product Tablet 100 Table customer",
"Blacklisted tables customer will be removed from:",
"Blacklisted tables [customer] will be removed from:",
" Keyspace product Shard 0 Tablet 100",
"Delete reverse vreplication streams on source:",
" Keyspace product Shard 0 Workflow p2c_reverse DbName vt_product Tablet 100",
Expand Down
28 changes: 22 additions & 6 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,8 +1968,8 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
workflowType wrangler.VReplicationWorkflowType) error {

cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.")
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from (e.g. master, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.")
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of SwitchReads and only reports the actions to be taken")
tabletTypes := subFlags.String("tablet_types", "master,replica,rdonly", "Source tablet types to replicate from (e.g. master, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.")
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of SwitchReads and only reports the actions to be taken. -dry_run is only supported for SwitchTraffic, ReverseTraffic and Complete.")
timeout := subFlags.Duration("timeout", 30*time.Second, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on master migrations. The migration will be cancelled on a timeout.")
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication")
keepData := subFlags.Bool("keep_data", false, "Do not drop tables or shards (if true, only vreplication artifacts are cleaned up)")
Expand All @@ -1986,7 +1986,6 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla

_ = subFlags.Bool("v2", true, "")

_ = dryRun //TODO: add dry run functionality
if err := subFlags.Parse(args); err != nil {
return err
}
Expand Down Expand Up @@ -2139,6 +2138,16 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
return printDetails()

}

if *dryRun {
switch action {
case vReplicationWorkflowActionSwitchTraffic, vReplicationWorkflowActionReverseTraffic, vReplicationWorkflowActionComplete:
default:
return fmt.Errorf("-dry_run is only supported for SwitchTraffic, ReverseTraffic and Complete, not for %s", originalAction)
}
}

var dryRunResults *[]string
startState := wf.CachedState()
switch action {
case vReplicationWorkflowActionShow:
Expand Down Expand Up @@ -2207,11 +2216,11 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
}
}
case vReplicationWorkflowActionSwitchTraffic:
err = wf.SwitchTraffic(wrangler.DirectionForward)
dryRunResults, err = wf.SwitchTraffic(wrangler.DirectionForward)
case vReplicationWorkflowActionReverseTraffic:
err = wf.ReverseTraffic()
dryRunResults, err = wf.ReverseTraffic()
case vReplicationWorkflowActionComplete:
err = wf.Complete()
dryRunResults, err = wf.Complete()
case vReplicationWorkflowActionCancel:
err = wf.Cancel()
case vReplicationWorkflowActionGetState:
Expand All @@ -2224,6 +2233,13 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
log.Warningf(" %s error: %v", originalAction, wf)
return wrapError(wf, err)
}
if *dryRun {
if len(*dryRunResults) > 0 {
wr.Logger().Printf("Dry Run results for %s run at %s\nParameters: %s\n\n", time.RFC822, originalAction, strings.Join(args, " "))
wr.Logger().Printf("%s\n", strings.Join(*dryRunResults, "\n"))
return nil
}
}
wr.Logger().Printf("%s was successful\nStart State: %s\nCurrent State: %s\n\n",
originalAction, startState, wf.CurrentState())
return nil
Expand Down
40 changes: 14 additions & 26 deletions go/vt/wrangler/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,14 @@ func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string,
if direction == DirectionBackward {
ks = dr.ts.sourceKeyspace
}
dr.drLog.Log(fmt.Sprintf("Switch reads for tables %s to keyspace %s", strings.Join(dr.ts.tables, ","), ks))
var tabletTypes []string
for _, servedType := range servedTypes {
tabletTypes = append(tabletTypes, servedType.String())
}
tables := strings.Join(dr.ts.tables, ",")
dr.drLog.Log(fmt.Sprintf("Switch reads for tables [%s] to keyspace %s for tablet types [%s]",
tables, ks, strings.Join(tabletTypes, ",")))
dr.drLog.Log(fmt.Sprintf("Routing rules for tables [%s] will be updated", tables))
return nil
}

Expand All @@ -82,35 +89,16 @@ func (dr *switcherDryRun) createJournals(ctx context.Context, sourceWorkflows []
}

func (dr *switcherDryRun) allowTargetWrites(ctx context.Context) error {
dr.drLog.Log(fmt.Sprintf("Enable writes on keyspace %s tables %s", dr.ts.targetKeyspace, strings.Join(dr.ts.tables, ",")))
dr.drLog.Log(fmt.Sprintf("Enable writes on keyspace %s tables [%s]", dr.ts.targetKeyspace, strings.Join(dr.ts.tables, ",")))
return nil
}

func (dr *switcherDryRun) changeRouting(ctx context.Context) error {
rules, err := dr.ts.wr.getRoutingRules(ctx)
if err != nil {
return err
}
dr.drLog.Log(fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", dr.ts.sourceKeyspace, dr.ts.targetKeyspace))
deleteLogs := make([]string, 0)
addLogs := make([]string, 0)
var deleteLogs, addLogs []string
if dr.ts.migrationType == binlogdatapb.MigrationType_TABLES {
for _, table := range dr.ts.tables {
for _, tabletType := range []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY} {
tt := strings.ToLower(tabletType.String())
deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", table+"@"+tt, strings.Trim(rules[table+"@"+tt][0], "[]")))
deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", dr.ts.targetKeyspace+"."+table+"@"+tt, strings.Trim(rules[dr.ts.targetKeyspace+"."+table+"@"+tt][0], "[]")))
deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", dr.ts.sourceKeyspace+"."+table+"@"+tt, strings.Trim(rules[dr.ts.sourceKeyspace+"."+table+"@"+tt][0], "[]")))
}
addLogs = append(addLogs, fmt.Sprintf("\t%s => %s", table, dr.ts.targetKeyspace+"."+table))
addLogs = append(addLogs, fmt.Sprintf("\t%s => %s", dr.ts.sourceKeyspace+"."+table, dr.ts.targetKeyspace+"."+table))
}
if len(deleteLogs) > 0 {
dr.drLog.Log("Following rules will be deleted:")
dr.drLog.LogSlice(deleteLogs)
dr.drLog.Log("Following rules will be added:")
dr.drLog.LogSlice(addLogs)
}
tables := strings.Join(dr.ts.tables, ",")
dr.drLog.Log(fmt.Sprintf("Routing rules for tables [%s] will be updated", tables))
return nil
}
deleteLogs = nil
Expand Down Expand Up @@ -198,7 +186,7 @@ func (dr *switcherDryRun) stopSourceWrites(ctx context.Context) error {
logs = append(logs, fmt.Sprintf("\tKeyspace %s, Shard %s at Position %s", dr.ts.sourceKeyspace, source.si.ShardName(), position))
}
if len(logs) > 0 {
dr.drLog.Log(fmt.Sprintf("Stop writes on keyspace %s, tables %s:", dr.ts.sourceKeyspace, strings.Join(dr.ts.tables, ",")))
dr.drLog.Log(fmt.Sprintf("Stop writes on keyspace %s, tables [%s]:", dr.ts.sourceKeyspace, strings.Join(dr.ts.tables, ",")))
dr.drLog.LogSlice(logs)
}
return nil
Expand Down Expand Up @@ -319,7 +307,7 @@ func (dr *switcherDryRun) dropSourceBlacklistedTables(ctx context.Context) error
logs = append(logs, fmt.Sprintf("\tKeyspace %s Shard %s Tablet %d", si.Keyspace(), si.ShardName(), si.MasterAlias.Uid))
}
if len(logs) > 0 {
dr.drLog.Log(fmt.Sprintf("Blacklisted tables %s will be removed from:", strings.Join(dr.ts.tables, ",")))
dr.drLog.Log(fmt.Sprintf("Blacklisted tables [%s] will be removed from:", strings.Join(dr.ts.tables, ",")))
dr.drLog.LogSlice(logs)
}
return nil
Expand Down
30 changes: 7 additions & 23 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
"Dropping these tables from the database and removing them from the vschema for keyspace ks1:",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2",
"Blacklisted tables t1,t2 will be removed from:",
"Blacklisted tables [t1,t2] will be removed from:",
" Keyspace ks1 Shard 0 Tablet 10",
"Delete reverse vreplication streams on source:",
" Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10",
Expand All @@ -888,7 +888,7 @@ func TestTableMigrateOneToMany(t *testing.T) {
"Renaming these tables from the database and removing them from the vschema for keyspace ks1:", " " +
"Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t1",
" Keyspace ks1 Shard 0 DbName vt_ks1 Tablet 10 Table t2",
"Blacklisted tables t1,t2 will be removed from:",
"Blacklisted tables [t1,t2] will be removed from:",
" Keyspace ks1 Shard 0 Tablet 10",
"Delete reverse vreplication streams on source:",
" Keyspace ks1 Shard 0 Workflow test_reverse DbName vt_ks1 Tablet 10",
Expand Down Expand Up @@ -930,37 +930,21 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) {

wantdryRunReads := []string{
"Lock keyspace ks1",
"Switch reads for tables t1,t2 to keyspace ks2",
"Switch reads for tables [t1,t2] to keyspace ks2 for tablet types [RDONLY]",
"Routing rules for tables [t1,t2] will be updated",
"Unlock keyspace ks1",
}
wantdryRunWrites := []string{
"Lock keyspace ks1",
"Lock keyspace ks2",
"Stop writes on keyspace ks1, tables t1,t2:",
"Stop writes on keyspace ks1, tables [t1,t2]:",
"\tKeyspace ks1, Shard 0 at Position MariaDB/5-456-892",
"Wait for VReplication on stopped streams to catchup for upto 1s",
"Create reverse replication workflow test_reverse",
"Create journal entries on source databases",
"Enable writes on keyspace ks2 tables t1,t2",
"Enable writes on keyspace ks2 tables [t1,t2]",
"Switch routing from keyspace ks1 to keyspace ks2",
"Following rules will be deleted:",
" ks1.t1@rdonly => ks2.t1",
" ks1.t1@replica => ks2.t1",
" ks1.t2@rdonly => ks2.t2",
" ks1.t2@replica => ks2.t2",
" ks2.t1@rdonly => ks2.t1",
" ks2.t1@replica => ks2.t1",
" ks2.t2@rdonly => ks2.t2",
" ks2.t2@replica => ks2.t2",
" t1@rdonly => ks2.t1",
" t1@replica => ks2.t1",
" t2@rdonly => ks2.t2",
" t2@replica => ks2.t2",
"Following rules will be added:",
" ks1.t1 => ks2.t1",
" ks1.t2 => ks2.t2",
" t1 => ks2.t1",
" t2 => ks2.t2",
"Routing rules for tables [t1,t2] will be updated",
"SwitchWrites completed, freeze and delete vreplication streams on:",
" tablet 20",
" tablet 30",
Expand Down
Loading