Skip to content

Commit

Permalink
Add tests for reverse and abort
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Dec 26, 2020
1 parent 3fce881 commit 9dcd560
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 14 deletions.
2 changes: 1 addition & 1 deletion go/vt/wrangler/fake_dbclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
}
}

log.Infof("Missing query: >%s<" + query)
log.Infof("Missing query: >>>>>>>>>>>>>>>>>>%s<<<<<<<<<<<<<<<", query)
return nil, fmt.Errorf("unexpected query: %s", query)
}

Expand Down
8 changes: 3 additions & 5 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,9 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflow s
return 0, nil, err
}
if ts == nil {
if ts == nil {
errorMsg := fmt.Sprintf("workflow %s not found in keyspace %s", workflow, targetKeyspace)
wr.Logger().Errorf(errorMsg)
return 0, nil, fmt.Errorf(errorMsg)
}
errorMsg := fmt.Sprintf("workflow %s not found in keyspace %s", workflow, targetKeyspace)
wr.Logger().Errorf(errorMsg)
return 0, nil, fmt.Errorf(errorMsg)
}

var sw iswitcher
Expand Down
33 changes: 33 additions & 0 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (

const vreplQueryks = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test' and db_name='vt_ks'"
const vreplQueryks2 = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test' and db_name='vt_ks2'"
const vreplQueryks1 = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test_reverse' and db_name='vt_ks1'"

type testMigraterEnv struct {
ts *topo.Server
Expand Down Expand Up @@ -193,6 +194,31 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
)
}

for i, sourceShard := range sourceShards {
var rows []string
for j, targetShard := range targetShards {
bls := &binlogdatapb.BinlogSource{
Keyspace: "ks2",
Shard: targetShard,
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", sourceShard)),
}, {
Match: "t2",
Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", sourceShard)),
}},
},
}
rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls))
}
tme.dbSourceClients[i].addInvariant(vreplQueryks1, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rows...),
)
}

if err := tme.wr.saveRoutingRules(ctx, map[string][]string{
"t1": {"ks1.t1"},
"ks2.t1": {"ks1.t1"},
Expand Down Expand Up @@ -406,6 +432,13 @@ func (tme *testMigraterEnv) expectNoPreviousJournals() {
}
}

func (tme *testMigraterEnv) expectNoPreviousReverseJournals() {
// validate that no previous journals exist
for _, dbclient := range tme.dbTargetClients {
dbclient.addQueryRE(tsCheckJournals, &sqltypes.Result{}, nil)
}
}

func (tme *testShardMigraterEnv) forAllStreams(f func(i, j int)) {
for i := range tme.targetShards {
for j := range tme.sourceShards {
Expand Down
95 changes: 87 additions & 8 deletions go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func expectCopyProgressQueries(t *testing.T, tme *testMigraterEnv) {
db.AddQuery(query, result)

}

func TestMoveTablesV2(t *testing.T) {
ctx := context.Background()
p := &VReplicationWorkflowParams{
Expand All @@ -162,6 +163,57 @@ func TestMoveTablesV2(t *testing.T) {
tme.expectNoPreviousJournals()
require.NoError(t, wf.SwitchTraffic(DirectionForward))
require.Equal(t, WorkflowStateAllSwitched, wf.CurrentState())

tme.expectNoPreviousJournals()
tme.expectNoPreviousReverseJournals()
require.NoError(t, wf.ReverseTraffic())
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
}

func TestAbortMoveTablesV2(t *testing.T) {
ctx := context.Background()
p := &VReplicationWorkflowParams{
Workflow: "test",
SourceKeyspace: "ks1",
TargetKeyspace: "ks2",
Tables: "t1,t2",
Cells: "cell1,cell2",
TabletTypes: "replica,rdonly,master",
Timeout: DefaultActionTimeout,
}
tme := newTestTableMigrater(ctx, t)
defer tme.stopTablets(t)
wf, err := tme.wr.NewVReplicationWorkflow(ctx, MoveTablesWorkflow, p)
require.NoError(t, err)
require.NotNil(t, wf)
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
expectMoveTablesQueries(t, tme)
require.NoError(t, wf.Abort())
}

func TestReshardV2(t *testing.T) {
ctx := context.Background()
p := &VReplicationWorkflowParams{
Workflow: "test",
SourceKeyspace: "ks1",
TargetKeyspace: "ks2",
SourceShards: []string{"-40", "40-"},
TargetShards: []string{"-80", "80-"},
Cells: "cell1,cell2",
TabletTypes: "replica,rdonly,master",
Timeout: DefaultActionTimeout,
}
tme := newTestTableMigrater(ctx, t)
defer tme.stopTablets(t)
wf, err := tme.wr.NewVReplicationWorkflow(ctx, ReshardWorkflow, p)
require.NoError(t, err)
require.NotNil(t, wf)
require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState())
tme.expectNoPreviousJournals()
expectMoveTablesQueries(t, tme)
tme.expectNoPreviousJournals()
require.NoError(t, wf.SwitchTraffic(DirectionForward))
require.Equal(t, WorkflowStateAllSwitched, wf.CurrentState())
require.NoError(t, wf.Complete())
}

Expand All @@ -186,11 +238,25 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv) {
dbclient.addInvariant("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", noResult)
dbclient.addInvariant("delete from _vt.vreplication where id in (1)", noResult)
dbclient.addInvariant("delete from _vt.copy_state where vrepl_id in (1)", noResult)

//
dbclient.addInvariant("insert into _vt.resharding_journal", noResult)
dbclient.addInvariant("select val from _vt.resharding_journal", noResult)
dbclient.addInvariant("select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test_reverse' and db_name='vt_ks1'",
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
""),
)
//select pos, state, message from _vt.vreplication where id=1
}

for _, dbclient := range tme.dbSourceClients {
dbclient.addInvariant("select val from _vt.resharding_journal", noResult)
dbclient.addInvariant("update _vt.vreplication set message = 'FROZEN'", noResult)
dbclient.addInvariant("insert into _vt.vreplication (workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name)", &sqltypes.Result{InsertID: uint64(1)})
dbclient.addInvariant("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", noResult)
dbclient.addInvariant("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", noResult)
dbclient.addInvariant("select id from _vt.vreplication where id = 1", resultid1)
dbclient.addInvariant("select id from _vt.vreplication where id = 2", resultid2)
dbclient.addInvariant("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid1)
dbclient.addInvariant("delete from _vt.vreplication where id in (1)", noResult)
dbclient.addInvariant("delete from _vt.copy_state where vrepl_id in (1)", noResult)
Expand All @@ -204,10 +270,23 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv) {
"varchar|varchar|varchar"),
"MariaDB/5-456-892|Running",
)
tme.dbTargetClients[0].addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil)
tme.dbTargetClients[0].addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil)
tme.dbTargetClients[1].addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil)
tme.dbTargetClients[1].addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil)
tme.tmeDB.AddQueryPattern("drop table vt_ks1.t1", &sqltypes.Result{})
tme.tmeDB.AddQueryPattern("drop table vt_ks1.t2", &sqltypes.Result{})
tme.dbTargetClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=1", state)
tme.dbTargetClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=2", state)
tme.dbTargetClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=1", state)
tme.dbTargetClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=2", state)

state = sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"pos|state|message",
"varchar|varchar|varchar"),
"MariaDB/5-456-893|Running",
)
tme.dbSourceClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=1", state)
tme.dbSourceClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=2", state)
tme.dbSourceClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=1", state)
tme.dbSourceClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=2", state)
tme.tmeDB.AddQuery("drop table vt_ks1.t1", noResult)
tme.tmeDB.AddQuery("drop table vt_ks1.t2", noResult)
tme.tmeDB.AddQuery("drop table vt_ks2.t1", noResult)
tme.tmeDB.AddQuery("drop table vt_ks2.t2", noResult)
tme.tmeDB.AddQuery("update _vt.vreplication set message='Picked source tablet: cell:\"cell1\" uid:10 ' where id=1", noResult)
}

0 comments on commit 9dcd560

Please sign in to comment.