Skip to content

Commit

Permalink
Report current dry run results for v2 commands
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Jan 15, 2021
1 parent 36110d8 commit a4e752d
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 98 deletions.
14 changes: 3 additions & 11 deletions go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,7 @@ var dryRunResultsSwitchWritesCustomerShard = []string{
"Create journal entries on source databases",
"Enable writes on keyspace customer tables customer",
"Switch routing from keyspace product to keyspace customer",
"Following rules will be deleted:",
" customer.customer@rdonly => customer.customer",
" customer.customer@replica => customer.customer",
" customer@rdonly => customer.customer",
" customer@replica => customer.customer",
" product.customer@rdonly => customer.customer",
" product.customer@replica => customer.customer",
"Following rules will be added:",
" customer => customer.customer",
" product.customer => customer.customer",
"Routing rules for tables customer will be updated",
"SwitchWrites completed, freeze and delete vreplication streams on:",
" tablet 200 ",
" tablet 300 ",
Expand All @@ -50,7 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables customer to keyspace customer",
"Switch reads for tables customer to keyspace customer for tablet types REPLICA",
"Routing rules for tables customer will be updated",
"Unlock keyspace product",
}

Expand Down
26 changes: 21 additions & 5 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,7 +1968,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
workflowType wrangler.VReplicationWorkflowType) error {

cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.")
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from (e.g. master, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.")
tabletTypes := subFlags.String("tablet_types", "master,replica,rdonly", "Source tablet types to replicate from (e.g. master, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.")
dryRun := subFlags.Bool("dry_run", false, "Does a dry run of SwitchReads and only reports the actions to be taken")
timeout := subFlags.Duration("timeout", 30*time.Second, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on master migrations. The migration will be cancelled on a timeout.")
reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication")
Expand All @@ -1986,7 +1986,6 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla

_ = subFlags.Bool("v2", true, "")

_ = dryRun //TODO: add dry run functionality
if err := subFlags.Parse(args); err != nil {
return err
}
Expand Down Expand Up @@ -2139,6 +2138,16 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
return printDetails()

}

if *dryRun {
switch action {
case vReplicationWorkflowActionSwitchTraffic, vReplicationWorkflowActionReverseTraffic, vReplicationWorkflowActionComplete:
default:
return fmt.Errorf("-dry_run is only supported for SwitchTraffic, ReverseTraffic and Complete, not for %s", originalAction)
}
}

var dryRunResults *[]string
startState := wf.CachedState()
switch action {
case vReplicationWorkflowActionShow:
Expand Down Expand Up @@ -2207,11 +2216,11 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
}
}
case vReplicationWorkflowActionSwitchTraffic:
err = wf.SwitchTraffic(wrangler.DirectionForward)
dryRunResults, err = wf.SwitchTraffic(wrangler.DirectionForward)
case vReplicationWorkflowActionReverseTraffic:
err = wf.ReverseTraffic()
dryRunResults, err = wf.ReverseTraffic()
case vReplicationWorkflowActionComplete:
err = wf.Complete()
dryRunResults, err = wf.Complete()
case vReplicationWorkflowActionCancel:
err = wf.Cancel()
case vReplicationWorkflowActionGetState:
Expand All @@ -2224,6 +2233,13 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
log.Warningf(" %s error: %v", originalAction, wf)
return wrapError(wf, err)
}
if *dryRun {
if len(*dryRunResults) > 0 {
wr.Logger().Printf("Dry Run results for %s run at %s\nParameters: %s\n\n", time.RFC822, originalAction, strings.Join(args, " "))
wr.Logger().Printf("%s\n", strings.Join(*dryRunResults, "\n"))
return nil
}
}
wr.Logger().Printf("%s was successful\nStart State: %s\nCurrent State: %s\n\n",
originalAction, startState, wf.CurrentState())
return nil
Expand Down
34 changes: 11 additions & 23 deletions go/vt/wrangler/switcher_dry_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,14 @@ func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string,
if direction == DirectionBackward {
ks = dr.ts.sourceKeyspace
}
dr.drLog.Log(fmt.Sprintf("Switch reads for tables %s to keyspace %s", strings.Join(dr.ts.tables, ","), ks))
var tabletTypes []string
for _, servedType := range servedTypes {
tabletTypes = append(tabletTypes, servedType.String())
}
tables := strings.Join(dr.ts.tables, ",")
dr.drLog.Log(fmt.Sprintf("Switch reads for tables %s to keyspace %s for tablet types %s",
tables, ks, strings.Join(tabletTypes, ",")))
dr.drLog.Log(fmt.Sprintf("Routing rules for tables %s will be updated", tables))
return nil
}

Expand All @@ -87,30 +94,11 @@ func (dr *switcherDryRun) allowTargetWrites(ctx context.Context) error {
}

func (dr *switcherDryRun) changeRouting(ctx context.Context) error {
rules, err := dr.ts.wr.getRoutingRules(ctx)
if err != nil {
return err
}
dr.drLog.Log(fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", dr.ts.sourceKeyspace, dr.ts.targetKeyspace))
deleteLogs := make([]string, 0)
addLogs := make([]string, 0)
var deleteLogs, addLogs []string
if dr.ts.migrationType == binlogdatapb.MigrationType_TABLES {
for _, table := range dr.ts.tables {
for _, tabletType := range []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY} {
tt := strings.ToLower(tabletType.String())
deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", table+"@"+tt, strings.Trim(rules[table+"@"+tt][0], "[]")))
deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", dr.ts.targetKeyspace+"."+table+"@"+tt, strings.Trim(rules[dr.ts.targetKeyspace+"."+table+"@"+tt][0], "[]")))
deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", dr.ts.sourceKeyspace+"."+table+"@"+tt, strings.Trim(rules[dr.ts.sourceKeyspace+"."+table+"@"+tt][0], "[]")))
}
addLogs = append(addLogs, fmt.Sprintf("\t%s => %s", table, dr.ts.targetKeyspace+"."+table))
addLogs = append(addLogs, fmt.Sprintf("\t%s => %s", dr.ts.sourceKeyspace+"."+table, dr.ts.targetKeyspace+"."+table))
}
if len(deleteLogs) > 0 {
dr.drLog.Log("Following rules will be deleted:")
dr.drLog.LogSlice(deleteLogs)
dr.drLog.Log("Following rules will be added:")
dr.drLog.LogSlice(addLogs)
}
tables := strings.Join(dr.ts.tables, ",")
dr.drLog.Log(fmt.Sprintf("Routing rules for tables %s will be updated", tables))
return nil
}
deleteLogs = nil
Expand Down
22 changes: 3 additions & 19 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,8 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) {

wantdryRunReads := []string{
"Lock keyspace ks1",
"Switch reads for tables t1,t2 to keyspace ks2",
"Switch reads for tables t1,t2 to keyspace ks2 for tablet types RDONLY",
"Routing rules for tables t1,t2 will be updated",
"Unlock keyspace ks1",
}
wantdryRunWrites := []string{
Expand All @@ -943,24 +944,7 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) {
"Create journal entries on source databases",
"Enable writes on keyspace ks2 tables t1,t2",
"Switch routing from keyspace ks1 to keyspace ks2",
"Following rules will be deleted:",
" ks1.t1@rdonly => ks2.t1",
" ks1.t1@replica => ks2.t1",
" ks1.t2@rdonly => ks2.t2",
" ks1.t2@replica => ks2.t2",
" ks2.t1@rdonly => ks2.t1",
" ks2.t1@replica => ks2.t1",
" ks2.t2@rdonly => ks2.t2",
" ks2.t2@replica => ks2.t2",
" t1@rdonly => ks2.t1",
" t1@replica => ks2.t1",
" t2@rdonly => ks2.t2",
" t2@replica => ks2.t2",
"Following rules will be added:",
" ks1.t1 => ks2.t1",
" ks1.t2 => ks2.t2",
" t1 => ks2.t1",
" t2 => ks2.t2",
"Routing rules for tables t1,t2 will be updated",
"SwitchWrites completed, freeze and delete vreplication streams on:",
" tablet 20",
" tablet 30",
Expand Down
69 changes: 42 additions & 27 deletions go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,32 +225,41 @@ func (vrw *VReplicationWorkflow) GetStreamCount() (int64, int64, []*WorkflowErro
}

// SwitchTraffic switches traffic forward for tablet_types passed
func (vrw *VReplicationWorkflow) SwitchTraffic(direction TrafficSwitchDirection) error {
func (vrw *VReplicationWorkflow) SwitchTraffic(direction TrafficSwitchDirection) (*[]string, error) {
var dryRunResults []string
var rdDryRunResults, wrDryRunResults *[]string
var err error
if !vrw.Exists() {
return fmt.Errorf("workflow has not yet been started")
return nil, fmt.Errorf("workflow has not yet been started")
}
vrw.params.Direction = direction
hasReplica, hasRdonly, hasMaster, err := vrw.parseTabletTypes()
if err != nil {
return err
return nil, err
}
if hasReplica || hasRdonly {
if err := vrw.switchReads(); err != nil {
return err
if rdDryRunResults, err = vrw.switchReads(); err != nil {
return nil, err
}
}
if rdDryRunResults != nil {
dryRunResults = append(dryRunResults, *rdDryRunResults...)
}
if hasMaster {
if err := vrw.switchWrites(); err != nil {
return err
if wrDryRunResults, err = vrw.switchWrites(); err != nil {
return nil, err
}
}
return nil
if wrDryRunResults != nil {
dryRunResults = append(dryRunResults, *wrDryRunResults...)
}
return &dryRunResults, nil
}

// ReverseTraffic switches traffic backwards for tablet_types passed
func (vrw *VReplicationWorkflow) ReverseTraffic() error {
func (vrw *VReplicationWorkflow) ReverseTraffic() (*[]string, error) {
if !vrw.Exists() {
return fmt.Errorf("workflow has not yet been started")
return nil, fmt.Errorf("workflow has not yet been started")
}
return vrw.SwitchTraffic(DirectionBackward)
}
Expand All @@ -262,22 +271,24 @@ const (
)

// Complete cleans up a successful workflow
func (vrw *VReplicationWorkflow) Complete() error {
func (vrw *VReplicationWorkflow) Complete() (*[]string, error) {
ws := vrw.ws
if !ws.WritesSwitched || len(ws.ReplicaCellsNotSwitched) > 0 || len(ws.RdonlyCellsNotSwitched) > 0 {
return fmt.Errorf(ErrWorkflowNotFullySwitched)
return nil, fmt.Errorf(ErrWorkflowNotFullySwitched)
}
var renameTable TableRemovalType
if vrw.params.RenameTables {
renameTable = RenameTable
} else {
renameTable = DropTable
}
if _, err := vrw.wr.DropSources(vrw.ctx, vrw.ws.TargetKeyspace, vrw.ws.Workflow, renameTable, vrw.params.KeepData,
false, false); err != nil {
return err
var dryRunResults *[]string
var err error
if dryRunResults, err = vrw.wr.DropSources(vrw.ctx, vrw.ws.TargetKeyspace, vrw.ws.Workflow, renameTable, vrw.params.KeepData,
false, vrw.params.DryRun); err != nil {
return nil, err
}
return nil
return dryRunResults, nil
}

// Cancel deletes all artifacts from a workflow which has not yet been switched
Expand Down Expand Up @@ -347,24 +358,28 @@ func (vrw *VReplicationWorkflow) initReshard() error {
vrw.params.TargetShards, vrw.params.SkipSchemaCopy, vrw.params.Cells, vrw.params.TabletTypes)
}

func (vrw *VReplicationWorkflow) switchReads() error {
func (vrw *VReplicationWorkflow) switchReads() (*[]string, error) {
log.Infof("In VReplicationWorkflow.switchReads() for %+v", vrw)
var tabletTypes []topodatapb.TabletType
for _, tt := range vrw.getTabletTypes() {
if tt != topodatapb.TabletType_MASTER {
tabletTypes = append(tabletTypes, tt)
}
}

_, err := vrw.wr.SwitchReads(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, tabletTypes,
vrw.getCellsAsArray(), vrw.params.Direction, false)
var dryRunResults *[]string
var err error
dryRunResults, err = vrw.wr.SwitchReads(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, tabletTypes,
vrw.getCellsAsArray(), vrw.params.Direction, vrw.params.DryRun)
if err != nil {
return err
return nil, err
}
return nil
return dryRunResults, nil
}

func (vrw *VReplicationWorkflow) switchWrites() error {
func (vrw *VReplicationWorkflow) switchWrites() (*[]string, error) {
var journalID int64
var dryRunResults *[]string
var err error
log.Infof("In VReplicationWorkflow.switchWrites() for %+v", vrw)
if vrw.params.Direction == DirectionBackward {
keyspace := vrw.params.SourceKeyspace
Expand All @@ -373,13 +388,13 @@ func (vrw *VReplicationWorkflow) switchWrites() error {
vrw.params.Workflow = reverseName(vrw.params.Workflow)
log.Infof("In VReplicationWorkflow.switchWrites(reverse) for %+v", vrw)
}
journalID, _, err := vrw.wr.SwitchWrites(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, vrw.params.Timeout,
false, vrw.params.Direction == DirectionBackward, vrw.params.EnableReverseReplication, false)
journalID, dryRunResults, err = vrw.wr.SwitchWrites(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, vrw.params.Timeout,
false, vrw.params.Direction == DirectionBackward, vrw.params.EnableReverseReplication, vrw.params.DryRun)
if err != nil {
return err
return nil, err
}
log.Infof("switchWrites succeeded with journal id %s", journalID)
return nil
return dryRunResults, nil
}

// endregion
Expand Down
Loading

0 comments on commit a4e752d

Please sign in to comment.