Skip to content

Commit

Permalink
Merge pull request #7275 from planetscale/rn-vrepl-misc
Browse files Browse the repository at this point in the history
VReplication: Miscellaneous improvements
  • Loading branch information
rohit-nayak-ps authored Jan 20, 2021
2 parents d54b87c + 70a8491 commit 0945234
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 66 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/martini-contrib/render v0.0.0-20150707142108-ec18f8345a11
github.com/mattn/go-sqlite3 v1.14.0
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mitchellh/go-ps v1.0.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.0 // indirect
github.com/mitchellh/mapstructure v1.2.3 // indirect
github.com/montanaflynn/stats v0.6.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,8 @@ github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXx
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc=
github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI=
github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
Expand Down
20 changes: 10 additions & 10 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
)

const (
workflowActionStart = "Start"
workflowActionCreate = "Create"
workflowActionSwitchTraffic = "SwitchTraffic"
workflowActionReverseTraffic = "ReverseTraffic"
workflowActionComplete = "Complete"
Expand All @@ -58,9 +58,9 @@ var (
currentWorkflowType wrangler.VReplicationWorkflowType
)

func reshard2Start(t *testing.T, sourceShards, targetShards string) error {
func createReshardWorkflow(t *testing.T, sourceShards, targetShards string) error {
err := tstWorkflowExec(t, defaultCellName, workflowName, targetKs, targetKs,
"", workflowActionStart, "", sourceShards, targetShards)
"", workflowActionCreate, "", sourceShards, targetShards)
require.NoError(t, err)
time.Sleep(1 * time.Second)
catchup(t, targetTab1, workflowName, "Reshard")
Expand All @@ -69,12 +69,12 @@ func reshard2Start(t *testing.T, sourceShards, targetShards string) error {
return nil
}

func moveTables2Start(t *testing.T, tables string) error {
func createMoveTablesWorkflow(t *testing.T, tables string) error {
if tables == "" {
tables = tablesToMove
}
err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs,
tables, workflowActionStart, "", "", "")
tables, workflowActionCreate, "", "", "")
require.NoError(t, err)
catchup(t, targetTab1, workflowName, "MoveTables")
catchup(t, targetTab2, workflowName, "MoveTables")
Expand All @@ -96,7 +96,7 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables,
}
args = append(args, "-v2")
switch action {
case workflowActionStart:
case workflowActionCreate:
if currentWorkflowType == wrangler.MoveTablesWorkflow {
args = append(args, "-source", sourceKs, "-tables", tables)
} else {
Expand Down Expand Up @@ -244,7 +244,7 @@ func testReshardV2Workflow(t *testing.T) {
currentWorkflowType = wrangler.ReshardWorkflow

createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-")
reshard2Start(t, "-80,80-", "-40,40-80,80-c0,c0-")
createReshardWorkflow(t, "-80,80-", "-40,40-80,80-c0,c0-")
if !strings.Contains(lastOutput, "Workflow started successfully") {
t.Fail()
}
Expand All @@ -259,7 +259,7 @@ func testMoveTablesV2Workflow(t *testing.T) {

// test basic forward and reverse flows
setupCustomerKeyspace(t)
moveTables2Start(t, "customer")
createMoveTablesWorkflow(t, "customer")
if !strings.Contains(lastOutput, "Workflow started successfully") {
t.Fail()
}
Expand All @@ -272,7 +272,7 @@ func testMoveTablesV2Workflow(t *testing.T) {
output, _ := vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "No workflows found in keyspace customer")

moveTables2Start(t, "customer2")
createMoveTablesWorkflow(t, "customer2")
output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1")

Expand Down Expand Up @@ -523,7 +523,7 @@ func moveCustomerTableSwitchFlows(t *testing.T, cells []*Cell, sourceCellOrAlias
switchWrites(t, ksWorkflow, false)
validateWritesRouteToTarget(t)

switchWrites(t, ksWorkflow, true)
switchWrites(t, reverseKsWorkflow, true)
validateWritesRouteToSource(t)

validateReadsRouteToSource(t, "replica")
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,9 @@ func switchReadsDryRun(t *testing.T, cells, ksWorkflow string, dryRunResults []s
}

func switchReads(t *testing.T, cells, ksWorkflow string) {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cells, "-tablet_type=rdonly", ksWorkflow)
var output string
var err error
output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cells, "-tablet_type=rdonly", ksWorkflow)
require.NoError(t, err, fmt.Sprintf("SwitchReads Error: %s: %s", err, output))
output, err = vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+cells, "-tablet_type=replica", ksWorkflow)
require.NoError(t, err, fmt.Sprintf("SwitchReads Error: %s: %s", err, output))
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/vreplication_test_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var dryRunResultsSwitchWritesCustomerShard = []string{

var dryRunResultsReadCustomerShard = []string{
"Lock keyspace product",
"Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA]",
"Switch reads for tables [customer] to keyspace customer for tablet types [REPLICA,RDONLY]",
"Routing rules for tables [customer] will be updated",
"Unlock keyspace product",
}
Expand Down
16 changes: 10 additions & 6 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1954,7 +1954,7 @@ func commandMoveTables(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
type VReplicationWorkflowAction string

const (
vReplicationWorkflowActionStart = "start"
vReplicationWorkflowActionCreate = "create"
vReplicationWorkflowActionSwitchTraffic = "switchtraffic"
vReplicationWorkflowActionReverseTraffic = "reversetraffic"
vReplicationWorkflowActionComplete = "complete"
Expand Down Expand Up @@ -2052,7 +2052,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
originalAction := action
action = strings.ToLower(action) // allow users to input action in a case-insensitive manner
switch action {
case vReplicationWorkflowActionStart:
case vReplicationWorkflowActionCreate:
switch workflowType {
case wrangler.MoveTablesWorkflow:
if *sourceKeyspace == "" {
Expand Down Expand Up @@ -2085,6 +2085,9 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
case vReplicationWorkflowActionSwitchTraffic, vReplicationWorkflowActionReverseTraffic:
vrwp.Cells = *cells
vrwp.TabletTypes = *tabletTypes
if vrwp.TabletTypes == "" {
vrwp.TabletTypes = "master,replica,rdonly"
}
vrwp.Timeout = *timeout
vrwp.EnableReverseReplication = *reverseReplication
case vReplicationWorkflowActionCancel:
Expand All @@ -2105,7 +2108,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
log.Warningf("NewVReplicationWorkflow returned error %+v", wf)
return err
}
if !wf.Exists() && action != vReplicationWorkflowActionStart {
if !wf.Exists() && action != vReplicationWorkflowActionCreate {
return fmt.Errorf("workflow %s does not exist", ksWorkflow)
}

Expand All @@ -2117,7 +2120,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
if copyProgress == nil {
wr.Logger().Printf("\nCopy Completed.\n")
} else {
wr.Logger().Printf("\nCopy Progress (approx.):\n")
wr.Logger().Printf("\nCopy Progress (approx):\n")
var tables []string
for table := range *copyProgress {
tables = append(tables, table)
Expand Down Expand Up @@ -2154,8 +2157,8 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
return printDetails()
case vReplicationWorkflowActionProgress:
return printCopyProgress()
case vReplicationWorkflowActionStart:
err = wf.Start()
case vReplicationWorkflowActionCreate:
err = wf.Create()
if err != nil {
return err
}
Expand Down Expand Up @@ -2200,6 +2203,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
case progress := <-progressCh:
if progress.running == progress.total {
wr.Logger().Printf("\nWorkflow started successfully with %d stream(s)\n", progress.total)
printDetails()
return nil
}
wr.Logger().Printf("%d%% ... ", 100*progress.running/progress.total)
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtgate/discoverygateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ func TestDiscoveryGatewayWaitForTablets(t *testing.T) {
},
},
}

dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local", 2)

// replica should only use local ones
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func init() {
withDDL = withddl.New(allddls)
}

var tabletTypesStr = flag.String("vreplication_tablet_type", "REPLICA", "comma separated list of tablet types used as a source")
// this are the default tablet_types that will be used by the tablet picker to find sources for a vreplication stream
// it can be overridden by passing a different list to the MoveTables or Reshard commands
var tabletTypesStr = flag.String("vreplication_tablet_type", "MASTER,REPLICA", "comma separated list of tablet types used as a source")

// waitRetryTime can be changed to a smaller value for tests.
// A VReplication stream can be created by sending an insert statement
Expand Down
60 changes: 58 additions & 2 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/topotools"

"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -346,6 +348,39 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
return ts, ws, nil
}

func (wr *Wrangler) doCellsHaveRdonlyTablets(ctx context.Context, cells []string) (bool, error) {
areAnyRdonly := func(tablets []*topo.TabletInfo) bool {
for _, tablet := range tablets {
if tablet.Type == topodatapb.TabletType_RDONLY {
return true
}
}
return false
}

if len(cells) == 0 {
tablets, err := topotools.GetAllTabletsAcrossCells(ctx, wr.ts)
if err != nil {
return false, err
}
if areAnyRdonly(tablets) {
return true, nil
}

} else {
for _, cell := range cells {
tablets, err := topotools.GetAllTablets(ctx, wr.ts, cell)
if err != nil {
return false, err
}
if areAnyRdonly(tablets) {
return true, nil
}
}
}
return false, nil
}

// SwitchReads is a generic way of switching read traffic for a resharding workflow.
func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow string, servedTypes []topodatapb.TabletType,
cells []string, direction TrafficSwitchDirection, dryRun bool) (*[]string, error) {
Expand All @@ -360,7 +395,8 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st
wr.Logger().Errorf(errorMsg)
return nil, fmt.Errorf(errorMsg)
}
wr.Logger().Infof("SwitchReads: %s.%s tt %+v, cells %+v, workflow state: %+v", targetKeyspace, workflow, servedTypes, cells, ws)
log.Infof("SwitchReads: %s.%s tt %+v, cells %+v, workflow state: %+v", targetKeyspace, workflow, servedTypes, cells, ws)
var switchReplicas, switchRdonly bool
for _, servedType := range servedTypes {
if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY {
return nil, fmt.Errorf("tablet type must be REPLICA or RDONLY: %v", servedType)
Expand All @@ -371,6 +407,26 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st
if direction == DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
}
switch servedType {
case topodatapb.TabletType_REPLICA:
switchReplicas = true
case topodatapb.TabletType_RDONLY:
switchRdonly = true
}
}

// if there are no rdonly tablets in the cells ask to switch rdonly tablets as well so that routing rules
// are updated for rdonly as well. Otherwise vitess will not know that the workflow has completed and will
// incorrectly report that not all reads have been switched. User currently is forced to switch non-existent rdonly tablets
if switchReplicas && !switchRdonly {
var err error
rdonlyTabletsExist, err := wr.doCellsHaveRdonlyTablets(ctx, cells)
if err != nil {
return nil, err
}
if !rdonlyTabletsExist {
servedTypes = append(servedTypes, topodatapb.TabletType_RDONLY)
}
}

// If journals exist notify user and fail
Expand All @@ -380,7 +436,7 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st
return nil, err
}
if journalsExist {
wr.Logger().Errorf("Found a previous journal entry for %d", ts.id)
log.Infof("Found a previous journal entry for %d", ts.id)
}
var sw iswitcher
if dryRun {
Expand Down
5 changes: 3 additions & 2 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,11 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
tme.wr = New(logutil.NewConsoleLogger(), tme.ts, tmclient.NewTabletManagerClient())
tme.sourceShards = sourceShards
tme.targetShards = targetShards
tme.tmeDB = fakesqldb.New(t)

tabletID := 10
for _, shard := range sourceShards {
tme.sourceMasters = append(tme.sourceMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, nil, TabletKeyspaceShard(t, "ks", shard)))
tme.sourceMasters = append(tme.sourceMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, tme.tmeDB, TabletKeyspaceShard(t, "ks", shard)))
tabletID += 10

_, sourceKeyRange, err := topo.ValidateShardName(shard)
Expand All @@ -261,7 +262,7 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
}

for _, shard := range targetShards {
tme.targetMasters = append(tme.targetMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, nil, TabletKeyspaceShard(t, "ks", shard)))
tme.targetMasters = append(tme.targetMasters, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_MASTER, tme.tmeDB, TabletKeyspaceShard(t, "ks", shard)))
tabletID += 10

_, targetKeyRange, err := topo.ValidateShardName(shard)
Expand Down
33 changes: 15 additions & 18 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,12 @@ func TestTableMigrateMainflow(t *testing.T) {
"ks2.t1": {"ks1.t1"},
"t2": {"ks1.t2"},
"ks2.t2": {"ks1.t2"},
"t1@rdonly": {"ks2.t1"},
"ks2.t1@rdonly": {"ks2.t1"},
"ks1.t1@rdonly": {"ks2.t1"},
"t2@rdonly": {"ks2.t2"},
"ks2.t2@rdonly": {"ks2.t2"},
"ks1.t2@rdonly": {"ks2.t2"},
"t1@rdonly": {"ks1.t1"},
"ks2.t1@rdonly": {"ks1.t1"},
"ks1.t1@rdonly": {"ks1.t1"},
"t2@rdonly": {"ks1.t2"},
"ks2.t2@rdonly": {"ks1.t2"},
"ks1.t2@rdonly": {"ks1.t2"},
"t1@replica": {"ks1.t1"},
"ks2.t1@replica": {"ks1.t1"},
"ks1.t1@replica": {"ks1.t1"},
Expand Down Expand Up @@ -526,10 +526,10 @@ func TestShardMigrateMainflow(t *testing.T) {
checkCellServedTypes(t, tme.ts, "ks:40-", "cell1", 2)
checkCellServedTypes(t, tme.ts, "ks:-80", "cell1", 1)
checkCellServedTypes(t, tme.ts, "ks:80-", "cell1", 1)
checkCellServedTypes(t, tme.ts, "ks:-40", "cell2", 2)
checkCellServedTypes(t, tme.ts, "ks:40-", "cell2", 2)
checkCellServedTypes(t, tme.ts, "ks:-80", "cell2", 1)
checkCellServedTypes(t, tme.ts, "ks:80-", "cell2", 1)
checkCellServedTypes(t, tme.ts, "ks:-40", "cell2", 1)
checkCellServedTypes(t, tme.ts, "ks:40-", "cell2", 1)
checkCellServedTypes(t, tme.ts, "ks:-80", "cell2", 2)
checkCellServedTypes(t, tme.ts, "ks:80-", "cell2", 2)
verifyQueries(t, tme.allDBClients)

tme.expectNoPreviousJournals()
Expand Down Expand Up @@ -1764,7 +1764,7 @@ func checkCellRouting(t *testing.T, wr *Wrangler, cell string, want map[string][
got[rr.FromTable] = append(got[rr.FromTable], rr.ToTables...)
}
if !reflect.DeepEqual(got, want) {
t.Errorf("srv rules for cell %s:\n%v, want\n%v", cell, got, want)
t.Fatalf("ERROR: routing rules don't match for cell %s:got\n%v, want\n%v", cell, got, want)
}
}

Expand Down Expand Up @@ -1799,10 +1799,8 @@ func checkServedTypes(t *testing.T, ts *topo.Server, keyspaceShard string, want
if err != nil {
t.Fatal(err)
}

if len(servedTypes) != want {
t.Errorf("shard %v has wrong served types: got: %v, want: %v", keyspaceShard, len(servedTypes), want)
}
require.Equal(t, want, len(servedTypes), fmt.Sprintf("shard %v has wrong served types: got: %v, want: %v",
keyspaceShard, len(servedTypes), want))
}

func checkCellServedTypes(t *testing.T, ts *topo.Server, keyspaceShard, cell string, want int) {
Expand All @@ -1823,9 +1821,8 @@ outer:
}
}
}
if count != want {
t.Errorf("serving types for keyspaceShard %s, cell %s: %d, want %d", keyspaceShard, cell, count, want)
}
require.Equal(t, want, count, fmt.Sprintf("serving types for keyspaceShard %s, cell %s: %d, want %d",
keyspaceShard, cell, count, want))
}

func checkIsMasterServing(t *testing.T, ts *topo.Server, keyspaceShard string, want bool) {
Expand Down
Loading

0 comments on commit 0945234

Please sign in to comment.