Skip to content

Commit

Permalink
Allow users to control VReplication DDL handling (#11532)
Browse files Browse the repository at this point in the history
* Allow users to control VReplication DDL handling

Signed-off-by: Matt Lord <[email protected]>

* Update vtadmin web protos

Signed-off-by: Matt Lord <[email protected]>

* Get Reshard working as well

Signed-off-by: Matt Lord <[email protected]>

* Make proto with go 1.18.7

Signed-off-by: Matt Lord <[email protected]>

* Elide bls.on_ddl in marshalled JSON output

Signed-off-by: Matt Lord <[email protected]>

* Add TestVreplicationDDLHandling e2e test

Signed-off-by: Matt Lord <[email protected]>

* Cleanup binlogdata[pb] import declarations

Signed-off-by: Matt Lord <[email protected]>

* Fix argument handling for moveTables()

Signed-off-by: Matt Lord <[email protected]>

* Minor changes after self review

Signed-off-by: Matt Lord <[email protected]>

* Address review comments

Signed-off-by: Matt Lord <[email protected]>

* Minor improvement to usage output

Signed-off-by: Matt Lord <[email protected]>

* Test EXEC_IGNORE too

Signed-off-by: Matt Lord <[email protected]>

* Update unit test

Signed-off-by: Matt Lord <[email protected]>

* Address review comments

Signed-off-by: Matt Lord <[email protected]>

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Nov 10, 2022
1 parent 161a05a commit 1ace3b4
Show file tree
Hide file tree
Showing 16 changed files with 1,566 additions and 1,250 deletions.
32 changes: 27 additions & 5 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,12 @@ func validateThatQueryExecutesOnTablet(t *testing.T, conn *mysql.Conn, tablet *c
return newCount == count+1
}

func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wantState string) {
// waitForWorkflowState waits for all of the given workflow's
// streams to reach the provided state. You can pass optional
// key value pairs of the form "key==value" to also wait for
// additional stream sub-state such as "Message==for vdiff".
// Invalid checks are ignored.
func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wantState string, fieldEqualityChecks ...string) {
done := false
timer := time.NewTimer(workflowStateTimeout)
log.Infof("Waiting for workflow %q to fully reach %q state", ksWorkflow, wantState)
Expand All @@ -253,9 +258,21 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
tabletStreams.ForEach(func(streamId, streamInfos gjson.Result) bool { // for each stream
if streamId.String() == "PrimaryReplicationStatuses" {
streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream
// we need to wait for all streams to have the desired state
state = attributeValue.Get("State").String()
if state != wantState {
done = false // we need to wait for all streams to have the desired state
if state == wantState {
for i := 0; i < len(fieldEqualityChecks); i++ {
if kvparts := strings.Split(fieldEqualityChecks[i], "=="); len(kvparts) == 2 {
key := kvparts[0]
val := kvparts[1]
res := attributeValue.Get(key).String()
if !strings.EqualFold(res, val) {
done = false
}
}
}
} else {
done = false
}
return true
})
Expand All @@ -270,8 +287,13 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa
}
select {
case <-timer.C:
require.FailNowf(t, "workflow %q did not fully reach the expected state of %q before the timeout of %s; last seen output: %s",
ksWorkflow, wantState, workflowStateTimeout, output)
var extraRequirements string
if len(fieldEqualityChecks) > 0 {
extraRequirements = fmt.Sprintf(" with the additional requirements of \"%v\"", fieldEqualityChecks)
}
require.FailNowf(t, "workflow state not reached",
"Workflow %q did not fully reach the expected state of %q%s before the timeout of %s; last seen output: %s",
ksWorkflow, wantState, extraRequirements, workflowStateTimeout, output)
default:
time.Sleep(defaultTick)
}
Expand Down
110 changes: 107 additions & 3 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,97 @@ func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (resp *http
return resp, respBody, err
}

// TestVReplicationDDLHandling exercises the MoveTables --on-ddl flag
// with the values IGNORE, STOP, and EXEC, checking after each source
// DDL both the workflow state and whether the column change shows up
// in the target keyspace's schema.
// NOTE: this is a manual test. It is not executed in the CI.
func TestVReplicationDDLHandling(t *testing.T) {
	const (
		wfName     = "onddl_test"
		tblName    = "orders"
		colName    = "ddltest"
		cellName   = "zone1"
		shardName  = "0"
		colAbsent  = "[[INT64(0)]]"
		colPresent = "[[INT64(1)]]"
	)
	ksWorkflow := fmt.Sprintf("%s.%s", targetKs, wfName)

	vc = NewVitessCluster(t, t.Name(), []string{cellName}, mainClusterConfig)
	defer vc.TearDown(t)
	defaultCell = vc.Cells[cellName]

	// Bring up the source keyspace (with schema) and an empty target keyspace.
	_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, shardName, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
	if err != nil {
		t.Fatal(err)
	}
	_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, shardName, "", "", 0, 0, 200, nil)
	if err != nil {
		t.Fatal(err)
	}

	vtgate = defaultCell.Vtgates[0]
	require.NotNil(t, vtgate)
	for _, ks := range []string{sourceKs, targetKs} {
		vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", ks, shardName), 1)
	}
	verifyClusterHealth(t, vc)

	vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
	defer vtgateConn.Close()
	sourceTab = vc.getPrimaryTablet(t, sourceKs, shardName)
	targetTab := vc.getPrimaryTablet(t, targetKs, shardName)

	insertInitialData(t)

	_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", sourceKs), 1, false)
	require.NoError(t, err)

	// colCountQuery builds the information_schema query that counts the
	// test column on the test table within the given keyspace's database.
	colCountQuery := func(ks string) string {
		return fmt.Sprintf("select count(column_name) from information_schema.columns where table_schema='vt_%s' and table_name='%s' and column_name='%s'",
			ks, tblName, colName)
	}
	// runDDL executes the statement through vtgate and fails the test on error.
	runDDL := func(ddl string) {
		_, ddlErr := vtgateConn.ExecuteFetch(ddl, 1, false)
		require.NoError(t, ddlErr, "error executing %q: %v", ddl, ddlErr)
	}

	addColDDL := fmt.Sprintf("alter table %s add column %s varchar(64)", tblName, colName)
	dropColDDL := fmt.Sprintf("alter table %s drop column %s", tblName, colName)
	checkColQuerySource := colCountQuery(sourceKs)
	checkColQueryTarget := colCountQuery(targetKs)

	// --- IGNORE: the DDL must not reach the target and the workflow keeps running.
	moveTables(t, defaultCellName, wfName, sourceKs, targetKs, tblName, "--on-ddl=IGNORE")
	// Get past the copy phase before issuing any DDL.
	catchup(t, targetTab, wfName, "MoveTables")
	// Add the new column on the source.
	runDDL(addColDDL)
	// The workflow should be unaffected.
	waitForWorkflowState(t, vc, ksWorkflow, "Running")
	// The column must be missing on the target...
	waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, colAbsent)
	// ...and present on the source.
	waitForQueryResult(t, vtgateConn, sourceKs, checkColQuerySource, colPresent)
	cancelMoveTables(t, defaultCellName, wfName, sourceKs, targetKs, tblName)
	// Drop the column on the source so the next phase starts fresh.
	runDDL(dropColDDL)

	// --- STOP: the column exists nowhere at this point.
	moveTables(t, defaultCellName, wfName, sourceKs, targetKs, tblName, "--on-ddl=STOP")
	// Get past the copy phase before issuing any DDL.
	catchup(t, targetTab, wfName, "MoveTables")
	// Add the new column on the source.
	runDDL(addColDDL)
	// The workflow should have stopped because of the DDL.
	waitForWorkflowState(t, vc, ksWorkflow, "Stopped", fmt.Sprintf("Message==Stopped at DDL %s", addColDDL))
	// The target must not have picked up the column.
	waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, colAbsent)
	cancelMoveTables(t, defaultCellName, wfName, sourceKs, targetKs, tblName)

	// --- EXEC: the column still exists on the source from the STOP phase.
	moveTables(t, defaultCellName, wfName, sourceKs, targetKs, tblName, "--on-ddl=EXEC")
	// Get past the copy phase before issuing any DDL.
	catchup(t, targetTab, wfName, "MoveTables")
	// The copy phase should have brought the column over to the target.
	waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, colPresent)
	// Drop the column on the source.
	runDDL(dropColDDL)
	// The workflow should be unaffected.
	waitForWorkflowState(t, vc, ksWorkflow, "Running")
	// The drop should have been applied on the target as well.
	waitForQueryResult(t, vtgateConn, targetKs, checkColQueryTarget, colAbsent)
	cancelMoveTables(t, defaultCellName, wfName, sourceKs, targetKs, tblName)
}

func TestVreplicationCopyThrottling(t *testing.T) {
workflow := "copy-throttling"
cell := "zone1"
Expand Down Expand Up @@ -1193,9 +1284,16 @@ func catchup(t *testing.T, vttablet *cluster.VttabletProcess, workflow, info str
vttablet.WaitForVReplicationToCatchup(t, workflow, fmt.Sprintf("vt_%s", vttablet.Keyspace), maxWait)
}

func moveTables(t *testing.T, cell, workflow, sourceKs, targetKs, tables string) {
if err := vc.VtctlClient.ExecuteCommand("MoveTables", "--", "--v1", "--cells="+cell, "--workflow="+workflow,
"--tablet_types="+"primary,replica,rdonly", sourceKs, targetKs, tables); err != nil {
// moveTables runs the v1 MoveTables vtctl command for the given workflow,
// moving tables from sourceKs to targetKs in the given cell, and fails
// the test immediately if the command returns an error.
//
// Any extraFlags (e.g. "--on-ddl=STOP") are placed after the standard
// flags and before the positional arguments. Each flag is passed as its
// own command argument; joining them into one space-separated string
// would hand the client a single malformed argument as soon as more
// than one flag is supplied.
func moveTables(t *testing.T, cell, workflow, sourceKs, targetKs, tables string, extraFlags ...string) {
	args := []string{
		"MoveTables", "--", "--v1", "--cells=" + cell, "--workflow=" + workflow,
		"--tablet_types=" + "primary,replica,rdonly",
	}
	args = append(args, extraFlags...)
	args = append(args, sourceKs, targetKs, tables)
	if err := vc.VtctlClient.ExecuteCommand(args...); err != nil {
		t.Fatalf("MoveTables command failed with %+v\n", err)
	}
}
Expand All @@ -1205,6 +1303,12 @@ func moveTablesWithTabletTypes(t *testing.T, cell, workflow, sourceKs, targetKs,
t.Fatalf("MoveTables command failed with %+v\n", err)
}
}
// cancelMoveTables issues the MoveTables "Cancel" action for the workflow
// identified as "<targetKs>.<workflow>" and fails the test immediately if
// the command returns an error.
func cancelMoveTables(t *testing.T, cell, workflow, sourceKs, targetKs, tables string) {
	ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
	err := vc.VtctlClient.ExecuteCommand("MoveTables", "--", "--source="+sourceKs, "--cells="+cell, "--tables="+tables,
		"--tablet_types="+"primary,replica,rdonly", "Cancel", ksWorkflow)
	if err == nil {
		return
	}
	t.Fatalf("MoveTables Cancel command failed with %+v\n", err)
}
func applyVSchema(t *testing.T, vschema, keyspace string) {
err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema", vschema, keyspace)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 1ace3b4

Please sign in to comment.