From a4e752d9c2bde058581614d23791d114e7275c0b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 5 Jan 2021 22:16:45 +0100 Subject: [PATCH] Report current dry run results for v2 commands Signed-off-by: Rohit Nayak --- .../vreplication/vreplication_test_env.go | 14 +--- go/vt/vtctl/vtctl.go | 26 +++++-- go/vt/wrangler/switcher_dry_run.go | 34 +++------ go/vt/wrangler/traffic_switcher_test.go | 22 +----- go/vt/wrangler/workflow.go | 69 +++++++++++-------- go/vt/wrangler/workflow_test.go | 40 +++++++---- 6 files changed, 107 insertions(+), 98 deletions(-) diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index 62e31c59e11..eb196227094 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -26,16 +26,7 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ "Create journal entries on source databases", "Enable writes on keyspace customer tables customer", "Switch routing from keyspace product to keyspace customer", - "Following rules will be deleted:", - " customer.customer@rdonly => customer.customer", - " customer.customer@replica => customer.customer", - " customer@rdonly => customer.customer", - " customer@replica => customer.customer", - " product.customer@rdonly => customer.customer", - " product.customer@replica => customer.customer", - "Following rules will be added:", - " customer => customer.customer", - " product.customer => customer.customer", + "Routing rules for tables customer will be updated", "SwitchWrites completed, freeze and delete vreplication streams on:", " tablet 200 ", " tablet 300 ", @@ -50,7 +41,8 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ var dryRunResultsReadCustomerShard = []string{ "Lock keyspace product", - "Switch reads for tables customer to keyspace customer", + "Switch reads for tables customer to keyspace customer for tablet types REPLICA", + "Routing rules for tables customer will be updated", "Unlock keyspace product", } diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index b4b964d1a26..adaa48db6f8 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -1968,7 +1968,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla workflowType wrangler.VReplicationWorkflowType) error { cells := subFlags.String("cells", "", "Cell(s) or CellAlias(es) (comma-separated) to replicate from.") - tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from (e.g. master, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.") + tabletTypes := subFlags.String("tablet_types", "master,replica,rdonly", "Source tablet types to replicate from (e.g. master, replica, rdonly). Defaults to -vreplication_tablet_type parameter value for the tablet, which has the default value of replica.") dryRun := subFlags.Bool("dry_run", false, "Does a dry run of SwitchReads and only reports the actions to be taken") timeout := subFlags.Duration("timeout", 30*time.Second, "Specifies the maximum time to wait, in seconds, for vreplication to catch up on master migrations. The migration will be cancelled on a timeout.") reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication") @@ -1986,7 +1986,6 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla _ = subFlags.Bool("v2", true, "") - _ = dryRun //TODO: add dry run functionality if err := subFlags.Parse(args); err != nil { return err } @@ -2139,6 +2138,16 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla return printDetails() } + + if *dryRun { + switch action { + case vReplicationWorkflowActionSwitchTraffic, vReplicationWorkflowActionReverseTraffic, vReplicationWorkflowActionComplete: + default: + return fmt.Errorf("-dry_run is only supported for SwitchTraffic, ReverseTraffic and Complete, not for %s", originalAction) + } + } + + var dryRunResults *[]string startState := wf.CachedState() switch action { case vReplicationWorkflowActionShow: @@ -2207,11 +2216,11 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla } } case vReplicationWorkflowActionSwitchTraffic: - err = wf.SwitchTraffic(wrangler.DirectionForward) + dryRunResults, err = wf.SwitchTraffic(wrangler.DirectionForward) case vReplicationWorkflowActionReverseTraffic: - err = wf.ReverseTraffic() + dryRunResults, err = wf.ReverseTraffic() case vReplicationWorkflowActionComplete: - err = wf.Complete() + dryRunResults, err = wf.Complete() case vReplicationWorkflowActionCancel: err = wf.Cancel() case vReplicationWorkflowActionGetState: @@ -2224,6 +2233,13 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla log.Warningf(" %s error: %v", originalAction, wf) return wrapError(wf, err) } + if *dryRun { + if len(*dryRunResults) > 0 { + wr.Logger().Printf("Dry Run results for %s run at %s\nParameters: %s\n\n", time.RFC822, originalAction, strings.Join(args, " ")) + wr.Logger().Printf("%s\n", strings.Join(*dryRunResults, "\n")) + return nil + } + } wr.Logger().Printf("%s was successful\nStart State: %s\nCurrent State: %s\n\n", originalAction, startState, wf.CurrentState()) return nil diff --git a/go/vt/wrangler/switcher_dry_run.go b/go/vt/wrangler/switcher_dry_run.go index 9e6d756f366..9e1e101fc7c 100644 --- a/go/vt/wrangler/switcher_dry_run.go +++ b/go/vt/wrangler/switcher_dry_run.go @@ -68,7 +68,14 @@ func (dr *switcherDryRun) switchTableReads(ctx context.Context, cells []string, if direction == DirectionBackward { ks = dr.ts.sourceKeyspace } - dr.drLog.Log(fmt.Sprintf("Switch reads for tables %s to keyspace %s", strings.Join(dr.ts.tables, ","), ks)) + var tabletTypes []string + for _, servedType := range servedTypes { + tabletTypes = append(tabletTypes, servedType.String()) + } + tables := strings.Join(dr.ts.tables, ",") + dr.drLog.Log(fmt.Sprintf("Switch reads for tables %s to keyspace %s for tablet types %s", + tables, ks, strings.Join(tabletTypes, ","))) + dr.drLog.Log(fmt.Sprintf("Routing rules for tables %s will be updated", tables)) return nil } @@ -87,30 +94,11 @@ func (dr *switcherDryRun) allowTargetWrites(ctx context.Context) error { } func (dr *switcherDryRun) changeRouting(ctx context.Context) error { - rules, err := dr.ts.wr.getRoutingRules(ctx) - if err != nil { - return err - } dr.drLog.Log(fmt.Sprintf("Switch routing from keyspace %s to keyspace %s", dr.ts.sourceKeyspace, dr.ts.targetKeyspace)) - deleteLogs := make([]string, 0) - addLogs := make([]string, 0) + var deleteLogs, addLogs []string if dr.ts.migrationType == binlogdatapb.MigrationType_TABLES { - for _, table := range dr.ts.tables { - for _, tabletType := range []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY} { - tt := strings.ToLower(tabletType.String()) - deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", table+"@"+tt, strings.Trim(rules[table+"@"+tt][0], "[]"))) - deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", dr.ts.targetKeyspace+"."+table+"@"+tt, strings.Trim(rules[dr.ts.targetKeyspace+"."+table+"@"+tt][0], "[]"))) - deleteLogs = append(deleteLogs, fmt.Sprintf("\t%s => %s", dr.ts.sourceKeyspace+"."+table+"@"+tt, strings.Trim(rules[dr.ts.sourceKeyspace+"."+table+"@"+tt][0], "[]"))) - } - addLogs = append(addLogs, fmt.Sprintf("\t%s => %s", table, dr.ts.targetKeyspace+"."+table)) - addLogs = append(addLogs, fmt.Sprintf("\t%s => %s", dr.ts.sourceKeyspace+"."+table, dr.ts.targetKeyspace+"."+table)) - } - if len(deleteLogs) > 0 { - dr.drLog.Log("Following rules will be deleted:") - dr.drLog.LogSlice(deleteLogs) - dr.drLog.Log("Following rules will be added:") - dr.drLog.LogSlice(addLogs) - } + tables := strings.Join(dr.ts.tables, ",") + dr.drLog.Log(fmt.Sprintf("Routing rules for tables %s will be updated", tables)) return nil } deleteLogs = nil diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index ba428fb66da..e5e40aaf79a 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -930,7 +930,8 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) { wantdryRunReads := []string{ "Lock keyspace ks1", - "Switch reads for tables t1,t2 to keyspace ks2", + "Switch reads for tables t1,t2 to keyspace ks2 for tablet types RDONLY", + "Routing rules for tables t1,t2 will be updated", "Unlock keyspace ks1", } wantdryRunWrites := []string{ @@ -943,24 +944,7 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) { "Create journal entries on source databases", "Enable writes on keyspace ks2 tables t1,t2", "Switch routing from keyspace ks1 to keyspace ks2", - "Following rules will be deleted:", - " ks1.t1@rdonly => ks2.t1", - " ks1.t1@replica => ks2.t1", - " ks1.t2@rdonly => ks2.t2", - " ks1.t2@replica => ks2.t2", - " ks2.t1@rdonly => ks2.t1", - " ks2.t1@replica => ks2.t1", - " ks2.t2@rdonly => ks2.t2", - " ks2.t2@replica => ks2.t2", - " t1@rdonly => ks2.t1", - " t1@replica => ks2.t1", - " t2@rdonly => ks2.t2", - " t2@replica => ks2.t2", - "Following rules will be added:", - " ks1.t1 => ks2.t1", - " ks1.t2 => ks2.t2", - " t1 => ks2.t1", - " t2 => ks2.t2", + "Routing rules for tables t1,t2 will be updated", "SwitchWrites completed, freeze and delete vreplication streams on:", " tablet 20", " tablet 30", diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go index e297ab7453f..56dede198cb 100644 --- a/go/vt/wrangler/workflow.go +++ b/go/vt/wrangler/workflow.go @@ -225,32 +225,41 @@ func (vrw *VReplicationWorkflow) GetStreamCount() (int64, int64, []*WorkflowErro } // SwitchTraffic switches traffic forward for tablet_types passed -func (vrw *VReplicationWorkflow) SwitchTraffic(direction TrafficSwitchDirection) error { +func (vrw *VReplicationWorkflow) SwitchTraffic(direction TrafficSwitchDirection) (*[]string, error) { + var dryRunResults []string + var rdDryRunResults, wrDryRunResults *[]string + var err error if !vrw.Exists() { - return fmt.Errorf("workflow has not yet been started") + return nil, fmt.Errorf("workflow has not yet been started") } vrw.params.Direction = direction hasReplica, hasRdonly, hasMaster, err := vrw.parseTabletTypes() if err != nil { - return err + return nil, err } if hasReplica || hasRdonly { - if err := vrw.switchReads(); err != nil { - return err + if rdDryRunResults, err = vrw.switchReads(); err != nil { + return nil, err } } + if rdDryRunResults != nil { + dryRunResults = append(dryRunResults, *rdDryRunResults...) + } if hasMaster { - if err := vrw.switchWrites(); err != nil { - return err + if wrDryRunResults, err = vrw.switchWrites(); err != nil { + return nil, err } } - return nil + if wrDryRunResults != nil { + dryRunResults = append(dryRunResults, *wrDryRunResults...) + } + return &dryRunResults, nil } // ReverseTraffic switches traffic backwards for tablet_types passed -func (vrw *VReplicationWorkflow) ReverseTraffic() error { +func (vrw *VReplicationWorkflow) ReverseTraffic() (*[]string, error) { if !vrw.Exists() { - return fmt.Errorf("workflow has not yet been started") + return nil, fmt.Errorf("workflow has not yet been started") } return vrw.SwitchTraffic(DirectionBackward) } @@ -262,10 +271,10 @@ const ( ) // Complete cleans up a successful workflow -func (vrw *VReplicationWorkflow) Complete() error { +func (vrw *VReplicationWorkflow) Complete() (*[]string, error) { ws := vrw.ws if !ws.WritesSwitched || len(ws.ReplicaCellsNotSwitched) > 0 || len(ws.RdonlyCellsNotSwitched) > 0 { - return fmt.Errorf(ErrWorkflowNotFullySwitched) + return nil, fmt.Errorf(ErrWorkflowNotFullySwitched) } var renameTable TableRemovalType if vrw.params.RenameTables { @@ -273,11 +282,13 @@ func (vrw *VReplicationWorkflow) Complete() error { } else { renameTable = DropTable } - if _, err := vrw.wr.DropSources(vrw.ctx, vrw.ws.TargetKeyspace, vrw.ws.Workflow, renameTable, vrw.params.KeepData, - false, false); err != nil { - return err + var dryRunResults *[]string + var err error + if dryRunResults, err = vrw.wr.DropSources(vrw.ctx, vrw.ws.TargetKeyspace, vrw.ws.Workflow, renameTable, vrw.params.KeepData, + false, vrw.params.DryRun); err != nil { + return nil, err } - return nil + return dryRunResults, nil } // Cancel deletes all artifacts from a workflow which has not yet been switched @@ -347,7 +358,7 @@ func (vrw *VReplicationWorkflow) initReshard() error { vrw.params.TargetShards, vrw.params.SkipSchemaCopy, vrw.params.Cells, vrw.params.TabletTypes) } -func (vrw *VReplicationWorkflow) switchReads() error { +func (vrw *VReplicationWorkflow) switchReads() (*[]string, error) { log.Infof("In VReplicationWorkflow.switchReads() for %+v", vrw) var tabletTypes []topodatapb.TabletType for _, tt := range vrw.getTabletTypes() { @@ -355,16 +366,20 @@ func (vrw *VReplicationWorkflow) switchReads() error { tabletTypes = append(tabletTypes, tt) } } - - _, err := vrw.wr.SwitchReads(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, tabletTypes, - vrw.getCellsAsArray(), vrw.params.Direction, false) + var dryRunResults *[]string + var err error + dryRunResults, err = vrw.wr.SwitchReads(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, tabletTypes, + vrw.getCellsAsArray(), vrw.params.Direction, vrw.params.DryRun) if err != nil { - return err + return nil, err } - return nil + return dryRunResults, nil } -func (vrw *VReplicationWorkflow) switchWrites() error { +func (vrw *VReplicationWorkflow) switchWrites() (*[]string, error) { + var journalID int64 + var dryRunResults *[]string + var err error log.Infof("In VReplicationWorkflow.switchWrites() for %+v", vrw) if vrw.params.Direction == DirectionBackward { keyspace := vrw.params.SourceKeyspace @@ -373,13 +388,13 @@ func (vrw *VReplicationWorkflow) switchWrites() error { vrw.params.Workflow = reverseName(vrw.params.Workflow) log.Infof("In VReplicationWorkflow.switchWrites(reverse) for %+v", vrw) } - journalID, _, err := vrw.wr.SwitchWrites(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, vrw.params.Timeout, - false, vrw.params.Direction == DirectionBackward, vrw.params.EnableReverseReplication, false) + journalID, dryRunResults, err = vrw.wr.SwitchWrites(vrw.ctx, vrw.params.TargetKeyspace, vrw.params.Workflow, vrw.params.Timeout, + false, vrw.params.Direction == DirectionBackward, vrw.params.EnableReverseReplication, vrw.params.DryRun) if err != nil { - return err + return nil, err } log.Infof("switchWrites succeeded with journal id %s", journalID) - return nil + return dryRunResults, nil } // endregion diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index 0d236480db2..a9f3ebd5651 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -50,12 +50,16 @@ func getMoveTablesWorkflow(t *testing.T, cells, tabletTypes string) *VReplicatio return mtwf } +func testComplete(t *testing.T, vrwf *VReplicationWorkflow) error { + _, err := vrwf.Complete() + return err +} func TestReshardingWorkflowErrorsAndMisc(t *testing.T) { mtwf := getMoveTablesWorkflow(t, "cell1,cell2", "replica,rdonly") require.False(t, mtwf.Exists()) mtwf.ws = &workflowState{} require.True(t, mtwf.Exists()) - require.Errorf(t, mtwf.Complete(), ErrWorkflowNotFullySwitched) + require.Errorf(t, testComplete(t, mtwf), ErrWorkflowNotFullySwitched) mtwf.ws.WritesSwitched = true require.Errorf(t, mtwf.Cancel(), ErrWorkflowPartiallySwitched) @@ -165,12 +169,12 @@ func TestMoveTablesV2(t *testing.T) { tme.expectNoPreviousJournals() expectMoveTablesQueries(t, tme) tme.expectNoPreviousJournals() - require.NoError(t, wf.SwitchTraffic(DirectionForward)) + require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, WorkflowStateAllSwitched, wf.CurrentState()) tme.expectNoPreviousJournals() tme.expectNoPreviousReverseJournals() - require.NoError(t, wf.ReverseTraffic()) + require.NoError(t, testReverse(t, wf)) require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) } @@ -211,7 +215,7 @@ func TestMoveTablesV2Complete(t *testing.T) { tme.expectNoPreviousJournals() expectMoveTablesQueries(t, tme) tme.expectNoPreviousJournals() - require.NoError(t, wf.SwitchTraffic(DirectionForward)) + require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, WorkflowStateAllSwitched, wf.CurrentState()) //16 rules, 8 per table t1,t2 eg: t1,t1@replica,t1@rdonly,ks1.t1,ks1.t1@replica,ks1.t1@rdonly,ks2.t1@replica,ks2.t1@rdonly @@ -220,7 +224,7 @@ func TestMoveTablesV2Complete(t *testing.T) { require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t2")) require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1")) require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t2")) - require.NoError(t, wf.Complete()) + require.NoError(t, testComplete(t, wf)) require.False(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t1")) require.False(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks1", "t2")) require.True(t, checkIfTableExistInVSchema(ctx, t, wf.wr.ts, "ks2", "t1")) @@ -229,6 +233,16 @@ func TestMoveTablesV2Complete(t *testing.T) { validateRoutingRuleCount(ctx, t, wf.wr.ts, 0) } +func testSwitchForward(t *testing.T, wf *VReplicationWorkflow) error { + _, err := wf.SwitchTraffic(DirectionForward) + return err +} + +func testReverse(t *testing.T, wf *VReplicationWorkflow) error { + _, err := wf.ReverseTraffic() + return err +} + func TestMoveTablesV2Partial(t *testing.T) { ctx := context.Background() p := &VReplicationWorkflowParams{ @@ -252,36 +266,36 @@ func TestMoveTablesV2Partial(t *testing.T) { tme.expectNoPreviousJournals() wf.params.TabletTypes = "replica" wf.params.Cells = "cell1" - require.NoError(t, wf.SwitchTraffic(DirectionForward)) + require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, "Reads partially switched. Replica switched in cells: cell1. Rdonly not switched. Writes Not Switched", wf.CurrentState()) tme.expectNoPreviousJournals() wf.params.TabletTypes = "replica" wf.params.Cells = "cell2" - require.NoError(t, wf.SwitchTraffic(DirectionForward)) + require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, "Reads partially switched. All Replica Reads Switched. Rdonly not switched. Writes Not Switched", wf.CurrentState()) tme.expectNoPreviousJournals() wf.params.TabletTypes = "rdonly" wf.params.Cells = "cell1,cell2" - require.NoError(t, wf.SwitchTraffic(DirectionForward)) + require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, WorkflowStateReadsSwitched, wf.CurrentState()) tme.expectNoPreviousJournals() wf.params.TabletTypes = "replica,rdonly" - require.NoError(t, wf.SwitchTraffic(DirectionBackward)) + require.NoError(t, testReverse(t, wf)) require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) tme.expectNoPreviousJournals() wf.params.TabletTypes = "rdonly" wf.params.Cells = "cell1" - require.NoError(t, wf.SwitchTraffic(DirectionForward)) + require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, "Reads partially switched. Replica not switched. Rdonly switched in cells: cell1. Writes Not Switched", wf.CurrentState()) tme.expectNoPreviousJournals() wf.params.TabletTypes = "rdonly" wf.params.Cells = "cell2" - require.NoError(t, wf.SwitchTraffic(DirectionForward)) + require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, "Reads partially switched. Replica not switched. All Rdonly Reads Switched. Writes Not Switched", wf.CurrentState()) } @@ -345,9 +359,9 @@ func TestReshardV2(t *testing.T) { tme.expectNoPreviousJournals() expectReshardQueries(t, tme) tme.expectNoPreviousJournals() - require.NoError(t, wf.SwitchTraffic(DirectionForward)) + require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, WorkflowStateAllSwitched, wf.CurrentState()) - require.NoError(t, wf.Complete()) + require.NoError(t, testComplete(t, wf)) si, err := wf.wr.ts.GetShard(ctx, "ks", "-40") require.Contains(t, err.Error(), "node doesn't exist") require.Nil(t, si)