Skip to content

Commit

Permalink
Ensure that we can switch before trying
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Aug 25, 2023
1 parent 4edb612 commit 0e048b2
Showing 1 changed file with 28 additions and 18 deletions.
46 changes: 28 additions & 18 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,8 +781,6 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
}

vdiff1(t, ksWorkflow, "")
catchup(t, customerTab1, workflow, workflowType)
catchup(t, customerTab2, workflow, workflowType)
switchReadsDryRun(t, workflowType, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard)
switchReads(t, workflowType, allCellNames, ksWorkflow, false)
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query))
Expand All @@ -809,7 +807,6 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
catchup(t, productTab, workflow, "MoveTables")

vdiff1(t, "product.p2c_reverse", "")
catchup(t, productTab, workflow, "MoveTables")
if withOpenTx {
execVtgateQuery(t, vtgateConn, "", deleteOpenTxQuery)
}
Expand Down Expand Up @@ -1032,20 +1029,16 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
t.Fatalf("Reshard Create command failed with %+v\n", err)
}
targetShards = "," + targetShards + ","
waitForCatchup := func() {
for _, tab := range tablets {
if strings.Contains(targetShards, ","+tab.Shard+",") {
log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard)
catchup(t, tab, workflow, "Reshard")
} else {
log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard)
continue
}
for _, tab := range tablets {
if strings.Contains(targetShards, ","+tab.Shard+",") {
log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard)
catchup(t, tab, workflow, "Reshard")
} else {
log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard)
continue
}
}
waitForCatchup()
vdiff1(t, ksWorkflow, "")
waitForCatchup()
if dryRunResultSwitchReads != nil {
switchReadsDryRun(t, workflowType, allCellNames, ksWorkflow, dryRunResultSwitchReads)
}
Expand Down Expand Up @@ -1085,8 +1078,6 @@ func shardOrders(t *testing.T) {
catchup(t, customerTab1, workflow, workflowType)
catchup(t, customerTab2, workflow, workflowType)
vdiff1(t, ksWorkflow, "")
catchup(t, customerTab1, workflow, workflowType)
catchup(t, customerTab2, workflow, workflowType)
switchReads(t, workflowType, allCellNames, ksWorkflow, false)
switchWrites(t, workflowType, ksWorkflow, false)
moveTablesAction(t, "Complete", cell, workflow, sourceKs, targetKs, tables)
Expand Down Expand Up @@ -1135,8 +1126,6 @@ func shardMerchant(t *testing.T) {
catchup(t, merchantTab2, workflow, workflowType)

vdiff1(t, fmt.Sprintf("%s.%s", merchantKeyspace, workflow), "")
catchup(t, merchantTab1, workflow, workflowType)
catchup(t, merchantTab2, workflow, workflowType)
switchReads(t, workflowType, allCellNames, ksWorkflow, false)
switchWrites(t, workflowType, ksWorkflow, false)
printRoutingRules(t, vc, "After merchant movetables")
Expand Down Expand Up @@ -1412,6 +1401,7 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry
require.FailNowf(t, "Invalid workflow type for SwitchTraffic, must be MoveTables or Reshard",
"workflow type specified: %s", workflowType)
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly,replica",
"--dry_run", "SwitchTraffic", ksWorkflow)
require.NoError(t, err, fmt.Sprintf("Switching Reads DryRun Error: %s: %s", err, output))
Expand All @@ -1420,6 +1410,23 @@ func switchReadsDryRun(t *testing.T, workflowType, cells, ksWorkflow string, dry
}
}

func ensureCanSwitch(t *testing.T, workflowType, cells, ksWorkflow string) {
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
for {
_, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--dry_run", "SwitchTraffic", ksWorkflow)
if err == nil {
return
}
select {
case <-timer.C:
t.Fatalf("Did not become ready to switch traffic for %s before the timeout of %s", ksWorkflow, defaultTimeout)
default:
time.Sleep(defaultTick)
}
}
}

func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse bool) {
if workflowType != binlogdatapb.VReplicationWorkflowType_MoveTables.String() &&
workflowType != binlogdatapb.VReplicationWorkflowType_Reshard.String() {
Expand All @@ -1432,6 +1439,7 @@ func switchReads(t *testing.T, workflowType, cells, ksWorkflow string, reverse b
if reverse {
command = "ReverseTraffic"
}
ensureCanSwitch(t, workflowType, cells, ksWorkflow)
output, err = vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, "--tablet_types=rdonly",
command, ksWorkflow)
require.NoError(t, err, fmt.Sprintf("%s Error: %s: %s", command, err, output))
Expand All @@ -1451,6 +1459,7 @@ func switchWrites(t *testing.T, workflowType, ksWorkflow string, reverse bool) {
command = "ReverseTraffic"
}
const SwitchWritesTimeout = "91s" // max: 3 tablet picker 30s waits + 1
ensureCanSwitch(t, workflowType, "", ksWorkflow)
// Use vtctldclient for MoveTables SwitchTraffic ~ 50% of the time.
if workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables.String() && time.Now().Second()%2 == 0 {
parts := strings.Split(ksWorkflow, ".")
Expand Down Expand Up @@ -1546,6 +1555,7 @@ func generateInnoDBRowHistory(t *testing.T, sourceKS string, neededTrxHistory in
// expected length.
func waitForInnoDBHistoryLength(t *testing.T, tablet *cluster.VttabletProcess, expectedLength int64) {
timer := time.NewTimer(defaultTimeout)
defer timer.Stop()
historyLen := int64(0)
for {
res, err := tablet.QueryTablet(historyLenQuery, tablet.Keyspace, false)
Expand Down

0 comments on commit 0e048b2

Please sign in to comment.