From 441a085c7370bdc4ca691f1fd63c60fa4195a2ea Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 9 Jan 2021 09:18:54 +0100 Subject: [PATCH 1/8] Add master tablet type in addition to replica as default source tablet types Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vreplication/engine.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index c507ee70f3c..406b49e8e55 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -72,7 +72,9 @@ func init() { withDDL = withddl.New(allddls) } -var tabletTypesStr = flag.String("vreplication_tablet_type", "REPLICA", "comma separated list of tablet types used as a source") +// this are the default tablet_types that will be used by the tablet picker to find sources for a vreplication stream +// it can be overridden by passing a different list to the MoveTables or Reshard commands +var tabletTypesStr = flag.String("vreplication_tablet_type", "MASTER,REPLICA", "comma separated list of tablet types used as a source") // waitRetryTime can be changed to a smaller value for tests. // A VReplication stream can be created by sending an insert statement From a31e3ada4bf76dd58b86e50f558ca515b14a0c3a Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 9 Jan 2021 10:45:13 +0100 Subject: [PATCH 2/8] Add master tablet type in addition to replica as default source tablet types Signed-off-by: Rohit Nayak --- go.mod | 1 + go.sum | 2 + .../resharding_workflows_v2_test.go | 2 +- .../vreplication/vreplication_test.go | 4 +- go/vt/vtctl/vtctl.go | 1 + go/vt/vtgate/discoverygateway_test.go | 1 - go/vt/wrangler/traffic_switcher.go | 59 ++++++++++++++++++- go/vt/wrangler/traffic_switcher_test.go | 33 +++++------ go/vt/wrangler/workflow_test.go | 11 ++-- 9 files changed, 85 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index 4cb9a6c4e82..f074e7dbfc2 100644 --- a/go.mod +++ b/go.mod @@ -62,6 +62,7 @@ require ( github.com/martini-contrib/render v0.0.0-20150707142108-ec18f8345a11 github.com/mattn/go-sqlite3 v1.14.0 github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 + github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-testing-interface v1.14.0 // indirect github.com/mitchellh/mapstructure v1.2.3 // indirect github.com/montanaflynn/stats v0.6.3 diff --git a/go.sum b/go.sum index 50b907dda98..efcf79e5849 100644 --- a/go.sum +++ b/go.sum @@ -488,6 +488,8 @@ github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXx github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= +github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI= github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index ccc287d132a..ba9a5a88112 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -523,7 +523,7 @@ func moveCustomerTableSwitchFlows(t *testing.T, cells []*Cell, sourceCellOrAlias switchWrites(t, ksWorkflow, false) validateWritesRouteToTarget(t) - switchWrites(t, ksWorkflow, true) + switchWrites(t, reverseKsWorkflow, true) validateWritesRouteToSource(t) validateReadsRouteToSource(t, "replica") diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index c6f722c5954..dde5228f238 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -637,7 +637,9 @@ func switchReadsDryRun(t *testing.T, cells, ksWorkflow string, dryRunResults []s } func switchReads(t *testing.T, cells, ksWorkflow string) { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cells, "-tablet_type=rdonly", ksWorkflow) + var output string + var err error + output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cells, "-tablet_type=rdonly", ksWorkflow) require.NoError(t, err, fmt.Sprintf("SwitchReads Error: %s: %s", err, output)) output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cells, "-tablet_type=replica", ksWorkflow) require.NoError(t, err, fmt.Sprintf("SwitchReads Error: %s: %s", err, output)) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 2d43dca3ba9..c2077f67eca 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2200,6 +2200,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla case progress := <-progressCh: if progress.running == progress.total { wr.Logger().Printf("\nWorkflow started successfully with %d stream(s)\n", progress.total) + printDetails() return nil } wr.Logger().Printf("%d%% ... ", 100*progress.running/progress.total) diff --git a/go/vt/vtgate/discoverygateway_test.go b/go/vt/vtgate/discoverygateway_test.go index c91bc088b68..7acb8741f80 100644 --- a/go/vt/vtgate/discoverygateway_test.go +++ b/go/vt/vtgate/discoverygateway_test.go @@ -171,7 +171,6 @@ func TestDiscoveryGatewayWaitForTablets(t *testing.T) { }, }, } - dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local", 2) // replica should only use local ones diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index e1fdb4aa6a9..d669f77ca41 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -25,6 +25,7 @@ import ( "strings" "sync" "time" + "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -346,6 +347,39 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl return ts, ws, nil } +func (wr *Wrangler) doCellsHaveRdonlyTablets(ctx context.Context, cells []string) (bool, error) { + areAnyRdonly := func(tablets []*topo.TabletInfo) bool { + for _, tablet := range tablets { + if tablet.Type == topodatapb.TabletType_RDONLY { + return true + } + } + return false + } + + if len(cells) == 0 { + tablets, err := topotools.GetAllTabletsAcrossCells(ctx, wr.ts) + if err != nil { + return false, err + } + if areAnyRdonly(tablets) { + return true, nil + } + + } else { + for _, cell := range cells { + tablets, err := topotools.GetAllTablets(ctx, wr.ts, cell) + if err != nil { + return false, err + } + if areAnyRdonly(tablets) { + return true, nil + } + } + } + return false, nil +} + // SwitchReads is a generic way of switching read traffic for a resharding workflow. func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow string, servedTypes []topodatapb.TabletType, cells []string, direction TrafficSwitchDirection, dryRun bool) (*[]string, error) { @@ -360,7 +394,8 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st wr.Logger().Errorf(errorMsg) return nil, fmt.Errorf(errorMsg) } - wr.Logger().Infof("SwitchReads: %s.%s tt %+v, cells %+v, workflow state: %+v", targetKeyspace, workflow, servedTypes, cells, ws) + log.Infof("SwitchReads: %s.%s tt %+v, cells %+v, workflow state: %+v", targetKeyspace, workflow, servedTypes, cells, ws) + var switchReplicas, switchRdonly bool for _, servedType := range servedTypes { if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY { return nil, fmt.Errorf("tablet type must be REPLICA or RDONLY: %v", servedType) @@ -371,6 +406,26 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st if direction == DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 { return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched") } + switch servedType { + case topodatapb.TabletType_REPLICA: + switchReplicas = true + case topodatapb.TabletType_RDONLY: + switchRdonly = true + } + } + + // if there are no rdonly tablets in the cells ask to switch rdonly tablets as well so that routing rules + // are updated for rdonly as well. Otherwise vitess will not know that the workflow has completed and will + // incorrectly report that not all reads have been switched. User currently is forced to switch non-existent rdonly tablets + if switchReplicas && !switchRdonly { + var err error + rdonlyTabletsExist, err := wr.doCellsHaveRdonlyTablets(ctx, cells) + if err != nil { + return nil, err + } + if !rdonlyTabletsExist { + servedTypes = append(servedTypes, topodatapb.TabletType_RDONLY) + } } // If journals exist notify user and fail @@ -380,7 +435,7 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st return nil, err } if journalsExist { - wr.Logger().Errorf("Found a previous journal entry for %d", ts.id) + log.Infof("Found a previous journal entry for %d", ts.id) } var sw iswitcher if dryRun { diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 5760220a8b8..092ce43fd18 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -177,12 +177,12 @@ func TestTableMigrateMainflow(t *testing.T) { "ks2.t1": {"ks1.t1"}, "t2": {"ks1.t2"}, "ks2.t2": {"ks1.t2"}, - "t1@rdonly": {"ks2.t1"}, - "ks2.t1@rdonly": {"ks2.t1"}, - "ks1.t1@rdonly": {"ks2.t1"}, - "t2@rdonly": {"ks2.t2"}, - "ks2.t2@rdonly": {"ks2.t2"}, - "ks1.t2@rdonly": {"ks2.t2"}, + "t1@rdonly": {"ks1.t1"}, + "ks2.t1@rdonly": {"ks1.t1"}, + "ks1.t1@rdonly": {"ks1.t1"}, + "t2@rdonly": {"ks1.t2"}, + "ks2.t2@rdonly": {"ks1.t2"}, + "ks1.t2@rdonly": {"ks1.t2"}, "t1@replica": {"ks1.t1"}, "ks2.t1@replica": {"ks1.t1"}, "ks1.t1@replica": {"ks1.t1"}, @@ -526,10 +526,10 @@ func TestShardMigrateMainflow(t *testing.T) { checkCellServedTypes(t, tme.ts, "ks:40-", "cell1", 2) checkCellServedTypes(t, tme.ts, "ks:-80", "cell1", 1) checkCellServedTypes(t, tme.ts, "ks:80-", "cell1", 1) - checkCellServedTypes(t, tme.ts, "ks:-40", "cell2", 2) - checkCellServedTypes(t, tme.ts, "ks:40-", "cell2", 2) - checkCellServedTypes(t, tme.ts, "ks:-80", "cell2", 1) - checkCellServedTypes(t, tme.ts, "ks:80-", "cell2", 1) + checkCellServedTypes(t, tme.ts, "ks:-40", "cell2", 1) + checkCellServedTypes(t, tme.ts, "ks:40-", "cell2", 1) + checkCellServedTypes(t, tme.ts, "ks:-80", "cell2", 2) + checkCellServedTypes(t, tme.ts, "ks:80-", "cell2", 2) verifyQueries(t, tme.allDBClients) tme.expectNoPreviousJournals() @@ -1764,7 +1764,7 @@ func checkCellRouting(t *testing.T, wr *Wrangler, cell string, want map[string][ got[rr.FromTable] = append(got[rr.FromTable], rr.ToTables...) } if !reflect.DeepEqual(got, want) { - t.Errorf("srv rules for cell %s:\n%v, want\n%v", cell, got, want) + t.Fatalf("ERROR: routing rules don't match for cell %s:got\n%v, want\n%v", cell, got, want) } } @@ -1799,10 +1799,8 @@ func checkServedTypes(t *testing.T, ts *topo.Server, keyspaceShard string, want if err != nil { t.Fatal(err) } - - if len(servedTypes) != want { - t.Errorf("shard %v has wrong served types: got: %v, want: %v", keyspaceShard, len(servedTypes), want) - } + require.Equal(t, want, len(servedTypes), fmt.Sprintf("shard %v has wrong served types: got: %v, want: %v", + keyspaceShard, len(servedTypes), want)) } func checkCellServedTypes(t *testing.T, ts *topo.Server, keyspaceShard, cell string, want int) { @@ -1823,9 +1821,8 @@ outer: } } } - if count != want { - t.Errorf("serving types for keyspaceShard %s, cell %s: %d, want %d", keyspaceShard, cell, count, want) - } + require.Equal(t, want, count, fmt.Sprintf("serving types for keyspaceShard %s, cell %s: %d, want %d", + keyspaceShard, cell, count, want)) } func checkIsMasterServing(t *testing.T, ts *topo.Server, keyspaceShard string, want bool) { diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index a9f3ebd5651..9e1c5c9cd4c 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -264,19 +264,19 @@ func TestMoveTablesV2Partial(t *testing.T) { expectMoveTablesQueries(t, tme) tme.expectNoPreviousJournals() - wf.params.TabletTypes = "replica" + wf.params.TabletTypes = "rdonly" wf.params.Cells = "cell1" require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, "Reads partially switched. Replica switched in cells: cell1. Rdonly not switched. Writes Not Switched", wf.CurrentState()) tme.expectNoPreviousJournals() - wf.params.TabletTypes = "replica" + wf.params.TabletTypes = "rdonly" wf.params.Cells = "cell2" require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, "Reads partially switched. All Replica Reads Switched. Rdonly not switched. Writes Not Switched", wf.CurrentState()) tme.expectNoPreviousJournals() - wf.params.TabletTypes = "rdonly" + wf.params.TabletTypes = "replica" wf.params.Cells = "cell1,cell2" require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, WorkflowStateReadsSwitched, wf.CurrentState()) @@ -287,17 +287,16 @@ func TestMoveTablesV2Partial(t *testing.T) { require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) tme.expectNoPreviousJournals() - wf.params.TabletTypes = "rdonly" + wf.params.TabletTypes = "replica" wf.params.Cells = "cell1" require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, "Reads partially switched. Replica not switched. Rdonly switched in cells: cell1. Writes Not Switched", wf.CurrentState()) tme.expectNoPreviousJournals() - wf.params.TabletTypes = "rdonly" + wf.params.TabletTypes = "replica" wf.params.Cells = "cell2" require.NoError(t, testSwitchForward(t, wf)) require.Equal(t, "Reads partially switched. Replica not switched. All Rdonly Reads Switched. Writes Not Switched", wf.CurrentState()) - } func TestMoveTablesV2Cancel(t *testing.T) { From 6dc481e007b35887e8a9e2d74596d8325a09583a Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 9 Jan 2021 11:18:14 +0100 Subject: [PATCH 3/8] gofmted Signed-off-by: Rohit Nayak --- go/vt/wrangler/traffic_switcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index d669f77ca41..d380fd78835 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -25,6 +25,7 @@ import ( "strings" "sync" "time" + "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtgate/evalengine" From 02ad0eb849598530694ec43dde7039569de74127 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sat, 9 Jan 2021 16:21:48 +0100 Subject: [PATCH 4/8] Rename workflow Start to Create Signed-off-by: Rohit Nayak --- .../resharding_workflows_v2_test.go | 18 +++++++++--------- go/vt/vtctl/vtctl.go | 10 +++++----- go/vt/wrangler/workflow.go | 16 ++++++++-------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index ba9a5a88112..9541c154730 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -43,7 +43,7 @@ const ( ) const ( - workflowActionStart = "Start" + workflowActionCreate = "Create" workflowActionSwitchTraffic = "SwitchTraffic" workflowActionReverseTraffic = "ReverseTraffic" workflowActionComplete = "Complete" @@ -58,9 +58,9 @@ var ( currentWorkflowType wrangler.VReplicationWorkflowType ) -func reshard2Start(t *testing.T, sourceShards, targetShards string) error { +func createReshardWorkflow(t *testing.T, sourceShards, targetShards string) error { err := tstWorkflowExec(t, defaultCellName, workflowName, targetKs, targetKs, - "", workflowActionStart, "", sourceShards, targetShards) + "", workflowActionCreate, "", sourceShards, targetShards) require.NoError(t, err) time.Sleep(1 * time.Second) catchup(t, targetTab1, workflowName, "Reshard") @@ -69,12 +69,12 @@ func reshard2Start(t *testing.T, sourceShards, targetShards string) error { return nil } -func moveTables2Start(t *testing.T, tables string) error { +func createMoveTablesWorkflow(t *testing.T, tables string) error { if tables == "" { tables = tablesToMove } err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs, - tables, workflowActionStart, "", "", "") + tables, workflowActionCreate, "", "", "") require.NoError(t, err) catchup(t, targetTab1, workflowName, "MoveTables") catchup(t, targetTab2, workflowName, "MoveTables") @@ -96,7 +96,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, } args = append(args, "-v2") switch action { - case workflowActionStart: + case workflowActionCreate: if currentWorkflowType == wrangler.MoveTablesWorkflow { args = append(args, "-source", sourceKs, "-tables", tables) } else { @@ -244,7 +244,7 @@ func testReshardV2Workflow(t *testing.T) { currentWorkflowType = wrangler.ReshardWorkflow createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-") - reshard2Start(t, "-80,80-", "-40,40-80,80-c0,c0-") + createReshardWorkflow(t, "-80,80-", "-40,40-80,80-c0,c0-") if !strings.Contains(lastOutput, "Workflow started successfully") { t.Fail() } @@ -259,7 +259,7 @@ func testMoveTablesV2Workflow(t *testing.T) { // test basic forward and reverse flows setupCustomerKeyspace(t) - moveTables2Start(t, "customer") + createMoveTablesWorkflow(t, "customer") if !strings.Contains(lastOutput, "Workflow started successfully") { t.Fail() } @@ -272,7 +272,7 @@ func testMoveTablesV2Workflow(t *testing.T) { output, _ := vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) require.Contains(t, output, "No workflows found in keyspace customer") - moveTables2Start(t, "customer2") + createMoveTablesWorkflow(t, "customer2") output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1") diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index c2077f67eca..fd05c15c3e5 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -1954,7 +1954,7 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla type VReplicationWorkflowAction string const ( - vReplicationWorkflowActionStart = "start" + vReplicationWorkflowActionCreate = "create" vReplicationWorkflowActionSwitchTraffic = "switchtraffic" vReplicationWorkflowActionReverseTraffic = "reversetraffic" vReplicationWorkflowActionComplete = "complete" @@ -2052,7 +2052,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla originalAction := action action = strings.ToLower(action) // allow users to input action in a case-insensitive manner switch action { - case vReplicationWorkflowActionStart: + case vReplicationWorkflowActionCreate: switch workflowType { case wrangler.MoveTablesWorkflow: if *sourceKeyspace == "" { @@ -2105,7 +2105,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla log.Warningf("NewVReplicationWorkflow returned error %+v", wf) return err } - if !wf.Exists() && action != vReplicationWorkflowActionStart { + if !wf.Exists() && action != vReplicationWorkflowActionCreate { return fmt.Errorf("workflow %s does not exist", ksWorkflow) } @@ -2154,8 +2154,8 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla return printDetails() case vReplicationWorkflowActionProgress: return printCopyProgress() - case vReplicationWorkflowActionStart: - err = wf.Start() + case vReplicationWorkflowActionCreate: + err = wf.Create() if err != nil { return err } diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go index 56dede198cb..24a110653c5 100644 --- a/go/vt/wrangler/workflow.go +++ b/go/vt/wrangler/workflow.go @@ -26,7 +26,7 @@ const ( // Workflow state display strings const ( - WorkflowStateNotStarted = "Not Started" + WorkflowStateNotCreated = "Not Created" WorkflowStateNotSwitched = "Reads Not Switched. Writes Not Switched" WorkflowStateReadsSwitched = "All Reads Switched. Writes Not Switched" WorkflowStateWritesSwitched = "Reads Not Switched. Writes Switched" @@ -82,7 +82,7 @@ func (wr *Wrangler) NewVReplicationWorkflow(ctx context.Context, workflowType VR return nil, err } log.Infof("Workflow state is %+v", ws) - if ts != nil { //Other than on Start we need to get SourceKeyspace from the workflow + if ts != nil { //Other than on create we need to get SourceKeyspace from the workflow vrw.params.TargetKeyspace = ts.targetKeyspace vrw.params.Workflow = ts.workflow vrw.params.SourceKeyspace = ts.sourceKeyspace @@ -120,7 +120,7 @@ func (vrw *VReplicationWorkflow) stateAsString(ws *workflowState) string { var stateInfo []string s := "" if !vrw.Exists() { - stateInfo = append(stateInfo, WorkflowStateNotStarted) + stateInfo = append(stateInfo, WorkflowStateNotCreated) } else { if len(ws.RdonlyCellsNotSwitched) == 0 && len(ws.ReplicaCellsNotSwitched) == 0 && len(ws.ReplicaCellsSwitched) > 0 { s = "All Reads Switched" @@ -155,14 +155,14 @@ func (vrw *VReplicationWorkflow) stateAsString(ws *workflowState) string { return strings.Join(stateInfo, ". ") } -// Start initiates a workflow -func (vrw *VReplicationWorkflow) Start() error { +// Create initiates a workflow +func (vrw *VReplicationWorkflow) Create() error { var err error if vrw.Exists() { - return fmt.Errorf("workflow already exists found") + return fmt.Errorf("workflow already exists") } - if vrw.CachedState() != WorkflowStateNotStarted { - return fmt.Errorf("workflow has already been started, state is %s", vrw.CachedState()) + if vrw.CachedState() != WorkflowStateNotCreated { + return fmt.Errorf("workflow has already been created, state is %s", vrw.CachedState()) } switch vrw.workflowType { case MoveTablesWorkflow: From 58b00345c0628d3c44b16e243a32812e0293569b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 14 Jan 2021 23:13:33 +0100 Subject: [PATCH 5/8] Check that workflow is not in copy phase when switching traffic Signed-off-by: Rohit Nayak --- go/vt/vtctl/vtctl.go | 5 ++++- go/vt/wrangler/workflow.go | 33 ++++++++++++++++++++++++++++++++- go/vt/wrangler/workflow_test.go | 18 ++++++++++++++++-- 3 files changed, 52 insertions(+), 4 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index fd05c15c3e5..05f7a10e8b5 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2085,6 +2085,9 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla case vReplicationWorkflowActionSwitchTraffic, vReplicationWorkflowActionReverseTraffic: vrwp.Cells = *cells vrwp.TabletTypes = *tabletTypes + if vrwp.TabletTypes == "" { + vrwp.TabletTypes = "master,replica,rdonly" + } vrwp.Timeout = *timeout vrwp.EnableReverseReplication = *reverseReplication case vReplicationWorkflowActionCancel: @@ -2117,7 +2120,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla if copyProgress == nil { wr.Logger().Printf("\nCopy Completed.\n") } else { - wr.Logger().Printf("\nCopy Progress (approx.):\n") + wr.Logger().Printf("\nCopy Progress (approx):\n") var tables []string for table := range *copyProgress { tables = append(tables, table) diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go index 24a110653c5..c1cf3469bdb 100644 --- a/go/vt/wrangler/workflow.go +++ b/go/vt/wrangler/workflow.go @@ -228,12 +228,24 @@ func (vrw *VReplicationWorkflow) GetStreamCount() (int64, int64, []*WorkflowErro func (vrw *VReplicationWorkflow) SwitchTraffic(direction TrafficSwitchDirection) (*[]string, error) { var dryRunResults []string var rdDryRunResults, wrDryRunResults *[]string + var isCopyInProgress bool var err error + var hasReplica, hasRdonly, hasMaster bool + if !vrw.Exists() { return nil, fmt.Errorf("workflow has not yet been started") } + + isCopyInProgress, err = vrw.IsCopyInProgress() + if err != nil { + return nil, err + } + if isCopyInProgress { + return nil, fmt.Errorf("cannot switch traffic at this time, copy is still in progress for this workflow") + } + vrw.params.Direction = direction - hasReplica, hasRdonly, hasMaster, err := vrw.parseTabletTypes() + hasReplica, hasRdonly, hasMaster, err = vrw.parseTabletTypes() if err != nil { return nil, err } @@ -410,6 +422,25 @@ type TableCopyProgress struct { // CopyProgress stores the TableCopyProgress for all tables still being copied type CopyProgress map[string]*TableCopyProgress +// IsCopyInProgress returns true if any table remains to be copied +func (vrw *VReplicationWorkflow) IsCopyInProgress() (bool, error) { + ctx := context.Background() + getTablesQuery := "select 1 from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d" + for _, target := range vrw.ts.targets { + for id := range target.sources { + query := fmt.Sprintf(getTablesQuery, id) + p3qr, err := vrw.wr.tmc.ExecuteFetchAsDba(ctx, target.master.Tablet, true, []byte(query), 1, false, false) + if err != nil { + return false, err + } + if len(p3qr.Rows) > 0 { + return true, nil + } + } + } + return false, nil +} + // GetCopyProgress returns the progress of all tables being copied in the workflow func (vrw *VReplicationWorkflow) GetCopyProgress() (*CopyProgress, error) { ctx := context.Background() diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index 9e1c5c9cd4c..95f2c61fb36 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -104,8 +104,9 @@ func TestCopyProgress(t *testing.T) { expectCopyProgressQueries(t, tme) - cp, err2 := wf.GetCopyProgress() - require.NoError(t, err2) + var cp *CopyProgress + cp, err = wf.GetCopyProgress() + require.NoError(t, err) log.Infof("CopyProgress is %+v,%+v", (*cp)["t1"], (*cp)["t2"]) require.Equal(t, int64(800), (*cp)["t1"].SourceRowCount) @@ -117,6 +118,11 @@ func TestCopyProgress(t *testing.T) { require.Equal(t, int64(400), (*cp)["t2"].TargetRowCount) require.Equal(t, int64(4000), (*cp)["t2"].SourceTableSize) require.Equal(t, int64(1000), (*cp)["t2"].TargetTableSize) + + var isCopyInProgress bool + isCopyInProgress, err = wf.IsCopyInProgress() + require.NoError(t, err) + require.True(t, isCopyInProgress) } func expectCopyProgressQueries(t *testing.T, tme *testMigraterEnv) { @@ -147,6 +153,14 @@ func expectCopyProgressQueries(t *testing.T, tme *testMigraterEnv) { "t2|1000|2000") db.AddQuery(query, result) + for _, id := range []int{1, 2} { + query = fmt.Sprintf("select 1 from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d", id) + result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "dummy", + "int64"), + "1") + db.AddQuery(query, result) + } } func TestMoveTablesV2(t *testing.T) { From 70ec3c5e38f3a3adc60eae1776e037313a8e7900 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 15 Jan 2021 00:05:04 +0100 Subject: [PATCH 6/8] Fix tests Signed-off-by: Rohit Nayak --- go/vt/wrangler/traffic_switcher_env_test.go | 5 +++-- go/vt/wrangler/workflow_test.go | 9 ++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index b71dab625a1..2c8531b4e9a 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -242,10 +242,11 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe tme.wr = New(logutil.NewConsoleLogger(), tme.ts, tmclient.NewTabletManagerClient()) tme.sourceShards = sourceShards tme.targetShards = targetShards + tme.tmeDB = fakesqldb.New(t) tabletID := 10 for _, shard := range sourceShards { - tme.sourceMasters = append(tme.sourceMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, nil, TabletKeyspaceShard(t, "ks", shard))) + tme.sourceMasters = append(tme.sourceMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, tme.tmeDB, TabletKeyspaceShard(t, "ks", shard))) tabletID += 10 _, sourceKeyRange, err := topo.ValidateShardName(shard) @@ -261,7 +262,7 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe } for _, shard := range targetShards { - tme.targetMasters = append(tme.targetMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, nil, TabletKeyspaceShard(t, "ks", shard))) + tme.targetMasters = append(tme.targetMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, tme.tmeDB, TabletKeyspaceShard(t, "ks", shard))) tabletID += 10 _, targetKeyRange, err := topo.ValidateShardName(shard) diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index 95f2c61fb36..63da455d91f 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -428,7 +428,6 @@ func expectReshardQueries(t *testing.T, tme *testShardMigraterEnv) { dbclient.addInvariant("select * from _vt.vreplication where id = 1", runningResult(1)) dbclient.addInvariant("select * from _vt.vreplication where id = 2", runningResult(2)) dbclient.addInvariant("insert into _vt.resharding_journal", noResult) - } targetQueries := []string{ @@ -455,8 +454,10 @@ func expectReshardQueries(t *testing.T, tme *testShardMigraterEnv) { dbclient.addInvariant("update _vt.vreplication set message = 'FROZEN'", noResult) dbclient.addInvariant("delete from _vt.vreplication where id in (1)", noResult) dbclient.addInvariant("delete from _vt.copy_state where vrepl_id in (1)", noResult) - } + tme.tmeDB.AddQuery("select 1 from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", noResult) + tme.tmeDB.AddQuery("select 1 from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 2", noResult) + } func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv) { @@ -487,7 +488,6 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv) { "int64|varchar|varchar|varchar|varchar"), ""), ) - //select pos, state, message from _vt.vreplication where id=1 } for _, dbclient := range tme.dbSourceClients { @@ -530,4 +530,7 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv) { tme.tmeDB.AddQuery("drop table vt_ks2.t1", noResult) tme.tmeDB.AddQuery("drop table vt_ks2.t2", noResult) tme.tmeDB.AddQuery("update _vt.vreplication set message='Picked source tablet: cell:\"cell1\" uid:10 ' where id=1", noResult) + tme.tmeDB.AddQuery("select 1 from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", noResult) + tme.tmeDB.AddQuery("select 1 from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 2", noResult) + } From 50ca3885a5459ccf6e057fe75928f1e6e6df13b7 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 20 Jan 2021 18:07:27 +0100 Subject: [PATCH 7/8] Fix unit test merge issues Signed-off-by: Rohit Nayak --- go/vt/wrangler/workflow_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index 63da455d91f..849ffffb890 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -281,13 +281,13 @@ func TestMoveTablesV2Partial(t *testing.T) { wf.params.TabletTypes = "rdonly" wf.params.Cells = "cell1" require.NoError(t, testSwitchForward(t, wf)) - require.Equal(t, "Reads partially switched. Replica switched in cells: cell1. Rdonly not switched. Writes Not Switched", wf.CurrentState()) + require.Equal(t, "Reads partially switched. Replica not switched. Rdonly switched in cells: cell1. Writes Not Switched", wf.CurrentState()) tme.expectNoPreviousJournals() wf.params.TabletTypes = "rdonly" wf.params.Cells = "cell2" require.NoError(t, testSwitchForward(t, wf)) - require.Equal(t, "Reads partially switched. All Replica Reads Switched. Rdonly not switched. Writes Not Switched", wf.CurrentState()) + require.Equal(t, "Reads partially switched. Replica not switched. All Rdonly Reads Switched. Writes Not Switched", wf.CurrentState()) tme.expectNoPreviousJournals() wf.params.TabletTypes = "replica" @@ -304,13 +304,13 @@ func TestMoveTablesV2Partial(t *testing.T) { wf.params.TabletTypes = "replica" wf.params.Cells = "cell1" require.NoError(t, testSwitchForward(t, wf)) - require.Equal(t, "Reads partially switched. Replica not switched. Rdonly switched in cells: cell1. Writes Not Switched", wf.CurrentState()) + require.Equal(t, "Reads partially switched. Replica switched in cells: cell1. Rdonly switched in cells: cell1. Writes Not Switched", wf.CurrentState()) tme.expectNoPreviousJournals() wf.params.TabletTypes = "replica" wf.params.Cells = "cell2" require.NoError(t, testSwitchForward(t, wf)) - require.Equal(t, "Reads partially switched. Replica not switched. All Rdonly Reads Switched. Writes Not Switched", wf.CurrentState()) + require.Equal(t, "All Reads Switched. Writes Not Switched", wf.CurrentState()) } func TestMoveTablesV2Cancel(t *testing.T) { From 70a84912e3a2e5287b91338bb4abb08bcfb43195 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 20 Jan 2021 19:04:05 +0100 Subject: [PATCH 8/8] Fix e2e test merge issue Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/vreplication_test_env.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/vreplication/vreplication_test_env.go b/go/test/endtoend/vreplication/vreplication_test_env.go index cbfd02f42cc..e618432a7ef 100644 --- a/go/test/endtoend/vreplication/vreplication_test_env.go +++ b/go/test/endtoend/vreplication/vreplication_test_env.go @@ -41,7 +41,7 @@ var dryRunResultsSwitchWritesCustomerShard = []string{ var dryRunResultsReadCustomerShard = []string{ "Lock keyspace product", - "Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA]", + "Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA,RDONLY]", "Routing rules for tables [customer] will be updated", "Unlock keyspace product", }